From c1719eb285e5da684ce885056385d5f524f46123 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 19 Feb 2020 16:10:03 +0100 Subject: [PATCH] SDK Commands: remove heartbeat commands --- fairmq/plugins/DDS/DDS.cxx | 41 ---------------- fairmq/plugins/DDS/DDS.h | 8 ---- fairmq/sdk/Topology.h | 4 ++ fairmq/sdk/commands/Commands.cxx | 65 ++------------------------ fairmq/sdk/commands/Commands.h | 65 -------------------------- fairmq/sdk/commands/CommandsFormat.fbs | 5 -- test/commands/_commands.cxx | 44 +---------------- 7 files changed, 9 insertions(+), 223 deletions(-) diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index 4e120e13..dc565f00 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -42,15 +42,12 @@ DDS::DDS(const string& name, , fDeviceTerminationRequested(false) , fLastExternalController(0) , fExitingAckedByLastExternalController(false) - , fHeartbeatInterval(100) , fUpdatesAllowed(false) , fWorkGuard(fWorkerQueue.get_executor()) { try { TakeDeviceControl(); - fHeartbeatThread = thread(&DDS::HeartbeatSender, this); - string deviceId(GetProperty("id")); if (deviceId.empty()) { SetProperty("id", dds::env_prop()); @@ -304,24 +301,6 @@ auto DDS::PublishBoundChannels() -> void } } -auto DDS::HeartbeatSender() -> void -{ - using namespace sdk::cmd; - string id = GetProperty("id"); - - while (!fDeviceTerminationRequested) { - { - lock_guard lock{fHeartbeatSubscriberMutex}; - - for (const auto subscriberId : fHeartbeatSubscribers) { - fDDS.Send(Cmds(make(id)).Serialize(), to_string(subscriberId)); - } - } - - this_thread::sleep_for(chrono::milliseconds(fHeartbeatInterval)); - } -} - auto DDS::SubscribeForCustomCommands() -> void { LOG(debug) << "Subscribing for DDS custom commands."; @@ -367,22 +346,6 @@ auto DDS::SubscribeForCustomCommands() -> void cmd::Cmds outCmds(cmd::make(id, ss.str())); fDDS.Send(outCmds.Serialize(), to_string(senderId)); } break; - case cmd::Type::subscribe_to_heartbeats: { - { - lock_guard lock{fHeartbeatSubscriberMutex}; - fHeartbeatSubscribers.insert(senderId); - } - cmd::Cmds outCmds(cmd::make(id, cmd::Result::Ok)); - fDDS.Send(outCmds.Serialize(), to_string(senderId)); - } break; - case cmd::Type::unsubscribe_from_heartbeats: { - { - lock_guard lock{fHeartbeatSubscriberMutex}; - fHeartbeatSubscribers.erase(senderId); - } - cmd::Cmds outCmds(cmd::make(id, cmd::Result::Ok)); - fDDS.Send(outCmds.Serialize(), to_string(senderId)); - } break; case cmd::Type::state_change_exiting_received: { { lock_guard lock{fStateChangeSubscriberMutex}; @@ -471,10 +434,6 @@ DDS::~DDS() fControllerThread.join(); } - if (fHeartbeatThread.joinable()) { - fHeartbeatThread.join(); - } - fWorkGuard.reset(); if (fWorkerThread.joinable()) { fWorkerThread.join(); diff --git a/fairmq/plugins/DDS/DDS.h b/fairmq/plugins/DDS/DDS.h index 2629ae89..c400d85a 100644 --- a/fairmq/plugins/DDS/DDS.h +++ b/fairmq/plugins/DDS/DDS.h @@ -142,8 +142,6 @@ class DDS : public Plugin auto PublishBoundChannels() -> void; auto SubscribeForCustomCommands() -> void; - auto HeartbeatSender() -> void; - DDSSubscription fDDS; std::unordered_map> fBindingChans; @@ -157,18 +155,12 @@ class DDS : public Plugin std::atomic fDeviceTerminationRequested; - std::set fHeartbeatSubscribers; - std::mutex fHeartbeatSubscriberMutex; - std::set fStateChangeSubscribers; uint64_t fLastExternalController; bool fExitingAckedByLastExternalController; std::condition_variable fExitingAcked; std::mutex fStateChangeSubscriberMutex; - std::thread fHeartbeatThread; - std::chrono::milliseconds fHeartbeatInterval; - bool fUpdatesAllowed; std::mutex fUpdateMutex; std::condition_variable fUpdateCondition; diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index 97b7a7c8..937ceb1e 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -252,6 +252,10 @@ class BasicTopology : public AsioBase ~BasicTopology() { + + + + std::lock_guard lk(fMtx); fDDSSession.UnsubscribeFromCommands(); try { diff --git a/fairmq/sdk/commands/Commands.cxx b/fairmq/sdk/commands/Commands.cxx index 903375da..fffb6216 100644 --- a/fairmq/sdk/commands/Commands.cxx +++ b/fairmq/sdk/commands/Commands.cxx @@ -46,14 +46,12 @@ array resultNames = } }; -array typeNames = +array typeNames = { { "CheckState", "ChangeState", "DumpConfig", - "SubscribeToHeartbeats", - "UnsubscribeFromHeartbeats", "SubscribeToStateChange", "UnsubscribeFromStateChange", "StateChangeExitingReceived", @@ -63,9 +61,6 @@ array typeNames = "CurrentState", "TransitionStatus", "Config", - "HeartbeatSubscription", - "HeartbeatUnsubscription", - "Heartbeat", "StateChangeSubscription", "StateChangeUnsubscription", "StateChange", @@ -152,14 +147,12 @@ array mqTransitionToFBTransition = } }; -array typeToFBCmd = +array typeToFBCmd = { { FBCmd::FBCmd_check_state, FBCmd::FBCmd_change_state, FBCmd::FBCmd_dump_config, - FBCmd::FBCmd_subscribe_to_heartbeats, - FBCmd::FBCmd_unsubscribe_from_heartbeats, FBCmd::FBCmd_subscribe_to_state_change, FBCmd::FBCmd_unsubscribe_from_state_change, FBCmd::FBCmd_state_change_exiting_received, @@ -168,9 +161,6 @@ array typeToFBCmd = FBCmd::FBCmd_current_state, FBCmd::FBCmd_transition_status, FBCmd::FBCmd_config, - FBCmd::FBCmd_heartbeat_subscription, - FBCmd::FBCmd_heartbeat_unsubscription, - FBCmd::FBCmd_heartbeat, FBCmd::FBCmd_state_change_subscription, FBCmd::FBCmd_state_change_unsubscription, FBCmd::FBCmd_state_change, @@ -179,14 +169,12 @@ array typeToFBCmd = } }; -array fbCmdToType = +array fbCmdToType = { { Type::check_state, Type::change_state, Type::dump_config, - Type::subscribe_to_heartbeats, - Type::unsubscribe_from_heartbeats, Type::subscribe_to_state_change, Type::unsubscribe_from_state_change, Type::state_change_exiting_received, @@ -195,9 +183,6 @@ array fbCmdToType = Type::current_state, Type::transition_status, Type::config, - Type::heartbeat_subscription, - Type::heartbeat_unsubscription, - Type::heartbeat, Type::state_change_subscription, Type::state_change_unsubscription, Type::state_change, @@ -241,13 +226,6 @@ string Cmds::Serialize(const Format type) const cmdBuilder = tools::make_unique(fbb); } break; - case Type::subscribe_to_heartbeats: { - cmdBuilder = tools::make_unique(fbb); - } - break; - case Type::unsubscribe_from_heartbeats: { - cmdBuilder = tools::make_unique(fbb); - } break; case Type::subscribe_to_state_change: { cmdBuilder = tools::make_unique(fbb); @@ -310,28 +288,6 @@ string Cmds::Serialize(const Format type) const cmdBuilder->add_config_string(config); } break; - case Type::heartbeat_subscription: { - auto _cmd = static_cast(*cmd); - auto deviceId = fbb.CreateString(_cmd.GetDeviceId()); - cmdBuilder = tools::make_unique(fbb); - cmdBuilder->add_device_id(deviceId); - cmdBuilder->add_result(GetFBResult(_cmd.GetResult())); - } - break; - case Type::heartbeat_unsubscription: { - auto _cmd = static_cast(*cmd); - auto deviceId = fbb.CreateString(_cmd.GetDeviceId()); - cmdBuilder = tools::make_unique(fbb); - cmdBuilder->add_device_id(deviceId); - cmdBuilder->add_result(GetFBResult(_cmd.GetResult())); - } - break; - case Type::heartbeat: { - auto deviceId = fbb.CreateString(static_cast(*cmd).GetDeviceId()); - cmdBuilder = tools::make_unique(fbb); - cmdBuilder->add_device_id(deviceId); - } - break; case Type::state_change_subscription: { auto _cmd = static_cast(*cmd); auto deviceId = fbb.CreateString(_cmd.GetDeviceId()); @@ -447,12 +403,6 @@ void Cmds::Deserialize(const string& str, const Format type) case FBCmd_dump_config: fCmds.emplace_back(make()); break; - case FBCmd_subscribe_to_heartbeats: - fCmds.emplace_back(make()); - break; - case FBCmd_unsubscribe_from_heartbeats: - fCmds.emplace_back(make()); - break; case FBCmd_subscribe_to_state_change: fCmds.emplace_back(make()); break; @@ -482,15 +432,6 @@ void Cmds::Deserialize(const string& str, const Format type) case FBCmd_config: fCmds.emplace_back(make(cmdPtr.device_id()->str(), cmdPtr.config_string()->str())); break; - case FBCmd_heartbeat_subscription: - fCmds.emplace_back(make(cmdPtr.device_id()->str(), GetResult(cmdPtr.result()))); - break; - case FBCmd_heartbeat_unsubscription: - fCmds.emplace_back(make(cmdPtr.device_id()->str(), GetResult(cmdPtr.result()))); - break; - case FBCmd_heartbeat: - fCmds.emplace_back(make(cmdPtr.device_id()->str())); - break; case FBCmd_state_change_subscription: fCmds.emplace_back(make(cmdPtr.device_id()->str(), GetResult(cmdPtr.result()))); break; diff --git a/fairmq/sdk/commands/Commands.h b/fairmq/sdk/commands/Commands.h index 3f077cf4..8c30e654 100644 --- a/fairmq/sdk/commands/Commands.h +++ b/fairmq/sdk/commands/Commands.h @@ -42,8 +42,6 @@ enum class Type : int check_state, // args: { } change_state, // args: { transition } dump_config, // args: { } - subscribe_to_heartbeats, // args: { } - unsubscribe_from_heartbeats, // args: { } subscribe_to_state_change, // args: { } unsubscribe_from_state_change, // args: { } state_change_exiting_received, // args: { } @@ -53,9 +51,6 @@ enum class Type : int current_state, // args: { device_id, current_state } transition_status, // args: { device_id, task_id, Result, transition } config, // args: { device_id, config_string } - heartbeat_subscription, // args: { device_id, Result } - heartbeat_unsubscription, // args: { device_id, Result } - heartbeat, // args: { device_id } state_change_subscription, // args: { device_id, Result } state_change_unsubscription, // args: { device_id, Result } state_change, // args: { device_id, task_id, last_state, current_state } @@ -98,16 +93,6 @@ struct DumpConfig : Cmd explicit DumpConfig() : Cmd(Type::dump_config) {} }; -struct SubscribeToHeartbeats : Cmd -{ - explicit SubscribeToHeartbeats() : Cmd(Type::subscribe_to_heartbeats) {} -}; - -struct UnsubscribeFromHeartbeats : Cmd -{ - explicit UnsubscribeFromHeartbeats() : Cmd(Type::unsubscribe_from_heartbeats) {} -}; - struct SubscribeToStateChange : Cmd { explicit SubscribeToStateChange() : Cmd(Type::subscribe_to_state_change) {} @@ -221,56 +206,6 @@ struct Config : Cmd std::string fConfig; }; -struct HeartbeatSubscription : Cmd -{ - explicit HeartbeatSubscription(const std::string& id, const Result result) - : Cmd(Type::heartbeat_subscription) - , fDeviceId(id) - , fResult(result) - {} - - std::string GetDeviceId() const { return fDeviceId; } - void SetDeviceId(const std::string& deviceId) { fDeviceId = deviceId; } - Result GetResult() const { return fResult; } - void SetResult(const Result result) { fResult = result; } - - private: - std::string fDeviceId; - Result fResult; -}; - -struct HeartbeatUnsubscription : Cmd -{ - explicit HeartbeatUnsubscription(const std::string& id, const Result result) - : Cmd(Type::heartbeat_unsubscription) - , fDeviceId(id) - , fResult(result) - {} - - std::string GetDeviceId() const { return fDeviceId; } - void SetDeviceId(const std::string& deviceId) { fDeviceId = deviceId; } - Result GetResult() const { return fResult; } - void SetResult(const Result result) { fResult = result; } - - private: - std::string fDeviceId; - Result fResult; -}; - -struct Heartbeat : Cmd -{ - explicit Heartbeat(const std::string& id) - : Cmd(Type::heartbeat) - , fDeviceId(id) - {} - - std::string GetDeviceId() const { return fDeviceId; } - void SetDeviceId(const std::string& deviceId) { fDeviceId = deviceId; } - - private: - std::string fDeviceId; -}; - struct StateChangeSubscription : Cmd { explicit StateChangeSubscription(const std::string& id, const Result result) diff --git a/fairmq/sdk/commands/CommandsFormat.fbs b/fairmq/sdk/commands/CommandsFormat.fbs index 8633d03d..27736670 100644 --- a/fairmq/sdk/commands/CommandsFormat.fbs +++ b/fairmq/sdk/commands/CommandsFormat.fbs @@ -47,8 +47,6 @@ enum FBCmd:byte { check_state, // args: { } change_state, // args: { transition } dump_config, // args: { } - subscribe_to_heartbeats, // args: { } - unsubscribe_from_heartbeats, // args: { } subscribe_to_state_change, // args: { } unsubscribe_from_state_change, // args: { } state_change_exiting_received, // args: { } @@ -58,9 +56,6 @@ enum FBCmd:byte { current_state, // args: { device_id, current_state } transition_status, // args: { device_id, Result, transition } config, // args: { device_id, config_string } - heartbeat_subscription, // args: { device_id, Result } - heartbeat_unsubscription, // args: { device_id, Result } - heartbeat, // args: { device_id } state_change_subscription, // args: { device_id, Result } state_change_unsubscription, // args: { device_id, Result } state_change, // args: { device_id, task_id, last_state, current_state } diff --git a/test/commands/_commands.cxx b/test/commands/_commands.cxx index cc1047c5..0048816b 100644 --- a/test/commands/_commands.cxx +++ b/test/commands/_commands.cxx @@ -24,8 +24,6 @@ TEST(Format, Construction) Cmds checkStateCmds(make()); Cmds changeStateCmds(make(Transition::Stop)); Cmds dumpConfigCmds(make()); - Cmds subscribeToHeartbeatsCmds(make()); - Cmds unsubscribeFromHeartbeatsCmds(make()); Cmds subscribeToStateChangeCmds(make()); Cmds unsubscribeFromStateChangeCmds(make()); Cmds stateChangeExitingReceivedCmds(make()); @@ -34,9 +32,6 @@ TEST(Format, Construction) Cmds currentStateCmds(make("somedeviceid", State::Running)); Cmds transitionStatusCmds(make("somedeviceid", 123456, Result::Ok, Transition::Stop)); Cmds configCmds(make("somedeviceid", "someconfig")); - Cmds heartbeatSubscriptionCmds(make("somedeviceid", Result::Ok)); - Cmds heartbeatUnsubscriptionCmds(make("somedeviceid", Result::Ok)); - Cmds heartbeatCmds(make("somedeviceid")); Cmds stateChangeSubscriptionCmds(make("somedeviceid", Result::Ok)); Cmds stateChangeUnsubscriptionCmds(make("somedeviceid", Result::Ok)); Cmds stateChangeCmds(make("somedeviceid", 123456, State::Running, State::Ready)); @@ -47,8 +42,6 @@ TEST(Format, Construction) ASSERT_EQ(changeStateCmds.At(0).GetType(), Type::change_state); ASSERT_EQ(static_cast(changeStateCmds.At(0)).GetTransition(), Transition::Stop); ASSERT_EQ(dumpConfigCmds.At(0).GetType(), Type::dump_config); - ASSERT_EQ(subscribeToHeartbeatsCmds.At(0).GetType(), Type::subscribe_to_heartbeats); - ASSERT_EQ(unsubscribeFromHeartbeatsCmds.At(0).GetType(), Type::unsubscribe_from_heartbeats); ASSERT_EQ(subscribeToStateChangeCmds.At(0).GetType(), Type::subscribe_to_state_change); ASSERT_EQ(unsubscribeFromStateChangeCmds.At(0).GetType(), Type::unsubscribe_from_state_change); ASSERT_EQ(stateChangeExitingReceivedCmds.At(0).GetType(), Type::state_change_exiting_received); @@ -69,14 +62,6 @@ TEST(Format, Construction) ASSERT_EQ(configCmds.At(0).GetType(), Type::config); ASSERT_EQ(static_cast(configCmds.At(0)).GetDeviceId(), "somedeviceid"); ASSERT_EQ(static_cast(configCmds.At(0)).GetConfig(), "someconfig"); - ASSERT_EQ(heartbeatSubscriptionCmds.At(0).GetType(), Type::heartbeat_subscription); - ASSERT_EQ(static_cast(heartbeatSubscriptionCmds.At(0)).GetDeviceId(), "somedeviceid"); - ASSERT_EQ(static_cast(heartbeatSubscriptionCmds.At(0)).GetResult(), Result::Ok); - ASSERT_EQ(heartbeatUnsubscriptionCmds.At(0).GetType(), Type::heartbeat_unsubscription); - ASSERT_EQ(static_cast(heartbeatUnsubscriptionCmds.At(0)).GetDeviceId(), "somedeviceid"); - ASSERT_EQ(static_cast(heartbeatUnsubscriptionCmds.At(0)).GetResult(), Result::Ok); - ASSERT_EQ(heartbeatCmds.At(0).GetType(), Type::heartbeat); - ASSERT_EQ(static_cast(heartbeatCmds.At(0)).GetDeviceId(), "somedeviceid"); ASSERT_EQ(stateChangeSubscriptionCmds.At(0).GetType(), Type::state_change_subscription); ASSERT_EQ(static_cast(stateChangeSubscriptionCmds.At(0)).GetDeviceId(), "somedeviceid"); ASSERT_EQ(static_cast(stateChangeSubscriptionCmds.At(0)).GetResult(), Result::Ok); @@ -106,8 +91,6 @@ void fillCommands(Cmds& cmds) cmds.Add(); cmds.Add(Transition::Stop); cmds.Add(); - cmds.Add(); - cmds.Add(); cmds.Add(); cmds.Add(); cmds.Add(); @@ -116,9 +99,6 @@ void fillCommands(Cmds& cmds) cmds.Add("somedeviceid", State::Running); cmds.Add("somedeviceid", 123456, Result::Ok, Transition::Stop); cmds.Add("somedeviceid", "someconfig"); - cmds.Add("somedeviceid", Result::Ok); - cmds.Add("somedeviceid", Result::Ok); - cmds.Add("somedeviceid"); cmds.Add("somedeviceid", Result::Ok); cmds.Add("somedeviceid", Result::Ok); cmds.Add("somedeviceid", 123456, State::Running, State::Ready); @@ -128,7 +108,7 @@ void fillCommands(Cmds& cmds) void checkCommands(Cmds& cmds) { - ASSERT_EQ(cmds.Size(), 21); + ASSERT_EQ(cmds.Size(), 16); int count = 0; auto const props(std::vector>({{"k1", "v1"}, {"k2", "v2"}})); @@ -145,12 +125,6 @@ void checkCommands(Cmds& cmds) case Type::dump_config: ++count; break; - case Type::subscribe_to_heartbeats: - ++count; - break; - case Type::unsubscribe_from_heartbeats: - ++count; - break; case Type::subscribe_to_state_change: ++count; break; @@ -187,20 +161,6 @@ void checkCommands(Cmds& cmds) ASSERT_EQ(static_cast(*cmd).GetDeviceId(), "somedeviceid"); ASSERT_EQ(static_cast(*cmd).GetConfig(), "someconfig"); break; - case Type::heartbeat_subscription: - ++count; - ASSERT_EQ(static_cast(*cmd).GetDeviceId(), "somedeviceid"); - ASSERT_EQ(static_cast(*cmd).GetResult(), Result::Ok); - break; - case Type::heartbeat_unsubscription: - ++count; - ASSERT_EQ(static_cast(*cmd).GetDeviceId(), "somedeviceid"); - ASSERT_EQ(static_cast(*cmd).GetResult(), Result::Ok); - break; - case Type::heartbeat: - ++count; - ASSERT_EQ(static_cast(*cmd).GetDeviceId(), "somedeviceid"); - break; case Type::state_change_subscription: ++count; ASSERT_EQ(static_cast(*cmd).GetDeviceId(), "somedeviceid"); @@ -237,7 +197,7 @@ void checkCommands(Cmds& cmds) } } - ASSERT_EQ(count, 21); + ASSERT_EQ(count, 16); } TEST(Format, SerializationBinary)