From 5721ea9510c2c307ed57881e0e66510467370f63 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 10 Apr 2020 13:07:05 +0200 Subject: [PATCH] SDK: send heartbeats when subscribed to state changes --- fairmq/plugins/DDS/DDS.cxx | 43 ++++++++++++++++++------ fairmq/plugins/DDS/DDS.h | 4 +-- fairmq/plugins/PMIx/runPMIxCommandUI.cxx | 2 +- fairmq/sdk/Topology.h | 41 ++++++++++++++++++---- 4 files changed, 69 insertions(+), 21 deletions(-) diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index c3f61eba..88922abc 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -109,16 +109,27 @@ DDS::DDS(const string& name, break; } - lock_guard lock{fStateChangeSubscriberMutex}; + using namespace sdk::cmd; + auto now = chrono::steady_clock::now(); string id = GetProperty("id"); fLastState = fCurrentState; fCurrentState = newState; - using namespace sdk::cmd; - for (auto subscriberId : fStateChangeSubscribers) { - LOG(debug) << "Publishing state-change: " << fLastState << "->" << newState << " to " << subscriberId; - Cmds cmds(make(id, fDDSTaskId, fLastState, fCurrentState)); - fDDS.Send(cmds.Serialize(), to_string(subscriberId)); + lock_guard lock{fStateChangeSubscriberMutex}; + for (auto it = fStateChangeSubscribers.cbegin(); it != fStateChangeSubscribers.end();) { + // if a subscriber did not send a heartbeat in more than 3 times the promised interval, + // remove it from the subscriber list + if (chrono::duration(now - it->second.first).count() > 3 * it->second.second) { + LOG(warn) << "Controller '" << it->first + << "' did not send heartbeats since over 3 intervals (" + << 3 * it->second.second << " ms), removing it."; + fStateChangeSubscribers.erase(it++); + } else { + LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState << " to " << it->first; + Cmds cmds(make(id, fDDSTaskId, fLastState, fCurrentState)); + fDDS.Send(cmds.Serialize(), to_string(it->first)); + ++it; + } } }); @@ -150,7 +161,7 @@ auto DDS::WaitForExitingAck() -> void unique_lock lock(fStateChangeSubscriberMutex); auto timeout = GetProperty("wait-for-exiting-ack-timeout"); fExitingAcked.wait_for(lock, chrono::milliseconds(timeout), [this]() { - return fExitingAckedByLastExternalController; + return fExitingAckedByLastExternalController || fStateChangeSubscribers.empty(); }); } @@ -362,8 +373,9 @@ auto DDS::HandleCmd(const string& id, sdk::cmd::Cmd& cmd, const string& cond, ui fExitingAcked.notify_one(); } break; case Type::subscribe_to_state_change: { + auto _cmd = static_cast(cmd); lock_guard lock{fStateChangeSubscriberMutex}; - fStateChangeSubscribers.insert(senderId); + fStateChangeSubscribers.emplace(senderId, make_pair(chrono::steady_clock::now(), _cmd.GetInterval())); LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState << " to " << senderId; @@ -372,6 +384,15 @@ auto DDS::HandleCmd(const string& id, sdk::cmd::Cmd& cmd, const string& cond, ui fDDS.Send(outCmds.Serialize(), to_string(senderId)); } break; + case Type::subscription_heartbeat: { + try { + auto _cmd = static_cast(cmd); + lock_guard lock{fStateChangeSubscriberMutex}; + fStateChangeSubscribers.at(senderId) = make_pair(chrono::steady_clock::now(), _cmd.GetInterval()); + } catch(out_of_range& oor) { + LOG(warn) << "Received subscription heartbeat from an unknown controller with id '" << senderId << "'"; + } + } break; case Type::unsubscribe_from_state_change: { { lock_guard lock{fStateChangeSubscriberMutex}; @@ -384,12 +405,12 @@ auto DDS::HandleCmd(const string& id, sdk::cmd::Cmd& cmd, const string& cond, ui auto _cmd = static_cast(cmd); auto const request_id(_cmd.GetRequestId()); auto result(Result::Ok); - std::vector> props; + vector> props; try { for (auto const& prop : GetPropertiesAsString(_cmd.GetQuery())) { props.push_back({prop.first, prop.second}); } - } catch (std::exception const& e) { + } catch (exception const& e) { LOG(warn) << "Getting properties (request id: " << request_id << ") failed: " << e.what(); result = Result::Failure; } @@ -407,7 +428,7 @@ auto DDS::HandleCmd(const string& id, sdk::cmd::Cmd& cmd, const string& cond, ui } // TODO Handle builtin keys with different value type than string SetProperties(props); - } catch (std::exception const& e) { + } catch (exception const& e) { LOG(warn) << "Setting properties (request id: " << request_id << ") failed: " << e.what(); result = Result::Failure; } diff --git a/fairmq/plugins/DDS/DDS.h b/fairmq/plugins/DDS/DDS.h index 5086b421..10f6e6d4 100644 --- a/fairmq/plugins/DDS/DDS.h +++ b/fairmq/plugins/DDS/DDS.h @@ -24,12 +24,12 @@ #include #include #include -#include #include #include #include #include #include +#include // pair #include namespace fair @@ -159,7 +159,7 @@ class DDS : public Plugin std::atomic fDeviceTerminationRequested; - std::set fStateChangeSubscribers; + std::unordered_map> fStateChangeSubscribers; uint64_t fLastExternalController; bool fExitingAckedByLastExternalController; std::condition_variable fExitingAcked; diff --git a/fairmq/plugins/PMIx/runPMIxCommandUI.cxx b/fairmq/plugins/PMIx/runPMIxCommandUI.cxx index 474fdaae..c88e99cb 100644 --- a/fairmq/plugins/PMIx/runPMIxCommandUI.cxx +++ b/fairmq/plugins/PMIx/runPMIxCommandUI.cxx @@ -53,7 +53,7 @@ struct StateSubscription explicit StateSubscription(pmix::Commands& commands) : fCommands(commands) { - fCommands.Send(Cmds(make()).Serialize(Format::JSON)); + fCommands.Send(Cmds(make(600000)).Serialize(Format::JSON)); } ~StateSubscription() diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index 504e50a0..b2590614 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -175,6 +175,8 @@ class BasicTopology : public AsioBase , fDDSTopo(std::move(topo)) , fStateData() , fStateIndex() + , fHeartbeatsTimer(asio::system_executor()) + , fHeartbeatInterval(600000) { makeTopologyState(); @@ -213,16 +215,36 @@ class BasicTopology : public AsioBase void SubscribeToStateChanges() { - using namespace fair::mq::sdk::cmd; // FAIR_LOG(debug) << "Subscribing to state change"; - Cmds cmds(make()); + cmd::Cmds cmds(cmd::make(fHeartbeatInterval.count())); fDDSSession.SendCommand(cmds.Serialize()); + + fHeartbeatsTimer.expires_after(fHeartbeatInterval); + fHeartbeatsTimer.async_wait(std::bind(&BasicTopology::SendSubscriptionHeartbeats, this, std::placeholders::_1)); + } + + void SendSubscriptionHeartbeats(const std::error_code& ec) + { + if (!ec) { + // Timer expired. + fDDSSession.SendCommand(cmd::Cmds(cmd::make(fHeartbeatInterval.count())).Serialize()); + // schedule again + fHeartbeatsTimer.expires_after(fHeartbeatInterval); + fHeartbeatsTimer.async_wait(std::bind(&BasicTopology::SendSubscriptionHeartbeats, this, std::placeholders::_1)); + } else if (ec == asio::error::operation_aborted) { + // FAIR_LOG(debug) << "Heartbeats timer canceled"; + } else { + FAIR_LOG(error) << "Timer error: " << ec; + } } void UnsubscribeFromStateChanges() { - using namespace fair::mq::sdk::cmd; - fDDSSession.SendCommand(Cmds(make()).Serialize()); + // stop sending heartbeats + fHeartbeatsTimer.cancel(); + + // unsubscribe from state changes + fDDSSession.SendCommand(cmd::Cmds(cmd::make()).Serialize()); // wait for all tasks to confirm unsubscription std::unique_lock lk(fMtx); @@ -309,10 +331,8 @@ class BasicTopology : public AsioBase auto HandleCmd(cmd::StateChange const& cmd, DDSChannel::Id const& senderId) -> void { - using namespace fair::mq::sdk::cmd; - if (cmd.GetCurrentState() == DeviceState::Exiting) { - fDDSSession.SendCommand(Cmds(make()).Serialize(), senderId); + fDDSSession.SendCommand(cmd::Cmds(cmd::make()).Serialize(), senderId); } DDSTask::Id taskId(cmd.GetTaskId()); @@ -1193,6 +1213,9 @@ class BasicTopology : public AsioBase return {ec, failed}; } + Duration GetHeartbeatInterval() const { return fHeartbeatInterval; } + void SetHeartbeatInterval(Duration duration) { fHeartbeatInterval = duration; } + private: using TransitionedCount = unsigned int; @@ -1200,8 +1223,12 @@ class BasicTopology : public AsioBase DDSTopology fDDSTopo; TopologyState fStateData; TopologyStateIndex fStateIndex; + mutable std::mutex fMtx; + std::condition_variable fStateChangeUnsubscriptionCV; + asio::steady_timer fHeartbeatsTimer; + Duration fHeartbeatInterval; std::unordered_map fChangeStateOps; std::unordered_map fWaitForStateOps;