mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
feat: Deprecate Device::fChannels
in preparation for #427
This commit is contained in:
parent
f699208e30
commit
5ef17fddbb
|
@ -1,5 +1,5 @@
|
||||||
/********************************************************************************
|
/********************************************************************************
|
||||||
* Copyright (C) 2012-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
* Copyright (C) 2012-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||||
* *
|
* *
|
||||||
* This software is distributed under the terms of the *
|
* This software is distributed under the terms of the *
|
||||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||||
|
@ -77,6 +77,9 @@ Device::Device(ProgOptions& config, tools::Version version)
|
||||||
: Device(&config, version)
|
: Device(&config, version)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
|
/// TODO: Remove this once Device::fChannels is no longer public
|
||||||
|
#pragma GCC diagnostic push
|
||||||
|
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
|
||||||
Device::Device(ProgOptions* config, tools::Version version)
|
Device::Device(ProgOptions* config, tools::Version version)
|
||||||
: fTransportFactory(nullptr)
|
: fTransportFactory(nullptr)
|
||||||
, fInternalConfig(config ? nullptr : make_unique<ProgOptions>())
|
, fInternalConfig(config ? nullptr : make_unique<ProgOptions>())
|
||||||
|
@ -138,6 +141,7 @@ Device::Device(ProgOptions* config, tools::Version version)
|
||||||
|
|
||||||
fStateMachine.Start();
|
fStateMachine.Start();
|
||||||
}
|
}
|
||||||
|
#pragma GCC diagnostic pop
|
||||||
|
|
||||||
void Device::TransitionTo(State s)
|
void Device::TransitionTo(State s)
|
||||||
{
|
{
|
||||||
|
@ -229,7 +233,7 @@ void Device::InitWrapper()
|
||||||
unordered_map<string, int> infos = fConfig->GetChannelInfo();
|
unordered_map<string, int> infos = fConfig->GetChannelInfo();
|
||||||
for (const auto& info : infos) {
|
for (const auto& info : infos) {
|
||||||
for (int i = 0; i < info.second; ++i) {
|
for (int i = 0; i < info.second; ++i) {
|
||||||
fChannels[info.first].emplace_back(info.first, i, fConfig->GetPropertiesStartingWith(tools::ToString("chans.", info.first, ".", i, ".")));
|
GetChannels()[info.first].emplace_back(info.first, i, fConfig->GetPropertiesStartingWith(tools::ToString("chans.", info.first, ".", i, ".")));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -239,7 +243,7 @@ void Device::InitWrapper()
|
||||||
string networkInterface = fConfig->GetProperty<string>("network-interface", DefaultNetworkInterface);
|
string networkInterface = fConfig->GetProperty<string>("network-interface", DefaultNetworkInterface);
|
||||||
|
|
||||||
// Fill the uninitialized channel containers
|
// Fill the uninitialized channel containers
|
||||||
for (auto& channel : fChannels) {
|
for (auto& channel : GetChannels()) {
|
||||||
int subChannelIndex = 0;
|
int subChannelIndex = 0;
|
||||||
for (auto& subChannel : channel.second) {
|
for (auto& subChannel : channel.second) {
|
||||||
// set channel transport
|
// set channel transport
|
||||||
|
@ -330,7 +334,7 @@ void Device::ConnectWrapper()
|
||||||
AttachChannels(fUninitializedConnectingChannels);
|
AttachChannels(fUninitializedConnectingChannels);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fChannels.empty()) {
|
if (GetChannels().empty()) {
|
||||||
LOG(warn) << "No channels created after finishing initialization";
|
LOG(warn) << "No channels created after finishing initialization";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -449,7 +453,7 @@ void Device::RunWrapper()
|
||||||
|
|
||||||
unique_ptr<thread> rateLogger;
|
unique_ptr<thread> rateLogger;
|
||||||
// Check if rate logging thread is needed
|
// Check if rate logging thread is needed
|
||||||
const bool rateLogging = any_of(fChannels.cbegin(), fChannels.cend(), [](auto ch) {
|
const bool rateLogging = any_of(GetChannels().cbegin(), GetChannels().cend(), [](auto ch) {
|
||||||
return any_of(ch.second.cbegin(), ch.second.cend(), [](auto sub) { return sub.fRateLogging > 0; });
|
return any_of(ch.second.cbegin(), ch.second.cend(), [](auto sub) { return sub.fRateLogging > 0; });
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -470,7 +474,7 @@ void Device::RunWrapper()
|
||||||
// process either data callbacks or ConditionalRun/Run
|
// process either data callbacks or ConditionalRun/Run
|
||||||
if (fDataCallbacks) {
|
if (fDataCallbacks) {
|
||||||
// if only one input channel, do lightweight handling without additional polling.
|
// if only one input channel, do lightweight handling without additional polling.
|
||||||
if (fInputChannelKeys.size() == 1 && fChannels.at(fInputChannelKeys.at(0)).size() == 1) {
|
if (fInputChannelKeys.size() == 1 && GetChannels().at(fInputChannelKeys.at(0)).size() == 1) {
|
||||||
HandleSingleChannelInput();
|
HandleSingleChannelInput();
|
||||||
} else {// otherwise do full handling with polling
|
} else {// otherwise do full handling with polling
|
||||||
HandleMultipleChannelInput();
|
HandleMultipleChannelInput();
|
||||||
|
@ -517,7 +521,7 @@ void Device::HandleMultipleChannelInput()
|
||||||
// check if more than one transport is used
|
// check if more than one transport is used
|
||||||
fMultitransportInputs.clear();
|
fMultitransportInputs.clear();
|
||||||
for (const auto& k : fInputChannelKeys) {
|
for (const auto& k : fInputChannelKeys) {
|
||||||
mq::Transport t = fChannels.at(k).at(0).fTransportType;
|
mq::Transport t = GetChannel(k, 0).fTransportType;
|
||||||
if (fMultitransportInputs.find(t) == fMultitransportInputs.end()) {
|
if (fMultitransportInputs.find(t) == fMultitransportInputs.end()) {
|
||||||
fMultitransportInputs.insert(pair<mq::Transport, vector<string>>(t, vector<string>()));
|
fMultitransportInputs.insert(pair<mq::Transport, vector<string>>(t, vector<string>()));
|
||||||
fMultitransportInputs.at(t).push_back(k);
|
fMultitransportInputs.at(t).push_back(k);
|
||||||
|
@ -527,13 +531,13 @@ void Device::HandleMultipleChannelInput()
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const auto& mi : fMsgInputs) {
|
for (const auto& mi : fMsgInputs) {
|
||||||
for (auto& i : fChannels.at(mi.first)) {
|
for (auto& i : GetChannels().at(mi.first)) {
|
||||||
i.fMultipart = false;
|
i.fMultipart = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const auto& mi : fMultipartInputs) {
|
for (const auto& mi : fMultipartInputs) {
|
||||||
for (auto& i : fChannels.at(mi.first)) {
|
for (auto& i : GetChannels().at(mi.first)) {
|
||||||
i.fMultipart = true;
|
i.fMultipart = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -544,16 +548,16 @@ void Device::HandleMultipleChannelInput()
|
||||||
} else { // otherwise poll directly
|
} else { // otherwise poll directly
|
||||||
bool proceed = true;
|
bool proceed = true;
|
||||||
|
|
||||||
PollerPtr poller(fChannels.at(fInputChannelKeys.at(0)).at(0).fTransportFactory->CreatePoller(fChannels, fInputChannelKeys));
|
PollerPtr poller(GetChannel(fInputChannelKeys.at(0), 0).fTransportFactory->CreatePoller(GetChannels(), fInputChannelKeys));
|
||||||
|
|
||||||
while (!NewStatePending() && proceed) {
|
while (!NewStatePending() && proceed) {
|
||||||
poller->Poll(200);
|
poller->Poll(200);
|
||||||
|
|
||||||
// check which inputs are ready and call their data handlers if they are.
|
// check which inputs are ready and call their data handlers if they are.
|
||||||
for (const auto& ch : fInputChannelKeys) {
|
for (const auto& ch : fInputChannelKeys) {
|
||||||
for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i) {
|
for (unsigned int i = 0; i < GetChannels().at(ch).size(); ++i) {
|
||||||
if (poller->CheckInput(ch, i)) {
|
if (poller->CheckInput(ch, i)) {
|
||||||
if (fChannels.at(ch).at(i).fMultipart) {
|
if (GetChannel(ch, i).fMultipart) {
|
||||||
proceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i);
|
proceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i);
|
||||||
} else {
|
} else {
|
||||||
proceed = HandleMsgInput(ch, fMsgInputs.at(ch), i);
|
proceed = HandleMsgInput(ch, fMsgInputs.at(ch), i);
|
||||||
|
@ -590,13 +594,13 @@ void Device::HandleMultipleTransportInput()
|
||||||
void Device::PollForTransport(const TransportFactory* factory, const vector<string>& channelKeys)
|
void Device::PollForTransport(const TransportFactory* factory, const vector<string>& channelKeys)
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
PollerPtr poller(factory->CreatePoller(fChannels, channelKeys));
|
PollerPtr poller(factory->CreatePoller(GetChannels(), channelKeys));
|
||||||
|
|
||||||
while (!NewStatePending() && fMultitransportProceed) {
|
while (!NewStatePending() && fMultitransportProceed) {
|
||||||
poller->Poll(500);
|
poller->Poll(500);
|
||||||
|
|
||||||
for (const auto& ch : channelKeys) {
|
for (const auto& ch : channelKeys) {
|
||||||
for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i) {
|
for (unsigned int i = 0; i < GetChannels().at(ch).size(); ++i) {
|
||||||
if (poller->CheckInput(ch, i)) {
|
if (poller->CheckInput(ch, i)) {
|
||||||
lock_guard<mutex> lock(fMultitransportMutex);
|
lock_guard<mutex> lock(fMultitransportMutex);
|
||||||
|
|
||||||
|
@ -604,7 +608,7 @@ void Device::PollForTransport(const TransportFactory* factory, const vector<stri
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fChannels.at(ch).at(i).fMultipart) {
|
if (GetChannel(ch, i).fMultipart) {
|
||||||
fMultitransportProceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i);
|
fMultitransportProceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i);
|
||||||
} else {
|
} else {
|
||||||
fMultitransportProceed = HandleMsgInput(ch, fMsgInputs.at(ch), i);
|
fMultitransportProceed = HandleMsgInput(ch, fMsgInputs.at(ch), i);
|
||||||
|
@ -628,7 +632,7 @@ void Device::PollForTransport(const TransportFactory* factory, const vector<stri
|
||||||
|
|
||||||
bool Device::HandleMsgInput(const string& chName, const InputMsgCallback& callback, int i)
|
bool Device::HandleMsgInput(const string& chName, const InputMsgCallback& callback, int i)
|
||||||
{
|
{
|
||||||
unique_ptr<Message> input(fChannels.at(chName).at(i).fTransportFactory->CreateMessage());
|
unique_ptr<Message> input(GetChannel(chName, i).fTransportFactory->CreateMessage());
|
||||||
|
|
||||||
if (Receive(input, chName, i) >= 0) {
|
if (Receive(input, chName, i) >= 0) {
|
||||||
return callback(input, i);
|
return callback(input, i);
|
||||||
|
@ -685,7 +689,7 @@ void Device::LogSocketRates()
|
||||||
size_t chanNameLen = 0;
|
size_t chanNameLen = 0;
|
||||||
|
|
||||||
// iterate over the channels map
|
// iterate over the channels map
|
||||||
for (auto& channel : fChannels) {
|
for (auto& channel : GetChannels()) {
|
||||||
// iterate over the channels vector
|
// iterate over the channels vector
|
||||||
for (auto& subChannel : channel.second) {
|
for (auto& subChannel : channel.second) {
|
||||||
if (subChannel.fRateLogging > 0) {
|
if (subChannel.fRateLogging > 0) {
|
||||||
|
@ -806,18 +810,22 @@ void Device::ResetWrapper()
|
||||||
|
|
||||||
Reset();
|
Reset();
|
||||||
|
|
||||||
fChannels.clear();
|
GetChannels().clear();
|
||||||
fTransportFactory.reset();
|
fTransportFactory.reset();
|
||||||
if (!NewStatePending()) {
|
if (!NewStatePending()) {
|
||||||
ChangeState(Transition::Auto);
|
ChangeState(Transition::Auto);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// TODO: Remove this once Device::fChannels is no longer public
|
||||||
|
#pragma GCC diagnostic push
|
||||||
|
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
|
||||||
Device::~Device()
|
Device::~Device()
|
||||||
{
|
{
|
||||||
UnsubscribeFromNewTransition("device");
|
UnsubscribeFromNewTransition("device");
|
||||||
fStateMachine.StopHandlingStates();
|
fStateMachine.StopHandlingStates();
|
||||||
LOG(debug) << "Shutting down device " << fId;
|
LOG(debug) << "Shutting down device " << fId;
|
||||||
}
|
}
|
||||||
|
#pragma GCC diagnostic pop
|
||||||
|
|
||||||
} // namespace fair::mq
|
} // namespace fair::mq
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/********************************************************************************
|
/********************************************************************************
|
||||||
* Copyright (C) 2021-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
* Copyright (C) 2021-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||||
* *
|
* *
|
||||||
* This software is distributed under the terms of the *
|
* This software is distributed under the terms of the *
|
||||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||||
|
@ -233,7 +233,7 @@ class Device
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return GetChannel(chans.at(0), 0).Transport()->CreatePoller(fChannels, chans);
|
return GetChannel(chans.at(0), 0).Transport()->CreatePoller(GetChannels(), chans);
|
||||||
}
|
}
|
||||||
|
|
||||||
PollerPtr NewPoller(const std::vector<Channel*>& channels)
|
PollerPtr NewPoller(const std::vector<Channel*>& channels)
|
||||||
|
@ -321,7 +321,7 @@ class Device
|
||||||
|
|
||||||
Channel& GetChannel(const std::string& channelName, const int index = 0)
|
Channel& GetChannel(const std::string& channelName, const int index = 0)
|
||||||
try {
|
try {
|
||||||
return fChannels.at(channelName).at(index);
|
return GetChannels().at(channelName).at(index);
|
||||||
} catch (const std::out_of_range& oor) {
|
} catch (const std::out_of_range& oor) {
|
||||||
LOG(error) << "GetChannel(): '" << channelName << "[" << index << "]' does not exist.";
|
LOG(error) << "GetChannel(): '" << channelName << "[" << index << "]' does not exist.";
|
||||||
throw;
|
throw;
|
||||||
|
@ -329,7 +329,7 @@ class Device
|
||||||
|
|
||||||
size_t GetNumSubChannels(const std::string& channelName)
|
size_t GetNumSubChannels(const std::string& channelName)
|
||||||
try {
|
try {
|
||||||
return fChannels.at(channelName).size();
|
return GetChannels().at(channelName).size();
|
||||||
} catch (const std::out_of_range& oor) {
|
} catch (const std::out_of_range& oor) {
|
||||||
LOG(error) << "GetNumSubChannels(): '" << channelName << "' does not exist.";
|
LOG(error) << "GetNumSubChannels(): '" << channelName << "' does not exist.";
|
||||||
throw;
|
throw;
|
||||||
|
@ -340,7 +340,7 @@ class Device
|
||||||
/// @param index sub-channel
|
/// @param index sub-channel
|
||||||
unsigned long GetNumberOfConnectedPeers(const std::string& channelName, int index = 0)
|
unsigned long GetNumberOfConnectedPeers(const std::string& channelName, int index = 0)
|
||||||
{
|
{
|
||||||
return fChannels.at(channelName).at(index).GetNumberOfConnectedPeers();
|
return GetChannel(channelName, index).GetNumberOfConnectedPeers();
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual void RegisterChannelEndpoints() {}
|
virtual void RegisterChannelEndpoints() {}
|
||||||
|
@ -438,7 +438,23 @@ class Device
|
||||||
fTransports; ///< Container for transports
|
fTransports; ///< Container for transports
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
[[deprecated("Use GetChannels() instead.")]]
|
||||||
std::unordered_map<std::string, std::vector<Channel>> fChannels; ///< Device channels
|
std::unordered_map<std::string, std::vector<Channel>> fChannels; ///< Device channels
|
||||||
|
std::unordered_map<std::string, std::vector<Channel>>& GetChannels()
|
||||||
|
{
|
||||||
|
#pragma GCC diagnostic push
|
||||||
|
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
|
||||||
|
return fChannels;
|
||||||
|
#pragma GCC diagnostic pop
|
||||||
|
}
|
||||||
|
std::unordered_map<std::string, std::vector<Channel>> const& GetChannels() const
|
||||||
|
{
|
||||||
|
#pragma GCC diagnostic push
|
||||||
|
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
|
||||||
|
return fChannels;
|
||||||
|
#pragma GCC diagnostic pop
|
||||||
|
}
|
||||||
|
|
||||||
std::unique_ptr<ProgOptions> fInternalConfig; ///< Internal program options configuration
|
std::unique_ptr<ProgOptions> fInternalConfig; ///< Internal program options configuration
|
||||||
ProgOptions* fConfig; ///< Pointer to config (internal or external)
|
ProgOptions* fConfig; ///< Pointer to config (internal or external)
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/********************************************************************************
|
/********************************************************************************
|
||||||
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
* Copyright (C) 2014-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||||
* *
|
* *
|
||||||
* This software is distributed under the terms of the *
|
* This software is distributed under the terms of the *
|
||||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||||
|
@ -47,7 +47,7 @@ class Merger : public Device
|
||||||
|
|
||||||
std::vector<Channel*> chans;
|
std::vector<Channel*> chans;
|
||||||
|
|
||||||
for (auto& chan : fChannels.at(fInChannelName)) {
|
for (auto& chan : GetChannels().at(fInChannelName)) {
|
||||||
chans.push_back(&chan);
|
chans.push_back(&chan);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user