diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 8131c5e9..933c9d3f 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -714,7 +714,7 @@ bool FairMQDevice::HandleMsgInput(const string& chName, const InputMsgCallback& if (Receive(input, chName, i) >= 0) { - return callback(input, 0); + return callback(input, i); } else { diff --git a/fairmq/options/FairMQSuboptParser.cxx b/fairmq/options/FairMQSuboptParser.cxx index d4d22a1a..cbc204ff 100644 --- a/fairmq/options/FairMQSuboptParser.cxx +++ b/fairmq/options/FairMQSuboptParser.cxx @@ -59,7 +59,7 @@ FairMQMap SUBOPT::UserParser(const vector& channelConfig, const string& } else if (subopt >= 0 && value != nullptr) { - socketProperties.put(channelOptionKeys[subopt], value); + channelProperties.put(channelOptionKeys[subopt], value); } } diff --git a/fairmq/options/FairMQSuboptParser.h b/fairmq/options/FairMQSuboptParser.h index 5bb5ffe5..f09ae99a 100644 --- a/fairmq/options/FairMQSuboptParser.h +++ b/fairmq/options/FairMQSuboptParser.h @@ -59,6 +59,7 @@ struct SUBOPT SNDKERNELSIZE, RCVKERNELSIZE, RATELOGGING, // logging rate + NUMSOCKETS, lastsocketkey }; @@ -73,6 +74,7 @@ struct SUBOPT /*[SNDKERNELSIZE] = */ "sndKernelSize", /*[RCVKERNELSIZE] = */ "rcvKernelSize", /*[RATELOGGING] = */ "rateLogging", + /*[NUMSOCKETS] = */ "numSockets", nullptr }; diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index 4ba8f3a5..481552af 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -8,10 +8,14 @@ #include "DDS.h" +#include +#include + #include // for the interactive mode #include // for the interactive mode #include #include +#include using namespace std; @@ -101,6 +105,13 @@ auto DDS::HandleControl() -> void // and propagate addresses of bound channels to DDS. FillChannelContainers(); + LOG(DEBUG) << "$DDS_TASK_PATH: " << getenv("DDS_TASK_PATH"); + LOG(DEBUG) << "$DDS_GROUP_NAME: " << getenv("DDS_GROUP_NAME"); + LOG(DEBUG) << "$DDS_COLLECTION_NAME: " << getenv("DDS_COLLECTION_NAME"); + LOG(DEBUG) << "$DDS_TASK_NAME: " << getenv("DDS_TASK_NAME"); + LOG(DEBUG) << "$DDS_TASK_INDEX: " << getenv("DDS_TASK_INDEX"); + LOG(DEBUG) << "$DDS_COLLECTION_INDEX: " << getenv("DDS_COLLECTION_INDEX"); + // start DDS service - subscriptions will only start firing after this step fService.start(); @@ -147,13 +158,12 @@ auto DDS::FillChannelContainers() -> void for (const auto& c : channelInfo) { string methodKey{"chans." + c.first + "." + to_string(c.second - 1) + ".method"}; - string addressKey{"chans." + c.first + "." + to_string(c.second - 1) + ".address"}; if (GetProperty(methodKey) == "bind") { fBindingChans.insert(make_pair(c.first, vector())); for (int i = 0; i < c.second; ++i) { - fBindingChans.at(c.first).push_back(GetProperty(addressKey)); + fBindingChans.at(c.first).push_back(GetProperty(string{"chans." + c.first + "." + to_string(i) + ".address"})); } } else if (GetProperty(methodKey) == "connect") @@ -180,7 +190,26 @@ auto DDS::SubscribeForConnectingChannels() -> void try { LOG(debug) << "Received update for " << propertyId << ": key=" << key << " value=" << value; - fConnectingChans.at(propertyId).fDDSValues.insert(make_pair(key.c_str(), value.c_str())); + vector values; + boost::algorithm::split(values, value, boost::algorithm::is_any_of(",")); + if (values.size() > 1) // multiple bound channels received + { + int taskIndex = GetProperty("dds-i"); + if (taskIndex != -1) + { + LOG(debug) << "adding connecting channel " << key << " : " << values.at(taskIndex); + fConnectingChans.at(propertyId).fDDSValues.insert(make_pair(key.c_str(), values.at(taskIndex).c_str())); + } + else + { + LOG(error) << "multiple bound channels received, but no task index specified, only assigning the first"; + fConnectingChans.at(propertyId).fDDSValues.insert(make_pair(key.c_str(), values.at(0).c_str())); + } + } + else // only one bound channel received + { + fConnectingChans.at(propertyId).fDDSValues.insert(make_pair(key.c_str(), value.c_str())); + } // update channels and remove them from unfinished container for (auto mi = fConnectingChans.begin(); mi != fConnectingChans.end(); /* no increment */) @@ -192,8 +221,7 @@ auto DDS::SubscribeForConnectingChannels() -> void auto it = mi->second.fDDSValues.begin(); for (unsigned int i = 0; i < mi->second.fSubChannelAddresses.size(); ++i) { - string k = "chans." + mi->first + "." + to_string(i) + ".address"; - SetProperty(k, it->second); + SetProperty(string{"chans." + mi->first + "." + to_string(i) + ".address"}, it->second); ++it; } fConnectingChans.erase(mi++); @@ -215,13 +243,9 @@ auto DDS::PublishBoundChannels() -> void { for (const auto& chan : fBindingChans) { - unsigned int index = 0; - for (const auto& i : chan.second) - { - LOG(debug) << "Publishing " << chan.first << "[" << index << "] address to DDS under '" << chan.first << "' property name."; - fDDSKeyValue.putValue(chan.first, i); - ++index; - } + string joined = boost::algorithm::join(chan.second, ","); + LOG(debug) << "Publishing " << chan.first << " bound addresses (" << chan.second.size() << ") to DDS under '" << chan.first << "' property name."; + fDDSKeyValue.putValue(chan.first, joined); } } @@ -287,7 +311,7 @@ auto DDS::SubscribeForCustomCommands() -> void { { // auto size = fHeartbeatSubscribers.size(); - std::lock_guard lock{fHeartbeatSubscriberMutex}; + lock_guard lock{fHeartbeatSubscriberMutex}; fHeartbeatSubscribers.insert(senderId); } fDDSCustomCmd.send("heartbeat-subscription: " + id + ",OK", to_string(senderId)); @@ -295,7 +319,7 @@ auto DDS::SubscribeForCustomCommands() -> void else if (cmd == "unsubscribe-from-heartbeats") { { - std::lock_guard lock{fHeartbeatSubscriberMutex}; + lock_guard lock{fHeartbeatSubscriberMutex}; fHeartbeatSubscribers.erase(senderId); } fDDSCustomCmd.send("heartbeat-unsubscription: " + id + ",OK", to_string(senderId)); @@ -304,7 +328,7 @@ auto DDS::SubscribeForCustomCommands() -> void { { // auto size = fStateChangeSubscribers.size(); - std::lock_guard lock{fStateChangeSubscriberMutex}; + lock_guard lock{fStateChangeSubscriberMutex}; fStateChangeSubscribers.insert(senderId); } fDDSCustomCmd.send("state-changes-subscription: " + id + ",OK", to_string(senderId)); @@ -315,7 +339,7 @@ auto DDS::SubscribeForCustomCommands() -> void else if (cmd == "unsubscribe-from-state-changes") { { - std::lock_guard lock{fStateChangeSubscriberMutex}; + lock_guard lock{fStateChangeSubscriberMutex}; fStateChangeSubscribers.erase(senderId); } fDDSCustomCmd.send("state-changes-unsubscription: " + id + ",OK", to_string(senderId)); diff --git a/fairmq/plugins/DDS/DDS.h b/fairmq/plugins/DDS/DDS.h index 335dea7a..2bb0eb76 100644 --- a/fairmq/plugins/DDS/DDS.h +++ b/fairmq/plugins/DDS/DDS.h @@ -90,13 +90,21 @@ class DDS : public Plugin std::chrono::milliseconds fHeartbeatInterval; }; +Plugin::ProgOptions DDSProgramOptions() +{ + boost::program_options::options_description options{"DDS Plugin"}; + options.add_options() + ("dds-i", boost::program_options::value()->default_value(-1), "Task index for chosing connection target (single channel n to m)."); + return options; +} + REGISTER_FAIRMQ_PLUGIN( DDS, // Class name dds, // Plugin name (string, lower case chars only) (Plugin::Version{1,0,0}), // Version "FairRootGroup ", // Maintainer "https://github.com/FairRootGroup/FairRoot", // Homepage - fair::mq::Plugin::NoProgramOptions // custom program options for the plugin + DDSProgramOptions // custom program options for the plugin ) } /* namespace plugins */