mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
SDK::DDSSession: remove channel id to task id association
This commit is contained in:
parent
3785fd9ff9
commit
0e72a9bf54
|
@ -347,11 +347,11 @@ auto DDS::SubscribeForCustomCommands() -> void
|
|||
Transition transition = static_cast<cmd::ChangeState&>(*cmd).GetTransition();
|
||||
if (ChangeDeviceState(transition)) {
|
||||
cmd::Cmds outCmds(
|
||||
cmd::make<cmd::TransitionStatus>(id, cmd::Result::Ok, transition));
|
||||
cmd::make<cmd::TransitionStatus>(id, dds::env_prop<dds::task_id>(), cmd::Result::Ok, transition));
|
||||
fDDS.Send(outCmds.Serialize(), to_string(senderId));
|
||||
} else {
|
||||
sdk::cmd::Cmds outCmds(
|
||||
cmd::make<cmd::TransitionStatus>(id, cmd::Result::Failure, transition));
|
||||
cmd::make<cmd::TransitionStatus>(id, dds::env_prop<dds::task_id>(), cmd::Result::Failure, transition));
|
||||
fDDS.Send(outCmds.Serialize(), to_string(senderId));
|
||||
}
|
||||
{
|
||||
|
|
|
@ -135,8 +135,6 @@ struct DDSSession::Impl
|
|||
dds::intercom_api::CCustomCmd fDDSCustomCmd;
|
||||
Id fId;
|
||||
bool fStopOnDestruction;
|
||||
mutable std::mutex fMtx;
|
||||
std::unordered_map<DDSChannel::Id, DDSTask::Id> fTaskIdByChannelIdMap;
|
||||
};
|
||||
|
||||
DDSSession::DDSSession(DDSEnvironment env)
|
||||
|
@ -362,18 +360,6 @@ void DDSSession::SendCommand(const std::string& cmd, DDSChannel::Id recipient)
|
|||
fImpl->fDDSCustomCmd.send(cmd, std::to_string(recipient));
|
||||
}
|
||||
|
||||
auto DDSSession::UpdateChannelToTaskAssociation(DDSChannel::Id channelId, DDSTask::Id taskId) -> void
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(fImpl->fMtx);
|
||||
fImpl->fTaskIdByChannelIdMap[channelId] = taskId;
|
||||
}
|
||||
|
||||
auto DDSSession::GetTaskId(DDSChannel::Id channelId) const -> DDSTask::Id
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(fImpl->fMtx);
|
||||
return fImpl->fTaskIdByChannelIdMap.at(channelId);
|
||||
}
|
||||
|
||||
auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&
|
||||
{
|
||||
return os << "$DDS_SESSION_ID: " << session.GetId();
|
||||
|
|
|
@ -103,7 +103,6 @@ class DDSSession
|
|||
void UnsubscribeFromCommands();
|
||||
void SendCommand(const std::string&, const std::string& = "");
|
||||
void SendCommand(const std::string&, DDSChannel::Id);
|
||||
auto UpdateChannelToTaskAssociation(DDSChannel::Id, DDSTask::Id) -> void;
|
||||
auto GetTaskId(DDSChannel::Id) const -> DDSTask::Id;
|
||||
|
||||
friend auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&;
|
||||
|
|
|
@ -193,7 +193,6 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
|||
case Type::state_change: {
|
||||
auto _cmd = static_cast<StateChange&>(*cmd);
|
||||
DDSTask::Id taskId(_cmd.GetTaskId());
|
||||
fDDSSession.UpdateChannelToTaskAssociation(senderId, taskId);
|
||||
if (_cmd.GetCurrentState() == DeviceState::Exiting) {
|
||||
Cmds outCmds;
|
||||
outCmds.Add<StateChangeExitingReceived>();
|
||||
|
@ -212,10 +211,11 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
|||
}
|
||||
break;
|
||||
case Type::transition_status: {
|
||||
if (static_cast<TransitionStatus&>(*cmd).GetResult() != Result::Ok) {
|
||||
LOG(error) << "Transition failed for " << static_cast<TransitionStatus&>(*cmd).GetDeviceId();
|
||||
auto _cmd = static_cast<TransitionStatus&>(*cmd);
|
||||
if (_cmd.GetResult() != Result::Ok) {
|
||||
LOG(error) << "Transition failed for " << _cmd.GetDeviceId();
|
||||
std::lock_guard<std::mutex> lk(fMtx);
|
||||
if (!fChangeStateOp.IsCompleted() && fStateData.at(fStateIndex.at(fDDSSession.GetTaskId(senderId))).state != fChangeStateTarget) {
|
||||
if (!fChangeStateOp.IsCompleted() && fStateData.at(fStateIndex.at(_cmd.GetTaskId())).state != fChangeStateTarget) {
|
||||
fChangeStateOpTimer.cancel();
|
||||
fChangeStateOp.Complete(MakeErrorCode(ErrorCode::DeviceChangeStateFailed), fStateData);
|
||||
}
|
||||
|
|
|
@ -296,6 +296,7 @@ string Cmds::Serialize(const Format type) const
|
|||
auto deviceId = fbb.CreateString(_cmd.GetDeviceId());
|
||||
cmdBuilder = tools::make_unique<FBCommandBuilder>(fbb);
|
||||
cmdBuilder->add_device_id(deviceId);
|
||||
cmdBuilder->add_task_id(_cmd.GetTaskId());
|
||||
cmdBuilder->add_result(GetFBResult(_cmd.GetResult()));
|
||||
cmdBuilder->add_transition(GetFBTransition(_cmd.GetTransition()));
|
||||
}
|
||||
|
@ -476,7 +477,7 @@ void Cmds::Deserialize(const string& str, const Format type)
|
|||
fCmds.emplace_back(make<CurrentState>(cmdPtr.device_id()->str(), GetMQState(cmdPtr.current_state())));
|
||||
break;
|
||||
case FBCmd_transition_status:
|
||||
fCmds.emplace_back(make<TransitionStatus>(cmdPtr.device_id()->str(), GetResult(cmdPtr.result()), GetMQTransition(cmdPtr.transition())));
|
||||
fCmds.emplace_back(make<TransitionStatus>(cmdPtr.device_id()->str(), cmdPtr.task_id(), GetResult(cmdPtr.result()), GetMQTransition(cmdPtr.transition())));
|
||||
break;
|
||||
case FBCmd_config:
|
||||
fCmds.emplace_back(make<Config>(cmdPtr.device_id()->str(), cmdPtr.config_string()->str()));
|
||||
|
|
|
@ -51,7 +51,7 @@ enum class Type : int
|
|||
set_properties, // args: { request_id, properties }
|
||||
|
||||
current_state, // args: { device_id, current_state }
|
||||
transition_status, // args: { device_id, Result, transition }
|
||||
transition_status, // args: { device_id, task_id, Result, transition }
|
||||
config, // args: { device_id, config_string }
|
||||
heartbeat_subscription, // args: { device_id, Result }
|
||||
heartbeat_unsubscription, // args: { device_id, Result }
|
||||
|
@ -179,15 +179,18 @@ struct CurrentState : Cmd
|
|||
|
||||
struct TransitionStatus : Cmd
|
||||
{
|
||||
explicit TransitionStatus(const std::string& id, const Result result, const Transition transition)
|
||||
explicit TransitionStatus(const std::string& deviceId, const uint64_t taskId, const Result result, const Transition transition)
|
||||
: Cmd(Type::transition_status)
|
||||
, fDeviceId(id)
|
||||
, fDeviceId(deviceId)
|
||||
, fTaskId(taskId)
|
||||
, fResult(result)
|
||||
, fTransition(transition)
|
||||
{}
|
||||
|
||||
std::string GetDeviceId() const { return fDeviceId; }
|
||||
void SetDeviceId(const std::string& deviceId) { fDeviceId = deviceId; }
|
||||
uint64_t GetTaskId() const { return fTaskId; }
|
||||
void SetTaskId(const uint64_t taskId) { fTaskId = taskId; }
|
||||
Result GetResult() const { return fResult; }
|
||||
void SetResult(const Result result) { fResult = result; }
|
||||
Transition GetTransition() const { return fTransition; }
|
||||
|
@ -195,6 +198,7 @@ struct TransitionStatus : Cmd
|
|||
|
||||
private:
|
||||
std::string fDeviceId;
|
||||
uint64_t fTaskId;
|
||||
Result fResult;
|
||||
Transition fTransition;
|
||||
};
|
||||
|
|
|
@ -32,7 +32,7 @@ TEST(Format, Construction)
|
|||
Cmds getPropertiesCmds(make<GetProperties>(66, "k[12]"));
|
||||
Cmds setPropertiesCmds(make<SetProperties>(42, props));
|
||||
Cmds currentStateCmds(make<CurrentState>("somedeviceid", State::Running));
|
||||
Cmds transitionStatusCmds(make<TransitionStatus>("somedeviceid", Result::Ok, Transition::Stop));
|
||||
Cmds transitionStatusCmds(make<TransitionStatus>("somedeviceid", 123456, Result::Ok, Transition::Stop));
|
||||
Cmds configCmds(make<Config>("somedeviceid", "someconfig"));
|
||||
Cmds heartbeatSubscriptionCmds(make<HeartbeatSubscription>("somedeviceid", Result::Ok));
|
||||
Cmds heartbeatUnsubscriptionCmds(make<HeartbeatUnsubscription>("somedeviceid", Result::Ok));
|
||||
|
@ -63,6 +63,7 @@ TEST(Format, Construction)
|
|||
ASSERT_EQ(static_cast<CurrentState&>(currentStateCmds.At(0)).GetCurrentState(), State::Running);
|
||||
ASSERT_EQ(transitionStatusCmds.At(0).GetType(), Type::transition_status);
|
||||
ASSERT_EQ(static_cast<TransitionStatus&>(transitionStatusCmds.At(0)).GetDeviceId(), "somedeviceid");
|
||||
ASSERT_EQ(static_cast<TransitionStatus&>(transitionStatusCmds.At(0)).GetTaskId(), 123456);
|
||||
ASSERT_EQ(static_cast<TransitionStatus&>(transitionStatusCmds.At(0)).GetResult(), Result::Ok);
|
||||
ASSERT_EQ(static_cast<TransitionStatus&>(transitionStatusCmds.At(0)).GetTransition(), Transition::Stop);
|
||||
ASSERT_EQ(configCmds.At(0).GetType(), Type::config);
|
||||
|
@ -113,7 +114,7 @@ void fillCommands(Cmds& cmds)
|
|||
cmds.Add<GetProperties>(66, "k[12]");
|
||||
cmds.Add<SetProperties>(42, props);
|
||||
cmds.Add<CurrentState>("somedeviceid", State::Running);
|
||||
cmds.Add<TransitionStatus>("somedeviceid", Result::Ok, Transition::Stop);
|
||||
cmds.Add<TransitionStatus>("somedeviceid", 123456, Result::Ok, Transition::Stop);
|
||||
cmds.Add<Config>("somedeviceid", "someconfig");
|
||||
cmds.Add<HeartbeatSubscription>("somedeviceid", Result::Ok);
|
||||
cmds.Add<HeartbeatUnsubscription>("somedeviceid", Result::Ok);
|
||||
|
@ -177,6 +178,7 @@ void checkCommands(Cmds& cmds)
|
|||
case Type::transition_status:
|
||||
++count;
|
||||
ASSERT_EQ(static_cast<TransitionStatus&>(*cmd).GetDeviceId(), "somedeviceid");
|
||||
ASSERT_EQ(static_cast<TransitionStatus&>(*cmd).GetTaskId(), 123456);
|
||||
ASSERT_EQ(static_cast<TransitionStatus&>(*cmd).GetResult(), Result::Ok);
|
||||
ASSERT_EQ(static_cast<TransitionStatus&>(*cmd).GetTransition(), Transition::Stop);
|
||||
break;
|
||||
|
|
Loading…
Reference in New Issue
Block a user