mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
DDS plugin: Wait for IDLE->EXITING state-change to be acknowledged
Devices sometimes shut down so quickly after entering the EXITING state that the publication of that state change is never sent. The plugin now waits for the external controller to acknowledge the IDLE->EXITING state change, up to a configurable timeout (option `wait-for-exiting-ack-timeout`, in milliseconds, default 1000).
This commit is contained in:
parent
c1a17c97b8
commit
8a2c7fb601
|
@ -67,7 +67,7 @@ echo "...$sampler_and_sink are READY, sending shutdown..."
|
||||||
fairmq-dds-command-ui -c s -w "RUNNING->READY" -n ${requiredNofAgents}
|
fairmq-dds-command-ui -c s -w "RUNNING->READY" -n ${requiredNofAgents}
|
||||||
fairmq-dds-command-ui -c t -w "DEVICE READY" -n ${requiredNofAgents}
|
fairmq-dds-command-ui -c t -w "DEVICE READY" -n ${requiredNofAgents}
|
||||||
fairmq-dds-command-ui -c d -w "IDLE" -n ${requiredNofAgents}
|
fairmq-dds-command-ui -c d -w "IDLE" -n ${requiredNofAgents}
|
||||||
fairmq-dds-command-ui -c q
|
fairmq-dds-command-ui -c q -w "EXITING" -n ${requiredNofAgents}
|
||||||
echo "...waiting for ${requiredNofAgents} idle agents..."
|
echo "...waiting for ${requiredNofAgents} idle agents..."
|
||||||
@WAIT_COMMAND@ ${requiredNofAgents}
|
@WAIT_COMMAND@ ${requiredNofAgents}
|
||||||
echo "------------------------"
|
echo "------------------------"
|
||||||
|
|
|
@ -50,6 +50,8 @@ DDS::DDS(const string& name,
|
||||||
, fCurrentState(DeviceState::Idle)
|
, fCurrentState(DeviceState::Idle)
|
||||||
, fLastState(DeviceState::Idle)
|
, fLastState(DeviceState::Idle)
|
||||||
, fDeviceTerminationRequested(false)
|
, fDeviceTerminationRequested(false)
|
||||||
|
, fLastExternalController(0)
|
||||||
|
, fExitingAckedByLastExternalController(false)
|
||||||
, fHeartbeatInterval(100)
|
, fHeartbeatInterval(100)
|
||||||
, fUpdatesAllowed(false)
|
, fUpdatesAllowed(false)
|
||||||
{
|
{
|
||||||
|
@ -130,6 +132,15 @@ DDS::DDS(const string& name,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto DDS::WaitForExitingAck() -> void
|
||||||
|
{
|
||||||
|
unique_lock<mutex> lock(fStateChangeSubscriberMutex);
|
||||||
|
fExitingAcked.wait_for(
|
||||||
|
lock,
|
||||||
|
chrono::milliseconds(GetProperty<unsigned int>("wait-for-exiting-ack-timeout")),
|
||||||
|
[this]() { return fExitingAckedByLastExternalController; });
|
||||||
|
}
|
||||||
|
|
||||||
auto DDS::StaticControl() -> void
|
auto DDS::StaticControl() -> void
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
|
@ -333,6 +344,10 @@ auto DDS::SubscribeForCustomCommands() -> void
|
||||||
unique_lock<mutex> lock(fStopMutex);
|
unique_lock<mutex> lock(fStopMutex);
|
||||||
fStopCondition.notify_one();
|
fStopCondition.notify_one();
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||||
|
fLastExternalController = senderId;
|
||||||
|
}
|
||||||
} else if (cmd == "dump-config") {
|
} else if (cmd == "dump-config") {
|
||||||
stringstream ss;
|
stringstream ss;
|
||||||
for (const auto pKey: GetPropertyKeys()) {
|
for (const auto pKey: GetPropertyKeys()) {
|
||||||
|
@ -352,11 +367,22 @@ auto DDS::SubscribeForCustomCommands() -> void
|
||||||
fHeartbeatSubscribers.erase(senderId);
|
fHeartbeatSubscribers.erase(senderId);
|
||||||
}
|
}
|
||||||
fDDS.Send("heartbeat-unsubscription: " + id + ",OK", to_string(senderId));
|
fDDS.Send("heartbeat-unsubscription: " + id + ",OK", to_string(senderId));
|
||||||
|
} else if (cmd == "state-change-exiting-received") {
|
||||||
|
{
|
||||||
|
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||||
|
if (fLastExternalController == senderId) {
|
||||||
|
fExitingAckedByLastExternalController = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fExitingAcked.notify_one();
|
||||||
} else if (cmd == "subscribe-to-state-changes") {
|
} else if (cmd == "subscribe-to-state-changes") {
|
||||||
{
|
{
|
||||||
// auto size = fStateChangeSubscribers.size();
|
// auto size = fStateChangeSubscribers.size();
|
||||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||||
fStateChangeSubscribers.insert(senderId);
|
fStateChangeSubscribers.insert(senderId);
|
||||||
|
if (!fControllerThread.joinable()) {
|
||||||
|
fControllerThread = thread(&DDS::WaitForExitingAck, this);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
fDDS.Send("state-changes-subscription: " + id + ",OK", to_string(senderId));
|
fDDS.Send("state-changes-subscription: " + id + ",OK", to_string(senderId));
|
||||||
{
|
{
|
||||||
|
|
|
@ -129,6 +129,7 @@ class DDS : public Plugin
|
||||||
|
|
||||||
private:
|
private:
|
||||||
auto StaticControl() -> void;
|
auto StaticControl() -> void;
|
||||||
|
auto WaitForExitingAck() -> void;
|
||||||
|
|
||||||
auto FillChannelContainers() -> void;
|
auto FillChannelContainers() -> void;
|
||||||
auto SubscribeForConnectingChannels() -> void;
|
auto SubscribeForConnectingChannels() -> void;
|
||||||
|
@ -158,7 +159,11 @@ class DDS : public Plugin
|
||||||
|
|
||||||
std::set<uint64_t> fHeartbeatSubscribers;
|
std::set<uint64_t> fHeartbeatSubscribers;
|
||||||
std::mutex fHeartbeatSubscriberMutex;
|
std::mutex fHeartbeatSubscriberMutex;
|
||||||
|
|
||||||
std::set<uint64_t> fStateChangeSubscribers;
|
std::set<uint64_t> fStateChangeSubscribers;
|
||||||
|
uint64_t fLastExternalController;
|
||||||
|
bool fExitingAckedByLastExternalController;
|
||||||
|
std::condition_variable fExitingAcked;
|
||||||
std::mutex fStateChangeSubscriberMutex;
|
std::mutex fStateChangeSubscriberMutex;
|
||||||
|
|
||||||
std::thread fHeartbeatThread;
|
std::thread fHeartbeatThread;
|
||||||
|
@ -174,7 +179,8 @@ Plugin::ProgOptions DDSProgramOptions()
|
||||||
boost::program_options::options_description options{"DDS Plugin"};
|
boost::program_options::options_description options{"DDS Plugin"};
|
||||||
options.add_options()
|
options.add_options()
|
||||||
("dds-i", boost::program_options::value<std::vector<std::string>>()->multitoken()->composing(), "Task index for chosing connection target (single channel n to m). When all values come via same update.")
|
("dds-i", boost::program_options::value<std::vector<std::string>>()->multitoken()->composing(), "Task index for chosing connection target (single channel n to m). When all values come via same update.")
|
||||||
("dds-i-n", boost::program_options::value<std::vector<std::string>>()->multitoken()->composing(), "Task index for chosing connection target (one out of n values to take). When values come as independent updates.");
|
("dds-i-n", boost::program_options::value<std::vector<std::string>>()->multitoken()->composing(), "Task index for chosing connection target (one out of n values to take). When values come as independent updates.")
|
||||||
|
("wait-for-exiting-ack-timeout", boost::program_options::value<unsigned int>()->default_value(1000), "Wait timeout for EXITING state-change acknowledgement by external controller in milliseconds.");
|
||||||
|
|
||||||
return options;
|
return options;
|
||||||
}
|
}
|
||||||
|
|
|
@ -264,6 +264,9 @@ int main(int argc, char* argv[])
|
||||||
// cerr << "Received: " << msg << endl;
|
// cerr << "Received: " << msg << endl;
|
||||||
boost::trim(parts[2]);
|
boost::trim(parts[2]);
|
||||||
waitMode.AddNewStateEntry(senderId, parts[3]);
|
waitMode.AddNewStateEntry(senderId, parts[3]);
|
||||||
|
if(parts[3] == "IDLE->EXITING") {
|
||||||
|
ddsCustomCmd.send("state-change-exiting-received", std::to_string(senderId));
|
||||||
|
}
|
||||||
} else if (parts[0] == "state-changes-subscription") {
|
} else if (parts[0] == "state-changes-subscription") {
|
||||||
if (parts[2] != "OK") {
|
if (parts[2] != "OK") {
|
||||||
cerr << "state-changes-subscription failed with return code: " << parts[2];
|
cerr << "state-changes-subscription failed with return code: " << parts[2];
|
||||||
|
|
|
@ -356,6 +356,11 @@ void DDSSession::UnsubscribeFromCommands()
|
||||||
|
|
||||||
void DDSSession::SendCommand(const std::string& cmd) { fImpl->fDDSCustomCmd.send(cmd, ""); }
|
void DDSSession::SendCommand(const std::string& cmd) { fImpl->fDDSCustomCmd.send(cmd, ""); }
|
||||||
|
|
||||||
|
void DDSSession::SendCommand(const std::string& cmd, DDSChannel::Id recipient)
|
||||||
|
{
|
||||||
|
fImpl->fDDSCustomCmd.send(cmd, std::to_string(recipient));
|
||||||
|
}
|
||||||
|
|
||||||
auto DDSSession::UpdateChannelToTaskAssociation(DDSChannel::Id channelId, DDSTask::Id taskId) -> void
|
auto DDSSession::UpdateChannelToTaskAssociation(DDSChannel::Id channelId, DDSTask::Id taskId) -> void
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lk(fImpl->fMtx);
|
std::lock_guard<std::mutex> lk(fImpl->fMtx);
|
||||||
|
|
|
@ -102,6 +102,7 @@ class DDSSession
|
||||||
void SubscribeToCommands(std::function<void(const std::string& msg, const std::string& condition, uint64_t senderId)>);
|
void SubscribeToCommands(std::function<void(const std::string& msg, const std::string& condition, uint64_t senderId)>);
|
||||||
void UnsubscribeFromCommands();
|
void UnsubscribeFromCommands();
|
||||||
void SendCommand(const std::string&);
|
void SendCommand(const std::string&);
|
||||||
|
void SendCommand(const std::string&, DDSChannel::Id);
|
||||||
auto UpdateChannelToTaskAssociation(DDSChannel::Id, DDSTask::Id) -> void;
|
auto UpdateChannelToTaskAssociation(DDSChannel::Id, DDSTask::Id) -> void;
|
||||||
auto GetTaskId(DDSChannel::Id) const -> DDSTask::Id;
|
auto GetTaskId(DDSChannel::Id) const -> DDSTask::Id;
|
||||||
|
|
||||||
|
|
|
@ -146,6 +146,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
if (parts[0] == "state-change") {
|
if (parts[0] == "state-change") {
|
||||||
DDSTask::Id taskId(std::stoull(parts[2]));
|
DDSTask::Id taskId(std::stoull(parts[2]));
|
||||||
fDDSSession.UpdateChannelToTaskAssociation(senderId, taskId);
|
fDDSSession.UpdateChannelToTaskAssociation(senderId, taskId);
|
||||||
|
if(parts[3] == "IDLE->EXITING") {
|
||||||
|
fDDSSession.SendCommand("state-change-exiting-received", senderId);
|
||||||
|
}
|
||||||
UpdateStateEntry(taskId, parts[3]);
|
UpdateStateEntry(taskId, parts[3]);
|
||||||
} else if (parts[0] == "state-changes-subscription") {
|
} else if (parts[0] == "state-changes-subscription") {
|
||||||
LOG(debug) << "Received from " << senderId << ": " << msg;
|
LOG(debug) << "Received from " << senderId << ": " << msg;
|
||||||
|
@ -402,9 +405,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
{
|
{
|
||||||
bool targetStateReached(
|
bool targetStateReached(
|
||||||
std::all_of(fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) {
|
std::all_of(fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) {
|
||||||
// TODO Check, if we can make sure that EXITING state change event are not missed
|
return (i.second.state == fChangeStateTarget) && i.second.initialized;
|
||||||
return fChangeStateTarget == DeviceState::Exiting
|
|
||||||
|| ((i.second.state == fChangeStateTarget) && i.second.initialized);
|
|
||||||
}));
|
}));
|
||||||
|
|
||||||
if (!fChangeStateOp.IsCompleted() && targetStateReached) {
|
if (!fChangeStateOp.IsCompleted() && targetStateReached) {
|
||||||
|
|
Loading…
Reference in New Issue
Block a user