mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
DDS plugin: handle n to m on single channel name case
This commit is contained in:
parent
13678f9f6d
commit
6e40011a18
|
@ -714,7 +714,7 @@ bool FairMQDevice::HandleMsgInput(const string& chName, const InputMsgCallback&
|
||||||
|
|
||||||
if (Receive(input, chName, i) >= 0)
|
if (Receive(input, chName, i) >= 0)
|
||||||
{
|
{
|
||||||
return callback(input, 0);
|
return callback(input, i);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
|
@ -59,7 +59,7 @@ FairMQMap SUBOPT::UserParser(const vector<string>& channelConfig, const string&
|
||||||
}
|
}
|
||||||
else if (subopt >= 0 && value != nullptr)
|
else if (subopt >= 0 && value != nullptr)
|
||||||
{
|
{
|
||||||
socketProperties.put(channelOptionKeys[subopt], value);
|
channelProperties.put(channelOptionKeys[subopt], value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -59,6 +59,7 @@ struct SUBOPT
|
||||||
SNDKERNELSIZE,
|
SNDKERNELSIZE,
|
||||||
RCVKERNELSIZE,
|
RCVKERNELSIZE,
|
||||||
RATELOGGING, // logging rate
|
RATELOGGING, // logging rate
|
||||||
|
NUMSOCKETS,
|
||||||
lastsocketkey
|
lastsocketkey
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -73,6 +74,7 @@ struct SUBOPT
|
||||||
/*[SNDKERNELSIZE] = */ "sndKernelSize",
|
/*[SNDKERNELSIZE] = */ "sndKernelSize",
|
||||||
/*[RCVKERNELSIZE] = */ "rcvKernelSize",
|
/*[RCVKERNELSIZE] = */ "rcvKernelSize",
|
||||||
/*[RATELOGGING] = */ "rateLogging",
|
/*[RATELOGGING] = */ "rateLogging",
|
||||||
|
/*[NUMSOCKETS] = */ "numSockets",
|
||||||
nullptr
|
nullptr
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -8,10 +8,14 @@
|
||||||
|
|
||||||
#include "DDS.h"
|
#include "DDS.h"
|
||||||
|
|
||||||
|
#include <boost/algorithm/string/join.hpp>
|
||||||
|
#include <boost/algorithm/string/split.hpp>
|
||||||
|
|
||||||
#include <termios.h> // for the interactive mode
|
#include <termios.h> // for the interactive mode
|
||||||
#include <poll.h> // for the interactive mode
|
#include <poll.h> // for the interactive mode
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
#include <cstdlib>
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
|
@ -101,6 +105,13 @@ auto DDS::HandleControl() -> void
|
||||||
// and propagate addresses of bound channels to DDS.
|
// and propagate addresses of bound channels to DDS.
|
||||||
FillChannelContainers();
|
FillChannelContainers();
|
||||||
|
|
||||||
|
LOG(DEBUG) << "$DDS_TASK_PATH: " << getenv("DDS_TASK_PATH");
|
||||||
|
LOG(DEBUG) << "$DDS_GROUP_NAME: " << getenv("DDS_GROUP_NAME");
|
||||||
|
LOG(DEBUG) << "$DDS_COLLECTION_NAME: " << getenv("DDS_COLLECTION_NAME");
|
||||||
|
LOG(DEBUG) << "$DDS_TASK_NAME: " << getenv("DDS_TASK_NAME");
|
||||||
|
LOG(DEBUG) << "$DDS_TASK_INDEX: " << getenv("DDS_TASK_INDEX");
|
||||||
|
LOG(DEBUG) << "$DDS_COLLECTION_INDEX: " << getenv("DDS_COLLECTION_INDEX");
|
||||||
|
|
||||||
// start DDS service - subscriptions will only start firing after this step
|
// start DDS service - subscriptions will only start firing after this step
|
||||||
fService.start();
|
fService.start();
|
||||||
|
|
||||||
|
@ -147,13 +158,12 @@ auto DDS::FillChannelContainers() -> void
|
||||||
for (const auto& c : channelInfo)
|
for (const auto& c : channelInfo)
|
||||||
{
|
{
|
||||||
string methodKey{"chans." + c.first + "." + to_string(c.second - 1) + ".method"};
|
string methodKey{"chans." + c.first + "." + to_string(c.second - 1) + ".method"};
|
||||||
string addressKey{"chans." + c.first + "." + to_string(c.second - 1) + ".address"};
|
|
||||||
if (GetProperty<string>(methodKey) == "bind")
|
if (GetProperty<string>(methodKey) == "bind")
|
||||||
{
|
{
|
||||||
fBindingChans.insert(make_pair(c.first, vector<string>()));
|
fBindingChans.insert(make_pair(c.first, vector<string>()));
|
||||||
for (int i = 0; i < c.second; ++i)
|
for (int i = 0; i < c.second; ++i)
|
||||||
{
|
{
|
||||||
fBindingChans.at(c.first).push_back(GetProperty<string>(addressKey));
|
fBindingChans.at(c.first).push_back(GetProperty<string>(string{"chans." + c.first + "." + to_string(i) + ".address"}));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (GetProperty<string>(methodKey) == "connect")
|
else if (GetProperty<string>(methodKey) == "connect")
|
||||||
|
@ -180,7 +190,26 @@ auto DDS::SubscribeForConnectingChannels() -> void
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
LOG(debug) << "Received update for " << propertyId << ": key=" << key << " value=" << value;
|
LOG(debug) << "Received update for " << propertyId << ": key=" << key << " value=" << value;
|
||||||
fConnectingChans.at(propertyId).fDDSValues.insert(make_pair<string, string>(key.c_str(), value.c_str()));
|
vector<string> values;
|
||||||
|
boost::algorithm::split(values, value, boost::algorithm::is_any_of(","));
|
||||||
|
if (values.size() > 1) // multiple bound channels received
|
||||||
|
{
|
||||||
|
int taskIndex = GetProperty<int>("dds-i");
|
||||||
|
if (taskIndex != -1)
|
||||||
|
{
|
||||||
|
LOG(debug) << "adding connecting channel " << key << " : " << values.at(taskIndex);
|
||||||
|
fConnectingChans.at(propertyId).fDDSValues.insert(make_pair<string, string>(key.c_str(), values.at(taskIndex).c_str()));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
LOG(error) << "multiple bound channels received, but no task index specified, only assigning the first";
|
||||||
|
fConnectingChans.at(propertyId).fDDSValues.insert(make_pair<string, string>(key.c_str(), values.at(0).c_str()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else // only one bound channel received
|
||||||
|
{
|
||||||
|
fConnectingChans.at(propertyId).fDDSValues.insert(make_pair<string, string>(key.c_str(), value.c_str()));
|
||||||
|
}
|
||||||
|
|
||||||
// update channels and remove them from unfinished container
|
// update channels and remove them from unfinished container
|
||||||
for (auto mi = fConnectingChans.begin(); mi != fConnectingChans.end(); /* no increment */)
|
for (auto mi = fConnectingChans.begin(); mi != fConnectingChans.end(); /* no increment */)
|
||||||
|
@ -192,8 +221,7 @@ auto DDS::SubscribeForConnectingChannels() -> void
|
||||||
auto it = mi->second.fDDSValues.begin();
|
auto it = mi->second.fDDSValues.begin();
|
||||||
for (unsigned int i = 0; i < mi->second.fSubChannelAddresses.size(); ++i)
|
for (unsigned int i = 0; i < mi->second.fSubChannelAddresses.size(); ++i)
|
||||||
{
|
{
|
||||||
string k = "chans." + mi->first + "." + to_string(i) + ".address";
|
SetProperty<string>(string{"chans." + mi->first + "." + to_string(i) + ".address"}, it->second);
|
||||||
SetProperty<string>(k, it->second);
|
|
||||||
++it;
|
++it;
|
||||||
}
|
}
|
||||||
fConnectingChans.erase(mi++);
|
fConnectingChans.erase(mi++);
|
||||||
|
@ -215,13 +243,9 @@ auto DDS::PublishBoundChannels() -> void
|
||||||
{
|
{
|
||||||
for (const auto& chan : fBindingChans)
|
for (const auto& chan : fBindingChans)
|
||||||
{
|
{
|
||||||
unsigned int index = 0;
|
string joined = boost::algorithm::join(chan.second, ",");
|
||||||
for (const auto& i : chan.second)
|
LOG(debug) << "Publishing " << chan.first << " bound addresses (" << chan.second.size() << ") to DDS under '" << chan.first << "' property name.";
|
||||||
{
|
fDDSKeyValue.putValue(chan.first, joined);
|
||||||
LOG(debug) << "Publishing " << chan.first << "[" << index << "] address to DDS under '" << chan.first << "' property name.";
|
|
||||||
fDDSKeyValue.putValue(chan.first, i);
|
|
||||||
++index;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -287,7 +311,7 @@ auto DDS::SubscribeForCustomCommands() -> void
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
// auto size = fHeartbeatSubscribers.size();
|
// auto size = fHeartbeatSubscribers.size();
|
||||||
std::lock_guard<std::mutex> lock{fHeartbeatSubscriberMutex};
|
lock_guard<mutex> lock{fHeartbeatSubscriberMutex};
|
||||||
fHeartbeatSubscribers.insert(senderId);
|
fHeartbeatSubscribers.insert(senderId);
|
||||||
}
|
}
|
||||||
fDDSCustomCmd.send("heartbeat-subscription: " + id + ",OK", to_string(senderId));
|
fDDSCustomCmd.send("heartbeat-subscription: " + id + ",OK", to_string(senderId));
|
||||||
|
@ -295,7 +319,7 @@ auto DDS::SubscribeForCustomCommands() -> void
|
||||||
else if (cmd == "unsubscribe-from-heartbeats")
|
else if (cmd == "unsubscribe-from-heartbeats")
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock{fHeartbeatSubscriberMutex};
|
lock_guard<mutex> lock{fHeartbeatSubscriberMutex};
|
||||||
fHeartbeatSubscribers.erase(senderId);
|
fHeartbeatSubscribers.erase(senderId);
|
||||||
}
|
}
|
||||||
fDDSCustomCmd.send("heartbeat-unsubscription: " + id + ",OK", to_string(senderId));
|
fDDSCustomCmd.send("heartbeat-unsubscription: " + id + ",OK", to_string(senderId));
|
||||||
|
@ -304,7 +328,7 @@ auto DDS::SubscribeForCustomCommands() -> void
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
// auto size = fStateChangeSubscribers.size();
|
// auto size = fStateChangeSubscribers.size();
|
||||||
std::lock_guard<std::mutex> lock{fStateChangeSubscriberMutex};
|
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||||
fStateChangeSubscribers.insert(senderId);
|
fStateChangeSubscribers.insert(senderId);
|
||||||
}
|
}
|
||||||
fDDSCustomCmd.send("state-changes-subscription: " + id + ",OK", to_string(senderId));
|
fDDSCustomCmd.send("state-changes-subscription: " + id + ",OK", to_string(senderId));
|
||||||
|
@ -315,7 +339,7 @@ auto DDS::SubscribeForCustomCommands() -> void
|
||||||
else if (cmd == "unsubscribe-from-state-changes")
|
else if (cmd == "unsubscribe-from-state-changes")
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock{fStateChangeSubscriberMutex};
|
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||||
fStateChangeSubscribers.erase(senderId);
|
fStateChangeSubscribers.erase(senderId);
|
||||||
}
|
}
|
||||||
fDDSCustomCmd.send("state-changes-unsubscription: " + id + ",OK", to_string(senderId));
|
fDDSCustomCmd.send("state-changes-unsubscription: " + id + ",OK", to_string(senderId));
|
||||||
|
|
|
@ -90,13 +90,21 @@ class DDS : public Plugin
|
||||||
std::chrono::milliseconds fHeartbeatInterval;
|
std::chrono::milliseconds fHeartbeatInterval;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Plugin::ProgOptions DDSProgramOptions()
|
||||||
|
{
|
||||||
|
boost::program_options::options_description options{"DDS Plugin"};
|
||||||
|
options.add_options()
|
||||||
|
("dds-i", boost::program_options::value<int>()->default_value(-1), "Task index for chosing connection target (single channel n to m).");
|
||||||
|
return options;
|
||||||
|
}
|
||||||
|
|
||||||
REGISTER_FAIRMQ_PLUGIN(
|
REGISTER_FAIRMQ_PLUGIN(
|
||||||
DDS, // Class name
|
DDS, // Class name
|
||||||
dds, // Plugin name (string, lower case chars only)
|
dds, // Plugin name (string, lower case chars only)
|
||||||
(Plugin::Version{1,0,0}), // Version
|
(Plugin::Version{1,0,0}), // Version
|
||||||
"FairRootGroup <fairroot@gsi.de>", // Maintainer
|
"FairRootGroup <fairroot@gsi.de>", // Maintainer
|
||||||
"https://github.com/FairRootGroup/FairRoot", // Homepage
|
"https://github.com/FairRootGroup/FairRoot", // Homepage
|
||||||
fair::mq::Plugin::NoProgramOptions // custom program options for the plugin
|
DDSProgramOptions // custom program options for the plugin
|
||||||
)
|
)
|
||||||
|
|
||||||
} /* namespace plugins */
|
} /* namespace plugins */
|
||||||
|
|
Loading…
Reference in New Issue
Block a user