DDS plugin: Do not block the DDS KeyValue callback

This commit is contained in:
Dennis Klein 2019-09-13 14:53:42 +02:00
parent fa394194e8
commit e3890a4033
No known key found for this signature in database
GPG Key ID: 08E62D23FA0ECBBC
3 changed files with 74 additions and 51 deletions

View File

@ -19,7 +19,7 @@ set_target_properties(${plugin} PROPERTIES
set(exe fairmq-dds-command-ui) set(exe fairmq-dds-command-ui)
add_executable(${exe} ${CMAKE_CURRENT_SOURCE_DIR}/runDDSCommandUI.cxx) add_executable(${exe} ${CMAKE_CURRENT_SOURCE_DIR}/runDDSCommandUI.cxx)
target_link_libraries(${exe} FairMQ DDS::dds_intercom_lib DDS::dds_protocol_lib) target_link_libraries(${exe} FairMQ DDS::dds_intercom_lib DDS::dds_protocol_lib Boost::boost)
target_include_directories(${exe} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}) target_include_directories(${exe} PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
install(TARGETS ${plugin} ${exe} install(TARGETS ${plugin} ${exe}

View File

@ -13,6 +13,7 @@
#include <boost/algorithm/string/join.hpp> #include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/split.hpp> #include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp> #include <boost/algorithm/string/classification.hpp>
#include <boost/asio/post.hpp>
#include <termios.h> // for the interactive mode #include <termios.h> // for the interactive mode
#include <poll.h> // for the interactive mode #include <poll.h> // for the interactive mode
@ -54,6 +55,7 @@ DDS::DDS(const string& name,
, fExitingAckedByLastExternalController(false) , fExitingAckedByLastExternalController(false)
, fHeartbeatInterval(100) , fHeartbeatInterval(100)
, fUpdatesAllowed(false) , fUpdatesAllowed(false)
, fWorkGuard(fWorkerQueue.get_executor())
{ {
try { try {
TakeDeviceControl(); TakeDeviceControl();
@ -83,7 +85,6 @@ DDS::DDS(const string& name,
SubscribeForCustomCommands(); SubscribeForCustomCommands();
SubscribeForConnectingChannels(); SubscribeForConnectingChannels();
fDDS.Start();
// subscribe to device state changes, pushing new state changes into the event queue // subscribe to device state changes, pushing new state changes into the event queue
SubscribeToDeviceStateChange([&](DeviceState newState) { SubscribeToDeviceStateChange([&](DeviceState newState) {
@ -104,6 +105,7 @@ DDS::DDS(const string& name,
break; break;
} }
case DeviceState::Exiting: case DeviceState::Exiting:
fWorkGuard.reset();
fDeviceTerminationRequested = true; fDeviceTerminationRequested = true;
UnsubscribeFromDeviceStateChange(); UnsubscribeFromDeviceStateChange();
ReleaseDeviceControl(); ReleaseDeviceControl();
@ -124,7 +126,17 @@ DDS::DDS(const string& name,
if (staticMode) { if (staticMode) {
fControllerThread = thread(&DDS::StaticControl, this); fControllerThread = thread(&DDS::StaticControl, this);
} else {
fWorkerThread = thread([this]() {
{
std::unique_lock<std::mutex> lk(fUpdateMutex);
fUpdateCondition.wait(lk, [&]{ return fUpdatesAllowed; });
} }
fWorkerQueue.run();
});
}
fDDS.Start();
} catch (PluginServices::DeviceControlError& e) { } catch (PluginServices::DeviceControlError& e) {
LOG(debug) << e.what(); LOG(debug) << e.what();
} catch (exception& e) { } catch (exception& e) {
@ -241,12 +253,10 @@ auto DDS::SubscribeForConnectingChannels() -> void
LOG(debug) << "Subscribing for DDS properties."; LOG(debug) << "Subscribing for DDS properties.";
fDDS.SubscribeKeyValue([&] (const string& propertyId, const string& value, uint64_t senderTaskID) { fDDS.SubscribeKeyValue([&] (const string& propertyId, const string& value, uint64_t senderTaskID) {
try {
LOG(debug) << "Received update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID; LOG(debug) << "Received update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID;
std::unique_lock<std::mutex> lk(fUpdateMutex); boost::asio::post(fWorkerQueue, [=]() {
fUpdateCondition.wait(lk, [&]{ return fUpdatesAllowed; }); try {
string val = value; string val = value;
// check if it is to handle as one out of multiple values // check if it is to handle as one out of multiple values
auto it = fIofN.find(propertyId); auto it = fIofN.find(propertyId);
@ -295,6 +305,7 @@ auto DDS::SubscribeForConnectingChannels() -> void
LOG(error) << "Error on handling DDS property update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID << ": " << e.what(); LOG(error) << "Error on handling DDS property update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID << ": " << e.what();
} }
}); });
});
} }
auto DDS::PublishBoundChannels() -> void auto DDS::PublishBoundChannels() -> void
@ -421,6 +432,11 @@ DDS::~DDS()
if (fHeartbeatThread.joinable()) { if (fHeartbeatThread.joinable()) {
fHeartbeatThread.join(); fHeartbeatThread.join();
} }
fWorkGuard.reset();
if (fWorkerThread.joinable()) {
fWorkerThread.join();
}
} }
} /* namespace plugins */ } /* namespace plugins */

View File

@ -11,6 +11,9 @@
#include <DDS/dds_env_prop.h> #include <DDS/dds_env_prop.h>
#include <DDS/dds_intercom.h> #include <DDS/dds_intercom.h>
#include <boost/asio/executor.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <cassert> #include <cassert>
#include <chrono> #include <chrono>
#include <condition_variable> #include <condition_variable>
@ -172,6 +175,10 @@ class DDS : public Plugin
bool fUpdatesAllowed; bool fUpdatesAllowed;
std::mutex fUpdateMutex; std::mutex fUpdateMutex;
std::condition_variable fUpdateCondition; std::condition_variable fUpdateCondition;
std::thread fWorkerThread;
boost::asio::io_context fWorkerQueue;
boost::asio::executor_work_guard<boost::asio::executor> fWorkGuard;
}; };
Plugin::ProgOptions DDSProgramOptions() Plugin::ProgOptions DDSProgramOptions()