From 8a2c7fb6016482599c51f43be3296462a4f4251d Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Wed, 4 Sep 2019 12:49:38 +0200 Subject: [PATCH] DDS plugin: Wait for IDLE->EXITING state-change to be acknowledged Sometimes devices shut down too fast when entering the EXITING state so that the publication of that state-change will never be sent. The plugin now waits for an acknowledgement by the external controller with a configurable timeout. --- examples/dds/fairmq-start-ex-dds.sh.in | 2 +- fairmq/plugins/DDS/DDS.cxx | 26 ++++++++++++++++++++++++++ fairmq/plugins/DDS/DDS.h | 8 +++++++- fairmq/plugins/DDS/runDDSCommandUI.cxx | 3 +++ fairmq/sdk/DDSSession.cxx | 5 +++++ fairmq/sdk/DDSSession.h | 1 + fairmq/sdk/Topology.h | 7 ++++--- 7 files changed, 47 insertions(+), 5 deletions(-) diff --git a/examples/dds/fairmq-start-ex-dds.sh.in b/examples/dds/fairmq-start-ex-dds.sh.in index 1025c1c3..77d36451 100755 --- a/examples/dds/fairmq-start-ex-dds.sh.in +++ b/examples/dds/fairmq-start-ex-dds.sh.in @@ -67,7 +67,7 @@ echo "...$sampler_and_sink are READY, sending shutdown..." fairmq-dds-command-ui -c s -w "RUNNING->READY" -n ${requiredNofAgents} fairmq-dds-command-ui -c t -w "DEVICE READY" -n ${requiredNofAgents} fairmq-dds-command-ui -c d -w "IDLE" -n ${requiredNofAgents} -fairmq-dds-command-ui -c q +fairmq-dds-command-ui -c q -w "EXITING" -n ${requiredNofAgents} echo "...waiting for ${requiredNofAgents} idle agents..." @WAIT_COMMAND@ ${requiredNofAgents} echo "------------------------" diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index a68e0306..a702ae6d 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -50,6 +50,8 @@ DDS::DDS(const string& name, , fCurrentState(DeviceState::Idle) , fLastState(DeviceState::Idle) , fDeviceTerminationRequested(false) + , fLastExternalController(0) + , fExitingAckedByLastExternalController(false) , fHeartbeatInterval(100) , fUpdatesAllowed(false) { @@ -130,6 +132,15 @@ DDS::DDS(const string& name, } } +auto DDS::WaitForExitingAck() -> void +{ + unique_lock lock(fStateChangeSubscriberMutex); + fExitingAcked.wait_for( + lock, + chrono::milliseconds(GetProperty("wait-for-exiting-ack-timeout")), + [this]() { return fExitingAckedByLastExternalController; }); +} + auto DDS::StaticControl() -> void { try { @@ -333,6 +344,10 @@ auto DDS::SubscribeForCustomCommands() -> void unique_lock lock(fStopMutex); fStopCondition.notify_one(); } + { + lock_guard lock{fStateChangeSubscriberMutex}; + fLastExternalController = senderId; + } } else if (cmd == "dump-config") { stringstream ss; for (const auto pKey: GetPropertyKeys()) { @@ -352,11 +367,22 @@ auto DDS::SubscribeForCustomCommands() -> void fHeartbeatSubscribers.erase(senderId); } fDDS.Send("heartbeat-unsubscription: " + id + ",OK", to_string(senderId)); + } else if (cmd == "state-change-exiting-received") { + { + lock_guard lock{fStateChangeSubscriberMutex}; + if (fLastExternalController == senderId) { + fExitingAckedByLastExternalController = true; + } + } + fExitingAcked.notify_one(); } else if (cmd == "subscribe-to-state-changes") { { // auto size = fStateChangeSubscribers.size(); lock_guard lock{fStateChangeSubscriberMutex}; fStateChangeSubscribers.insert(senderId); + if (!fControllerThread.joinable()) { + fControllerThread = thread(&DDS::WaitForExitingAck, this); + } } fDDS.Send("state-changes-subscription: " + id + ",OK", to_string(senderId)); { diff --git a/fairmq/plugins/DDS/DDS.h b/fairmq/plugins/DDS/DDS.h index 60d3fb10..f1564896 100644 --- a/fairmq/plugins/DDS/DDS.h +++ b/fairmq/plugins/DDS/DDS.h @@ -129,6 +129,7 @@ class DDS : public Plugin private: auto StaticControl() -> void; + auto WaitForExitingAck() -> void; auto FillChannelContainers() -> void; auto SubscribeForConnectingChannels() -> void; @@ -158,7 +159,11 @@ class DDS : public Plugin std::set fHeartbeatSubscribers; std::mutex fHeartbeatSubscriberMutex; + std::set fStateChangeSubscribers; + uint64_t fLastExternalController; + bool fExitingAckedByLastExternalController; + std::condition_variable fExitingAcked; std::mutex fStateChangeSubscriberMutex; std::thread fHeartbeatThread; @@ -174,7 +179,8 @@ Plugin::ProgOptions DDSProgramOptions() boost::program_options::options_description options{"DDS Plugin"}; options.add_options() ("dds-i", boost::program_options::value>()->multitoken()->composing(), "Task index for chosing connection target (single channel n to m). When all values come via same update.") - ("dds-i-n", boost::program_options::value>()->multitoken()->composing(), "Task index for chosing connection target (one out of n values to take). When values come as independent updates."); + ("dds-i-n", boost::program_options::value>()->multitoken()->composing(), "Task index for chosing connection target (one out of n values to take). When values come as independent updates.") + ("wait-for-exiting-ack-timeout", boost::program_options::value()->default_value(1000), "Wait timeout for EXITING state-change acknowledgement by external controller in milliseconds."); return options; } diff --git a/fairmq/plugins/DDS/runDDSCommandUI.cxx b/fairmq/plugins/DDS/runDDSCommandUI.cxx index 925003f0..c0fe4a7b 100644 --- a/fairmq/plugins/DDS/runDDSCommandUI.cxx +++ b/fairmq/plugins/DDS/runDDSCommandUI.cxx @@ -264,6 +264,9 @@ int main(int argc, char* argv[]) // cerr << "Received: " << msg << endl; boost::trim(parts[2]); waitMode.AddNewStateEntry(senderId, parts[3]); + if(parts[3] == "IDLE->EXITING") { + ddsCustomCmd.send("state-change-exiting-received", std::to_string(senderId)); + } } else if (parts[0] == "state-changes-subscription") { if (parts[2] != "OK") { cerr << "state-changes-subscription failed with return code: " << parts[2]; diff --git a/fairmq/sdk/DDSSession.cxx b/fairmq/sdk/DDSSession.cxx index 11edaca4..f5270e59 100644 --- a/fairmq/sdk/DDSSession.cxx +++ b/fairmq/sdk/DDSSession.cxx @@ -356,6 +356,11 @@ void DDSSession::UnsubscribeFromCommands() void DDSSession::SendCommand(const std::string& cmd) { fImpl->fDDSCustomCmd.send(cmd, ""); } +void DDSSession::SendCommand(const std::string& cmd, DDSChannel::Id recipient) +{ + fImpl->fDDSCustomCmd.send(cmd, std::to_string(recipient)); +} + auto DDSSession::UpdateChannelToTaskAssociation(DDSChannel::Id channelId, DDSTask::Id taskId) -> void { std::lock_guard lk(fImpl->fMtx); diff --git a/fairmq/sdk/DDSSession.h b/fairmq/sdk/DDSSession.h index 2b16ad12..45d088f8 100644 --- a/fairmq/sdk/DDSSession.h +++ b/fairmq/sdk/DDSSession.h @@ -102,6 +102,7 @@ class DDSSession void SubscribeToCommands(std::function); void UnsubscribeFromCommands(); void SendCommand(const std::string&); + void SendCommand(const std::string&, DDSChannel::Id); auto UpdateChannelToTaskAssociation(DDSChannel::Id, DDSTask::Id) -> void; auto GetTaskId(DDSChannel::Id) const -> DDSTask::Id; diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index 68d7e5c9..4edca885 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -146,6 +146,9 @@ class BasicTopology : public AsioBase if (parts[0] == "state-change") { DDSTask::Id taskId(std::stoull(parts[2])); fDDSSession.UpdateChannelToTaskAssociation(senderId, taskId); + if(parts[3] == "IDLE->EXITING") { + fDDSSession.SendCommand("state-change-exiting-received", senderId); + } UpdateStateEntry(taskId, parts[3]); } else if (parts[0] == "state-changes-subscription") { LOG(debug) << "Received from " << senderId << ": " << msg; @@ -402,9 +405,7 @@ class BasicTopology : public AsioBase { bool targetStateReached( std::all_of(fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) { - // TODO Check, if we can make sure that EXITING state change event are not missed - return fChangeStateTarget == DeviceState::Exiting - || ((i.second.state == fChangeStateTarget) && i.second.initialized); + return (i.second.state == fChangeStateTarget) && i.second.initialized; })); if (!fChangeStateOp.IsCompleted() && targetStateReached) {