mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 17:41:45 +00:00
Update to new DDS 2.2 API
- require DDS 2.2 - fix regressions in automatic port binding - fix regression in channel API - update DDS example readme
This commit is contained in:
committed by
Dennis Klein
parent
dc1d7a23c1
commit
ffbe90b638
@@ -48,26 +48,20 @@ DDS::DDS(const string& name, const Plugin::Version version, const string& mainta
|
||||
, fDeviceTerminationRequested(false)
|
||||
, fHeartbeatInterval{100}
|
||||
{
|
||||
try
|
||||
{
|
||||
try {
|
||||
TakeDeviceControl();
|
||||
fControllerThread = thread(&DDS::HandleControl, this);
|
||||
fHeartbeatThread = thread(&DDS::HeartbeatSender, this);
|
||||
}
|
||||
catch (PluginServices::DeviceControlError& e)
|
||||
{
|
||||
} catch (PluginServices::DeviceControlError& e) {
|
||||
LOG(debug) << e.what();
|
||||
}
|
||||
catch (exception& e)
|
||||
{
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Error in plugin initialization: " << e.what();
|
||||
}
|
||||
}
|
||||
|
||||
auto DDS::HandleControl() -> void
|
||||
{
|
||||
try
|
||||
{
|
||||
try {
|
||||
// subscribe for state changes from DDS (subscriptions start firing after fService.start() is called)
|
||||
SubscribeForCustomCommands();
|
||||
|
||||
@@ -80,23 +74,20 @@ auto DDS::HandleControl() -> void
|
||||
SubscribeForConnectingChannels();
|
||||
|
||||
// subscribe to device state changes, pushing new state chenges into the event queue
|
||||
SubscribeToDeviceStateChange([&](DeviceState newState)
|
||||
{
|
||||
SubscribeToDeviceStateChange([&](DeviceState newState) {
|
||||
{
|
||||
lock_guard<mutex> lock{fEventsMutex};
|
||||
fEvents.push(newState);
|
||||
}
|
||||
fNewEvent.notify_one();
|
||||
if (newState == DeviceState::Exiting)
|
||||
{
|
||||
if (newState == DeviceState::Exiting) {
|
||||
fDeviceTerminationRequested = true;
|
||||
}
|
||||
|
||||
{
|
||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||
string id = GetProperty<string>("id");
|
||||
for (auto subscriberId : fStateChangeSubscribers)
|
||||
{
|
||||
for (auto subscriberId : fStateChangeSubscribers) {
|
||||
LOG(debug) << "Publishing state-change: " << newState << " to " << subscriberId;
|
||||
fDDSCustomCmd.send("state-change: " + id + "," + ToStr(newState), to_string(subscriberId));
|
||||
}
|
||||
@@ -131,14 +122,11 @@ auto DDS::HandleControl() -> void
|
||||
|
||||
// wait until stop signal
|
||||
unique_lock<mutex> lock(fStopMutex);
|
||||
while (!fDeviceTerminationRequested)
|
||||
{
|
||||
while (!fDeviceTerminationRequested) {
|
||||
fStopCondition.wait_for(lock, chrono::seconds(1));
|
||||
}
|
||||
LOG(debug) << "Stopping DDS control plugin";
|
||||
}
|
||||
catch (exception& e)
|
||||
{
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Error: " << e.what() << endl;
|
||||
return;
|
||||
}
|
||||
@@ -146,13 +134,10 @@ auto DDS::HandleControl() -> void
|
||||
fDDSKeyValue.unsubscribe();
|
||||
fDDSCustomCmd.unsubscribe();
|
||||
|
||||
try
|
||||
{
|
||||
try {
|
||||
UnsubscribeFromDeviceStateChange();
|
||||
ReleaseDeviceControl();
|
||||
}
|
||||
catch (fair::mq::PluginServices::DeviceControlError& e)
|
||||
{
|
||||
} catch (fair::mq::PluginServices::DeviceControlError& e) {
|
||||
LOG(error) << e.what();
|
||||
}
|
||||
}
|
||||
@@ -194,7 +179,7 @@ auto DDS::FillChannelContainers() -> void
|
||||
|
||||
for (const auto& vi : iValues) {
|
||||
size_t pos = vi.find(":");
|
||||
string chanName = vi.substr(0, pos );
|
||||
string chanName = vi.substr(0, pos);
|
||||
|
||||
// check if provided name is a valid channel name
|
||||
if (fConnectingChans.find(chanName) == fConnectingChans.end()) {
|
||||
@@ -229,11 +214,9 @@ auto DDS::FillChannelContainers() -> void
|
||||
|
||||
auto DDS::SubscribeForConnectingChannels() -> void
|
||||
{
|
||||
fDDSKeyValue.subscribe([&] (const string& propertyId, const string& key, const string& value)
|
||||
{
|
||||
try
|
||||
{
|
||||
LOG(debug) << "Received update for " << propertyId << ": key=" << key << " value=" << value;
|
||||
fDDSKeyValue.subscribe([&] (const string& propertyId, const string& value, uint64_t senderTaskID) {
|
||||
try {
|
||||
LOG(debug) << "Received update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID;
|
||||
string val = value;
|
||||
// check if it is to handle as one out of multiple values
|
||||
auto it = fIofN.find(propertyId);
|
||||
@@ -254,20 +237,18 @@ auto DDS::SubscribeForConnectingChannels() -> void
|
||||
auto it2 = fI.find(propertyId);
|
||||
if (it2 != fI.end()) {
|
||||
LOG(debug) << "adding connecting channel " << propertyId << " : " << connectionStrings.at(it2->second);
|
||||
fConnectingChans.at(propertyId).fDDSValues.insert(make_pair<string, string>(key.c_str(), connectionStrings.at(it2->second).c_str()));
|
||||
fConnectingChans.at(propertyId).fDDSValues.insert({senderTaskID, connectionStrings.at(it2->second).c_str()});
|
||||
} else {
|
||||
LOG(error) << "multiple bound channels received, but no task index specified, only assigning the first";
|
||||
fConnectingChans.at(propertyId).fDDSValues.insert(make_pair<string, string>(key.c_str(), connectionStrings.at(0).c_str()));
|
||||
fConnectingChans.at(propertyId).fDDSValues.insert({senderTaskID, connectionStrings.at(0).c_str()});
|
||||
}
|
||||
} else { // only one bound channel received
|
||||
fConnectingChans.at(propertyId).fDDSValues.insert(make_pair<string, string>(key.c_str(), val.c_str()));
|
||||
fConnectingChans.at(propertyId).fDDSValues.insert({senderTaskID, val.c_str()});
|
||||
}
|
||||
|
||||
// update channels and remove them from unfinished container
|
||||
for (auto mi = fConnectingChans.begin(); mi != fConnectingChans.end(); /* no increment */)
|
||||
{
|
||||
if (mi->second.fSubChannelAddresses.size() == mi->second.fDDSValues.size())
|
||||
{
|
||||
for (auto mi = fConnectingChans.begin(); mi != fConnectingChans.end(); /* no increment */) {
|
||||
if (mi->second.fSubChannelAddresses.size() == mi->second.fDDSValues.size()) {
|
||||
// when multiple subChannels are used, their order on every device should be the same, irregardless of arrival order from DDS.
|
||||
sort(mi->second.fSubChannelAddresses.begin(), mi->second.fSubChannelAddresses.end());
|
||||
auto it3 = mi->second.fDDSValues.begin();
|
||||
@@ -277,24 +258,19 @@ auto DDS::SubscribeForConnectingChannels() -> void
|
||||
++it3;
|
||||
}
|
||||
fConnectingChans.erase(mi++);
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
++mi;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (const exception& e)
|
||||
{
|
||||
LOG(error) << "Error on handling DDS property update for " << propertyId << ": key=" << key << " value=" << value << ": " << e.what();
|
||||
} catch (const exception& e) {
|
||||
LOG(error) << "Error on handling DDS property update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID << ": " << e.what();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
auto DDS::PublishBoundChannels() -> void
|
||||
{
|
||||
for (const auto& chan : fBindingChans)
|
||||
{
|
||||
for (const auto& chan : fBindingChans) {
|
||||
string joined = boost::algorithm::join(chan.second, ",");
|
||||
LOG(debug) << "Publishing " << chan.first << " bound addresses (" << chan.second.size() << ") to DDS under '" << chan.first << "' property name.";
|
||||
fDDSKeyValue.putValue(chan.first, joined);
|
||||
@@ -306,13 +282,11 @@ auto DDS::HeartbeatSender() -> void
|
||||
string id = GetProperty<string>("id");
|
||||
string pid(to_string(getpid()));
|
||||
|
||||
while (!fDeviceTerminationRequested)
|
||||
{
|
||||
while (!fDeviceTerminationRequested) {
|
||||
{
|
||||
lock_guard<mutex> lock{fHeartbeatSubscriberMutex};
|
||||
|
||||
for (const auto subscriberId : fHeartbeatSubscribers)
|
||||
{
|
||||
for (const auto subscriberId : fHeartbeatSubscribers) {
|
||||
fDDSCustomCmd.send("heartbeat: " + id + "," + pid, to_string(subscriberId));
|
||||
}
|
||||
}
|
||||
@@ -326,58 +300,42 @@ auto DDS::SubscribeForCustomCommands() -> void
|
||||
string id = GetProperty<string>("id");
|
||||
string pid(to_string(getpid()));
|
||||
|
||||
fDDSCustomCmd.subscribe([id, pid, this](const string& cmd, const string& cond, uint64_t senderId)
|
||||
{
|
||||
fDDSCustomCmd.subscribe([id, pid, this](const string& cmd, const string& cond, uint64_t senderId) {
|
||||
LOG(info) << "Received command: " << cmd;
|
||||
|
||||
if (cmd == "check-state")
|
||||
{
|
||||
if (cmd == "check-state") {
|
||||
fDDSCustomCmd.send(id + ": " + ToStr(GetCurrentDeviceState()) + " (pid: " + pid + ")", to_string(senderId));
|
||||
}
|
||||
else if (fCommands.find(cmd) != fCommands.end())
|
||||
{
|
||||
} else if (fCommands.find(cmd) != fCommands.end()) {
|
||||
fDDSCustomCmd.send(id + ": attempting to " + cmd, to_string(senderId));
|
||||
ChangeDeviceState(ToDeviceStateTransition(cmd));
|
||||
}
|
||||
else if (cmd == "END")
|
||||
{
|
||||
} else if (cmd == "END") {
|
||||
fDDSCustomCmd.send(id + ": attempting to " + cmd, to_string(senderId));
|
||||
ChangeDeviceState(ToDeviceStateTransition(cmd));
|
||||
fDDSCustomCmd.send(id + ": " + ToStr(GetCurrentDeviceState()), to_string(senderId));
|
||||
if (ToStr(GetCurrentDeviceState()) == "EXITING")
|
||||
{
|
||||
if (ToStr(GetCurrentDeviceState()) == "EXITING") {
|
||||
unique_lock<mutex> lock(fStopMutex);
|
||||
fStopCondition.notify_one();
|
||||
}
|
||||
}
|
||||
else if (cmd == "dump-config")
|
||||
{
|
||||
} else if (cmd == "dump-config") {
|
||||
stringstream ss;
|
||||
for (const auto pKey: GetPropertyKeys())
|
||||
{
|
||||
for (const auto pKey: GetPropertyKeys()) {
|
||||
ss << id << ": " << pKey << " -> " << GetPropertyAsString(pKey) << endl;
|
||||
}
|
||||
fDDSCustomCmd.send(ss.str(), to_string(senderId));
|
||||
}
|
||||
else if (cmd == "subscribe-to-heartbeats")
|
||||
{
|
||||
} else if (cmd == "subscribe-to-heartbeats") {
|
||||
{
|
||||
// auto size = fHeartbeatSubscribers.size();
|
||||
lock_guard<mutex> lock{fHeartbeatSubscriberMutex};
|
||||
fHeartbeatSubscribers.insert(senderId);
|
||||
}
|
||||
fDDSCustomCmd.send("heartbeat-subscription: " + id + ",OK", to_string(senderId));
|
||||
}
|
||||
else if (cmd == "unsubscribe-from-heartbeats")
|
||||
{
|
||||
} else if (cmd == "unsubscribe-from-heartbeats") {
|
||||
{
|
||||
lock_guard<mutex> lock{fHeartbeatSubscriberMutex};
|
||||
fHeartbeatSubscribers.erase(senderId);
|
||||
}
|
||||
fDDSCustomCmd.send("heartbeat-unsubscription: " + id + ",OK", to_string(senderId));
|
||||
}
|
||||
else if (cmd == "subscribe-to-state-changes")
|
||||
{
|
||||
} else if (cmd == "subscribe-to-state-changes") {
|
||||
{
|
||||
// auto size = fStateChangeSubscribers.size();
|
||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||
@@ -387,17 +345,13 @@ auto DDS::SubscribeForCustomCommands() -> void
|
||||
auto state = GetCurrentDeviceState();
|
||||
LOG(debug) << "Publishing state-change: " << state << " to " << senderId;
|
||||
fDDSCustomCmd.send("state-change: " + id + "," + ToStr(state), to_string(senderId));
|
||||
}
|
||||
else if (cmd == "unsubscribe-from-state-changes")
|
||||
{
|
||||
} else if (cmd == "unsubscribe-from-state-changes") {
|
||||
{
|
||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||
fStateChangeSubscribers.erase(senderId);
|
||||
}
|
||||
fDDSCustomCmd.send("state-changes-unsubscription: " + id + ",OK", to_string(senderId));
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
LOG(warn) << "Unknown command: " << cmd;
|
||||
LOG(warn) << "Origin: " << senderId;
|
||||
LOG(warn) << "Destination: " << cond;
|
||||
@@ -408,8 +362,7 @@ auto DDS::SubscribeForCustomCommands() -> void
|
||||
auto DDS::WaitForNextState() -> DeviceState
|
||||
{
|
||||
unique_lock<mutex> lock{fEventsMutex};
|
||||
while (fEvents.empty())
|
||||
{
|
||||
while (fEvents.empty()) {
|
||||
fNewEvent.wait(lock);
|
||||
}
|
||||
|
||||
@@ -420,13 +373,11 @@ auto DDS::WaitForNextState() -> DeviceState
|
||||
|
||||
DDS::~DDS()
|
||||
{
|
||||
if (fControllerThread.joinable())
|
||||
{
|
||||
if (fControllerThread.joinable()) {
|
||||
fControllerThread.join();
|
||||
}
|
||||
|
||||
if (fHeartbeatThread.joinable())
|
||||
{
|
||||
if (fHeartbeatThread.joinable()) {
|
||||
fHeartbeatThread.join();
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user