mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
DDS Plugin: Simplify subchannel bookkeeping
This commit is contained in:
parent
b05782af16
commit
539b088ade
|
@ -165,9 +165,7 @@ auto DDS::FillChannelContainers() -> void
|
||||||
} else if (GetProperty<string>(methodKey) == "connect") {
|
} else if (GetProperty<string>(methodKey) == "connect") {
|
||||||
fConnectingChans.insert(make_pair(c.first, DDSConfig()));
|
fConnectingChans.insert(make_pair(c.first, DDSConfig()));
|
||||||
LOG(debug) << "preparing to connect: " << c.first << " with " << c.second << " sub-channels.";
|
LOG(debug) << "preparing to connect: " << c.first << " with " << c.second << " sub-channels.";
|
||||||
for (int i = 0; i < c.second; ++i) {
|
fConnectingChans.at(c.first).fNumSubChannels = c.second;
|
||||||
fConnectingChans.at(c.first).fSubChannelAddresses.push_back(string());
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
LOG(error) << "Cannot update address configuration. Channel method (bind/connect) not specified.";
|
LOG(error) << "Cannot update address configuration. Channel method (bind/connect) not specified.";
|
||||||
return;
|
return;
|
||||||
|
@ -273,19 +271,13 @@ auto DDS::SubscribeForConnectingChannels() -> void
|
||||||
fConnectingChans.at(channelName).fDDSValues.insert({senderTaskID, val.c_str()});
|
fConnectingChans.at(channelName).fDDSValues.insert({senderTaskID, val.c_str()});
|
||||||
}
|
}
|
||||||
|
|
||||||
// update channels and remove them from unfinished container
|
for (const auto& mi : fConnectingChans) {
|
||||||
for (auto mi = fConnectingChans.begin(); mi != fConnectingChans.end(); /* no increment */) {
|
if (mi.second.fNumSubChannels == mi.second.fDDSValues.size()) {
|
||||||
if (mi->second.fSubChannelAddresses.size() == mi->second.fDDSValues.size()) {
|
int i = 0;
|
||||||
// when multiple subChannels are used, their order on every device should be the same, irregardless of arrival order from DDS.
|
for (const auto& e : mi.second.fDDSValues) {
|
||||||
sort(mi->second.fSubChannelAddresses.begin(), mi->second.fSubChannelAddresses.end());
|
SetProperty<string>(string{"chans." + mi.first + "." + to_string(i) + ".address"}, e.second);
|
||||||
auto it3 = mi->second.fDDSValues.begin();
|
++i;
|
||||||
for (unsigned int i = 0; i < mi->second.fSubChannelAddresses.size(); ++i) {
|
|
||||||
SetProperty<string>(string{"chans." + mi->first + "." + to_string(i) + ".address"}, it3->second);
|
|
||||||
++it3;
|
|
||||||
}
|
}
|
||||||
fConnectingChans.erase(mi++);
|
|
||||||
} else {
|
|
||||||
++mi;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (const exception& e) {
|
} catch (const exception& e) {
|
||||||
|
|
|
@ -27,6 +27,7 @@
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
#include <map>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
|
@ -40,9 +41,9 @@ namespace plugins
|
||||||
struct DDSConfig
|
struct DDSConfig
|
||||||
{
|
{
|
||||||
// container of sub channel addresses
|
// container of sub channel addresses
|
||||||
std::vector<std::string> fSubChannelAddresses;
|
unsigned int fNumSubChannels;
|
||||||
// dds values for the channel
|
// dds values for the channel
|
||||||
std::unordered_map<uint64_t, std::string> fDDSValues;
|
std::map<uint64_t, std::string> fDDSValues;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct DDSSubscription
|
struct DDSSubscription
|
||||||
|
|
Loading…
Reference in New Issue
Block a user