From 274ba5ec00adebd7c9ba19ded145a1463022daa4 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 31 Mar 2020 17:00:42 +0200 Subject: [PATCH] Commands: Add task id to subscription status cmds --- fairmq/plugins/DDS/DDS.cxx | 4 +-- fairmq/sdk/Topology.h | 37 +++++++++++++++++++------- fairmq/sdk/commands/Commands.cxx | 6 +++-- fairmq/sdk/commands/Commands.h | 16 ++++++++--- fairmq/sdk/commands/CommandsFormat.fbs | 6 ++--- test/commands/_commands.cxx | 12 ++++++--- 6 files changed, 57 insertions(+), 24 deletions(-) diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index ad84bb79..c3f61eba 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -367,7 +367,7 @@ auto DDS::HandleCmd(const string& id, sdk::cmd::Cmd& cmd, const string& cond, ui LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState << " to " << senderId; - Cmds outCmds(make(id, Result::Ok), + Cmds outCmds(make(id, fDDSTaskId, Result::Ok), make(id, fDDSTaskId, fLastState, fCurrentState)); fDDS.Send(outCmds.Serialize(), to_string(senderId)); @@ -377,7 +377,7 @@ auto DDS::HandleCmd(const string& id, sdk::cmd::Cmd& cmd, const string& cond, ui lock_guard lock{fStateChangeSubscriberMutex}; fStateChangeSubscribers.erase(senderId); } - Cmds outCmds(make(id, Result::Ok)); + Cmds outCmds(make(id, fDDSTaskId, Result::Ok)); fDDS.Send(outCmds.Serialize(), to_string(senderId)); } break; case Type::get_properties: { diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index 827959d5..ec99a3b1 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -71,7 +71,7 @@ const std::map expectedState = struct DeviceStatus { - bool initialized; + bool subscribed_to_state_changes; DeviceState lastState; DeviceState state; DDSTask::Id taskId; @@ -265,15 +265,35 @@ class BasicTopology : public AsioBase auto HandleCmd(cmd::StateChangeSubscription const& cmd) -> void { - if (cmd.GetResult() != cmd::Result::Ok) { - FAIR_LOG(error) << "State change subscription failed for " << cmd.GetDeviceId(); + if (cmd.GetResult() == cmd::Result::Ok) { + DDSTask::Id taskId(cmd.GetTaskId()); + std::lock_guard lk(fMtx); + + try { + DeviceStatus& task = fStateData.at(fStateIndex.at(taskId)); + task.subscribed_to_state_changes = true; + } catch (const std::exception& e) { + FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeSubscription const&): " << e.what(); + } + } else { + FAIR_LOG(error) << "State change subscription failed for device: " << cmd.GetDeviceId() << ", task id: " << cmd.GetTaskId(); } } auto HandleCmd(cmd::StateChangeUnsubscription const& cmd) -> void { - if (cmd.GetResult() != cmd::Result::Ok) { - FAIR_LOG(error) << "State change unsubscription failed for " << cmd.GetDeviceId(); + if (cmd.GetResult() == cmd::Result::Ok) { + DDSTask::Id taskId(cmd.GetTaskId()); + std::lock_guard lk(fMtx); + + try { + DeviceStatus& task = fStateData.at(fStateIndex.at(taskId)); + task.subscribed_to_state_changes = false; + } catch (const std::exception& e) { + FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeUnsubscription const&): " << e.what(); + } + } else { + FAIR_LOG(error) << "State change unsubscription failed for device: " << cmd.GetDeviceId() << ", task id: " << cmd.GetTaskId(); } } @@ -290,7 +310,6 @@ class BasicTopology : public AsioBase try { DeviceStatus& task = fStateData.at(fStateIndex.at(taskId)); - task.initialized = true; task.lastState = cmd.GetLastState(); task.state = cmd.GetCurrentState(); // FAIR_LOG(debug) << "Updated state entry: taskId=" << taskId << ", state=" << state; @@ -310,11 +329,11 @@ class BasicTopology : public AsioBase { if (cmd.GetResult() != cmd::Result::Ok) { FAIR_LOG(error) << cmd.GetTransition() << " transition failed for " << cmd.GetDeviceId(); - DDSTask::Id id(cmd.GetTaskId()); + DDSTask::Id taskId(cmd.GetTaskId()); std::lock_guard lk(fMtx); for (auto& op : fChangeStateOps) { - if (!op.second.IsCompleted() && op.second.ContainsTask(id) && - fStateData.at(fStateIndex.at(id)).state != op.second.GetTargetState()) { + if (!op.second.IsCompleted() && op.second.ContainsTask(taskId) && + fStateData.at(fStateIndex.at(taskId)).state != op.second.GetTargetState()) { op.second.Complete(MakeErrorCode(ErrorCode::DeviceChangeStateFailed)); } } diff --git a/fairmq/sdk/commands/Commands.cxx b/fairmq/sdk/commands/Commands.cxx index fffb6216..9aca9b26 100644 --- a/fairmq/sdk/commands/Commands.cxx +++ b/fairmq/sdk/commands/Commands.cxx @@ -293,6 +293,7 @@ string Cmds::Serialize(const Format type) const auto deviceId = fbb.CreateString(_cmd.GetDeviceId()); cmdBuilder = tools::make_unique(fbb); cmdBuilder->add_device_id(deviceId); + cmdBuilder->add_task_id(_cmd.GetTaskId()); cmdBuilder->add_result(GetFBResult(_cmd.GetResult())); } break; @@ -301,6 +302,7 @@ string Cmds::Serialize(const Format type) const auto deviceId = fbb.CreateString(_cmd.GetDeviceId()); cmdBuilder = tools::make_unique(fbb); cmdBuilder->add_device_id(deviceId); + cmdBuilder->add_task_id(_cmd.GetTaskId()); cmdBuilder->add_result(GetFBResult(_cmd.GetResult())); } break; @@ -433,10 +435,10 @@ void Cmds::Deserialize(const string& str, const Format type) fCmds.emplace_back(make(cmdPtr.device_id()->str(), cmdPtr.config_string()->str())); break; case FBCmd_state_change_subscription: - fCmds.emplace_back(make(cmdPtr.device_id()->str(), GetResult(cmdPtr.result()))); + fCmds.emplace_back(make(cmdPtr.device_id()->str(), cmdPtr.task_id(), GetResult(cmdPtr.result()))); break; case FBCmd_state_change_unsubscription: - fCmds.emplace_back(make(cmdPtr.device_id()->str(), GetResult(cmdPtr.result()))); + fCmds.emplace_back(make(cmdPtr.device_id()->str(), cmdPtr.task_id(), GetResult(cmdPtr.result()))); break; case FBCmd_state_change: fCmds.emplace_back(make(cmdPtr.device_id()->str(), cmdPtr.task_id(), GetMQState(cmdPtr.last_state()), GetMQState(cmdPtr.current_state()))); diff --git a/fairmq/sdk/commands/Commands.h b/fairmq/sdk/commands/Commands.h index 8c30e654..92b18389 100644 --- a/fairmq/sdk/commands/Commands.h +++ b/fairmq/sdk/commands/Commands.h @@ -51,8 +51,8 @@ enum class Type : int current_state, // args: { device_id, current_state } transition_status, // args: { device_id, task_id, Result, transition } config, // args: { device_id, config_string } - state_change_subscription, // args: { device_id, Result } - state_change_unsubscription, // args: { device_id, Result } + state_change_subscription, // args: { device_id, task_id, Result } + state_change_unsubscription, // args: { device_id, task_id, Result } state_change, // args: { device_id, task_id, last_state, current_state } properties, // args: { device_id, request_id, Result, properties } properties_set // args: { device_id, request_id, Result } @@ -208,37 +208,45 @@ struct Config : Cmd struct StateChangeSubscription : Cmd { - explicit StateChangeSubscription(const std::string& id, const Result result) + explicit StateChangeSubscription(const std::string& id, const uint64_t taskId, const Result result) : Cmd(Type::state_change_subscription) , fDeviceId(id) + , fTaskId(taskId) , fResult(result) {} std::string GetDeviceId() const { return fDeviceId; } void SetDeviceId(const std::string& deviceId) { fDeviceId = deviceId; } + uint64_t GetTaskId() const { return fTaskId; } + void SetTaskId(const uint64_t taskId) { fTaskId = taskId; } Result GetResult() const { return fResult; } void SetResult(const Result result) { fResult = result; } private: std::string fDeviceId; + uint64_t fTaskId; Result fResult; }; struct StateChangeUnsubscription : Cmd { - explicit StateChangeUnsubscription(const std::string& id, const Result result) + explicit StateChangeUnsubscription(const std::string& id, const uint64_t taskId, const Result result) : Cmd(Type::state_change_unsubscription) , fDeviceId(id) + , fTaskId(taskId) , fResult(result) {} std::string GetDeviceId() const { return fDeviceId; } void SetDeviceId(const std::string& deviceId) { fDeviceId = deviceId; } + uint64_t GetTaskId() const { return fTaskId; } + void SetTaskId(const uint64_t taskId) { fTaskId = taskId; } Result GetResult() const { return fResult; } void SetResult(const Result result) { fResult = result; } private: std::string fDeviceId; + uint64_t fTaskId; Result fResult; }; diff --git a/fairmq/sdk/commands/CommandsFormat.fbs b/fairmq/sdk/commands/CommandsFormat.fbs index 27736670..71e7506c 100644 --- a/fairmq/sdk/commands/CommandsFormat.fbs +++ b/fairmq/sdk/commands/CommandsFormat.fbs @@ -54,10 +54,10 @@ enum FBCmd:byte { set_properties, // args: { request_id, properties } current_state, // args: { device_id, current_state } - transition_status, // args: { device_id, Result, transition } + transition_status, // args: { device_id, task_id, Result, transition } config, // args: { device_id, config_string } - state_change_subscription, // args: { device_id, Result } - state_change_unsubscription, // args: { device_id, Result } + state_change_subscription, // args: { device_id, task_id, Result } + state_change_unsubscription, // args: { device_id, task_id, Result } state_change, // args: { device_id, task_id, last_state, current_state } properties, // args: { device_id, request_id, Result, properties } properties_set // args: { device_id, request_id, Result } diff --git a/test/commands/_commands.cxx b/test/commands/_commands.cxx index 0048816b..2c395d2e 100644 --- a/test/commands/_commands.cxx +++ b/test/commands/_commands.cxx @@ -32,8 +32,8 @@ TEST(Format, Construction) Cmds currentStateCmds(make("somedeviceid", State::Running)); Cmds transitionStatusCmds(make("somedeviceid", 123456, Result::Ok, Transition::Stop)); Cmds configCmds(make("somedeviceid", "someconfig")); - Cmds stateChangeSubscriptionCmds(make("somedeviceid", Result::Ok)); - Cmds stateChangeUnsubscriptionCmds(make("somedeviceid", Result::Ok)); + Cmds stateChangeSubscriptionCmds(make("somedeviceid", 123456, Result::Ok)); + Cmds stateChangeUnsubscriptionCmds(make("somedeviceid", 123456, Result::Ok)); Cmds stateChangeCmds(make("somedeviceid", 123456, State::Running, State::Ready)); Cmds propertiesCmds(make("somedeviceid", 66, Result::Ok, props)); Cmds propertiesSetCmds(make("somedeviceid", 42, Result::Ok)); @@ -64,9 +64,11 @@ TEST(Format, Construction) ASSERT_EQ(static_cast(configCmds.At(0)).GetConfig(), "someconfig"); ASSERT_EQ(stateChangeSubscriptionCmds.At(0).GetType(), Type::state_change_subscription); ASSERT_EQ(static_cast(stateChangeSubscriptionCmds.At(0)).GetDeviceId(), "somedeviceid"); + ASSERT_EQ(static_cast(stateChangeSubscriptionCmds.At(0)).GetTaskId(), 123456); ASSERT_EQ(static_cast(stateChangeSubscriptionCmds.At(0)).GetResult(), Result::Ok); ASSERT_EQ(stateChangeUnsubscriptionCmds.At(0).GetType(), Type::state_change_unsubscription); ASSERT_EQ(static_cast(stateChangeUnsubscriptionCmds.At(0)).GetDeviceId(), "somedeviceid"); + ASSERT_EQ(static_cast(stateChangeUnsubscriptionCmds.At(0)).GetTaskId(), 123456); ASSERT_EQ(static_cast(stateChangeUnsubscriptionCmds.At(0)).GetResult(), Result::Ok); ASSERT_EQ(stateChangeCmds.At(0).GetType(), Type::state_change); ASSERT_EQ(static_cast(stateChangeCmds.At(0)).GetDeviceId(), "somedeviceid"); @@ -99,8 +101,8 @@ void fillCommands(Cmds& cmds) cmds.Add("somedeviceid", State::Running); cmds.Add("somedeviceid", 123456, Result::Ok, Transition::Stop); cmds.Add("somedeviceid", "someconfig"); - cmds.Add("somedeviceid", Result::Ok); - cmds.Add("somedeviceid", Result::Ok); + cmds.Add("somedeviceid", 123456, Result::Ok); + cmds.Add("somedeviceid", 123456, Result::Ok); cmds.Add("somedeviceid", 123456, State::Running, State::Ready); cmds.Add("somedeviceid", 66, Result::Ok, props); cmds.Add("somedeviceid", 42, Result::Ok); @@ -164,11 +166,13 @@ void checkCommands(Cmds& cmds) case Type::state_change_subscription: ++count; ASSERT_EQ(static_cast(*cmd).GetDeviceId(), "somedeviceid"); + ASSERT_EQ(static_cast(*cmd).GetTaskId(), 123456); ASSERT_EQ(static_cast(*cmd).GetResult(), Result::Ok); break; case Type::state_change_unsubscription: ++count; ASSERT_EQ(static_cast(*cmd).GetDeviceId(), "somedeviceid"); + ASSERT_EQ(static_cast(*cmd).GetTaskId(), 123456); ASSERT_EQ(static_cast(*cmd).GetResult(), Result::Ok); break; case Type::state_change: