diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index 264d866b..88b6697f 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -100,8 +100,12 @@ DDS::DDS(const string& name, PublishBoundChannels(); break; case DeviceState::ResettingDevice: { - std::lock_guard lk(fUpdateMutex); - fUpdatesAllowed = false; + { + std::lock_guard lk(fUpdateMutex); + fUpdatesAllowed = false; + } + + EmptyChannelContainers(); break; } case DeviceState::Exiting: @@ -127,13 +131,7 @@ DDS::DDS(const string& name, if (staticMode) { fControllerThread = thread(&DDS::StaticControl, this); } else { - fWorkerThread = thread([this]() { - { - std::unique_lock lk(fUpdateMutex); - fUpdateCondition.wait(lk, [&]{ return fUpdatesAllowed; }); - } - fWorkerQueue.run(); - }); + StartWorkerThread(); } fDDS.Start(); @@ -144,6 +142,19 @@ DDS::DDS(const string& name, } } +void DDS::EmptyChannelContainers() +{ + fBindingChans.clear(); + fConnectingChans.clear(); +} + +auto DDS::StartWorkerThread() -> void +{ + fWorkerThread = thread([this]() { + fWorkerQueue.run(); + }); +} + auto DDS::WaitForExitingAck() -> void { unique_lock lock(fStateChangeSubscriberMutex); @@ -257,6 +268,10 @@ auto DDS::SubscribeForConnectingChannels() -> void boost::asio::post(fWorkerQueue, [=]() { try { + { + std::unique_lock lk(fUpdateMutex); + fUpdateCondition.wait(lk, [&]{ return fUpdatesAllowed; }); + } string val = value; // check if it is to handle as one out of multiple values auto it = fIofN.find(propertyId); diff --git a/fairmq/plugins/DDS/DDS.h b/fairmq/plugins/DDS/DDS.h index 36ba637a..1a79aa9f 100644 --- a/fairmq/plugins/DDS/DDS.h +++ b/fairmq/plugins/DDS/DDS.h @@ -133,8 +133,11 @@ class DDS : public Plugin private: auto StaticControl() -> void; auto WaitForExitingAck() -> void; + auto StartWorkerThread() -> void; auto FillChannelContainers() -> void; + auto EmptyChannelContainers() -> void; + auto SubscribeForConnectingChannels() -> void; auto PublishBoundChannels() -> void; auto SubscribeForCustomCommands() -> void;