diff --git a/fairmq/options/FairProgOptionsHelper.h b/fairmq/options/FairProgOptionsHelper.h index 6ca32b70..10bb0c9c 100644 --- a/fairmq/options/FairProgOptionsHelper.h +++ b/fairmq/options/FairProgOptionsHelper.h @@ -199,6 +199,12 @@ struct ConvertVariableValue : T if (typeIs>(varValue)) return T::template Value>(varValue, std::string(">"), defaulted, empty); + if (typeIs(varValue)) + return T::template Value(varValue, std::string(""), defaulted, empty); + + if (typeIs>(varValue)) + return T::template Value>(varValue, std::string(">"), defaulted, empty); + if (typeIs(varValue)) return T::template Value(varValue, std::string(""), defaulted, empty); diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index 47e775d5..50532e83 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -8,6 +8,8 @@ #include "DDS.h" +#include + #include #include @@ -16,8 +18,10 @@ #include #include #include +#include using namespace std; +using fair::mq::tools::ToString; namespace fair { @@ -154,32 +158,65 @@ auto DDS::HandleControl() -> void auto DDS::FillChannelContainers() -> void { - unordered_map channelInfo(GetChannelInfo()); - for (const auto& c : channelInfo) - { - string methodKey{"chans." + c.first + "." + to_string(c.second - 1) + ".method"}; - if (GetProperty(methodKey) == "bind") - { - fBindingChans.insert(make_pair(c.first, vector())); - for (int i = 0; i < c.second; ++i) - { - fBindingChans.at(c.first).push_back(GetProperty(string{"chans." + c.first + "." + to_string(i) + ".address"})); + try { + unordered_map channelInfo(GetChannelInfo()); + + // fill binding and connecting chans + for (const auto& c : channelInfo) { + string methodKey{"chans." + c.first + "." + to_string(c.second - 1) + ".method"}; + if (GetProperty(methodKey) == "bind") { + fBindingChans.insert(make_pair(c.first, vector())); + for (int i = 0; i < c.second; ++i) { + fBindingChans.at(c.first).push_back(GetProperty(string{"chans." + c.first + "." + to_string(i) + ".address"})); + } + } else if (GetProperty(methodKey) == "connect") { + fConnectingChans.insert(make_pair(c.first, DDSConfig())); + LOG(debug) << "preparing to connect: " << c.first << " with " << c.second << " sub-channels."; + for (int i = 0; i < c.second; ++i) { + fConnectingChans.at(c.first).fSubChannelAddresses.push_back(string()); + } + } else { + LOG(error) << "Cannot update address configuration. Channel method (bind/connect) not specified."; + return; } } - else if (GetProperty(methodKey) == "connect") - { - fConnectingChans.insert(make_pair(c.first, DDSConfig())); - LOG(debug) << "preparing to connect: " << c.first << " with " << c.second << " sub-channels."; - for (int i = 0; i < c.second; ++i) - { - fConnectingChans.at(c.first).fSubChannelAddresses.push_back(string()); + + // save properties that will have multiple values arriving (with only some of them to be used) + vector iValues = GetProperty>("dds-i"); + vector inValues = GetProperty>("dds-i-n"); + + for (const auto& vi : iValues) { + size_t pos = vi.find(":"); + string chanName = vi.substr(0, pos ); + + // check if provided name is a valid channel name + if (fConnectingChans.find(chanName) == fConnectingChans.end()) { + throw invalid_argument(ToString("channel provided to dds-i is not an actual connecting channel of this device: ", chanName)); } + + int i = stoi(vi.substr(pos + 1)); + LOG(debug) << "dds-i: adding " << chanName << " -> i of " << i; + fI.insert(make_pair(chanName, i)); } - else - { - LOG(error) << "Cannot update address configuration. Channel method (bind/connect) not specified."; - return; + + for (const auto& vi : inValues) { + size_t pos = vi.find(":"); + string chanName = vi.substr(0, pos); + + // check if provided name is a valid channel name + if (fConnectingChans.find(chanName) == fConnectingChans.end()) { + throw invalid_argument(ToString("channel provided to dds-i-n is not an actual connecting channel of this device: ", chanName)); + } + + string i_n = vi.substr(pos + 1); + pos = i_n.find("-"); + int i = stoi(i_n.substr(0, pos)); + int n = stoi(i_n.substr(pos + 1)); + LOG(debug) << "dds-i-n: adding " << chanName << " -> i: " << i << " n: " << n; + fIofN.insert(make_pair(chanName, IofN(i, n))); } + } catch (const exception& e) { + LOG(error) << "Error filling channel containers: " << e.what(); } } @@ -190,25 +227,33 @@ auto DDS::SubscribeForConnectingChannels() -> void try { LOG(debug) << "Received update for " << propertyId << ": key=" << key << " value=" << value; - vector values; - boost::algorithm::split(values, value, boost::algorithm::is_any_of(",")); - if (values.size() > 1) // multiple bound channels received - { - int taskIndex = GetProperty("dds-i"); - if (taskIndex != -1) - { - LOG(debug) << "adding connecting channel " << key << " : " << values.at(taskIndex); - fConnectingChans.at(propertyId).fDDSValues.insert(make_pair(key.c_str(), values.at(taskIndex).c_str())); - } - else - { - LOG(error) << "multiple bound channels received, but no task index specified, only assigning the first"; - fConnectingChans.at(propertyId).fDDSValues.insert(make_pair(key.c_str(), values.at(0).c_str())); + string val = value; + // check if it is to handle as one out of multiple values + auto it = fIofN.find(propertyId); + if (it != fIofN.end()) { + it->second.fEntries.push_back(value); + if (it->second.fEntries.size() == it->second.fN) { + sort(it->second.fEntries.begin(), it->second.fEntries.end()); + val = it->second.fEntries.at(it->second.fI); + } else { + LOG(debug) << "received " << it->second.fEntries.size() << " values for " << propertyId << ", expecting total of " << it->second.fN; + return; } } - else // only one bound channel received - { - fConnectingChans.at(propertyId).fDDSValues.insert(make_pair(key.c_str(), value.c_str())); + + vector connectionStrings; + boost::algorithm::split(connectionStrings, val, boost::algorithm::is_any_of(",")); + if (connectionStrings.size() > 1) { // multiple bound channels received + auto it2 = fI.find(propertyId); + if (it2 != fI.end()) { + LOG(debug) << "adding connecting channel " << propertyId << " : " << connectionStrings.at(it2->second); + fConnectingChans.at(propertyId).fDDSValues.insert(make_pair(key.c_str(), connectionStrings.at(it2->second).c_str())); + } else { + LOG(error) << "multiple bound channels received, but no task index specified, only assigning the first"; + fConnectingChans.at(propertyId).fDDSValues.insert(make_pair(key.c_str(), connectionStrings.at(0).c_str())); + } + } else { // only one bound channel received + fConnectingChans.at(propertyId).fDDSValues.insert(make_pair(key.c_str(), val.c_str())); } // update channels and remove them from unfinished container @@ -218,11 +263,11 @@ auto DDS::SubscribeForConnectingChannels() -> void { // when multiple subChannels are used, their order on every device should be the same, irregardless of arrival order from DDS. sort(mi->second.fSubChannelAddresses.begin(), mi->second.fSubChannelAddresses.end()); - auto it = mi->second.fDDSValues.begin(); + auto it3 = mi->second.fDDSValues.begin(); for (unsigned int i = 0; i < mi->second.fSubChannelAddresses.size(); ++i) { - SetProperty(string{"chans." + mi->first + "." + to_string(i) + ".address"}, it->second); - ++it; + SetProperty(string{"chans." + mi->first + "." + to_string(i) + ".address"}, it3->second); + ++it3; } fConnectingChans.erase(mi++); } diff --git a/fairmq/plugins/DDS/DDS.h b/fairmq/plugins/DDS/DDS.h index 2bb0eb76..b4d230d5 100644 --- a/fairmq/plugins/DDS/DDS.h +++ b/fairmq/plugins/DDS/DDS.h @@ -44,6 +44,20 @@ struct DDSConfig std::unordered_map fDDSValues; }; +struct IofN +{ + IofN(int i, int n) + : fI(i) + , fN(n) + , fEntries() + {} + + int fI; + int fN; + std::vector fEntries; + +}; + class DDS : public Plugin { public: @@ -69,6 +83,9 @@ class DDS : public Plugin std::unordered_map> fBindingChans; std::unordered_map fConnectingChans; + std::unordered_map fI; + std::unordered_map fIofN; + std::mutex fStopMutex; std::condition_variable fStopCondition; @@ -94,7 +111,9 @@ Plugin::ProgOptions DDSProgramOptions() { boost::program_options::options_description options{"DDS Plugin"}; options.add_options() - ("dds-i", boost::program_options::value()->default_value(-1), "Task index for chosing connection target (single channel n to m)."); + ("dds-i", boost::program_options::value>()->multitoken()->composing(), "Task index for chosing connection target (single channel n to m). When all values come via same update.") + ("dds-i-n", boost::program_options::value>()->multitoken()->composing(), "Task index for chosing connection target (one out of n values to take). When values come as independent updates."); + return options; }