Remove built-in devices from the main lib

This commit is contained in:
Alexey Rybalchenko 2020-08-21 13:43:47 +02:00
parent 73109fe6d3
commit 12e6a874db
10 changed files with 242 additions and 419 deletions

View File

@ -174,12 +174,6 @@ if(BUILD_FAIRMQ)
)
set(FAIRMQ_PRIVATE_HEADER_FILES
devices/FairMQBenchmarkSampler.h
devices/FairMQMerger.h
devices/FairMQMultiplier.h
devices/FairMQProxy.h
devices/FairMQSink.h
devices/FairMQSplitter.h
plugins/Builtin.h
plugins/config/Config.h
plugins/Control.h
@ -221,10 +215,6 @@ if(BUILD_FAIRMQ)
FairMQPoller.cxx
FairMQSocket.cxx
FairMQTransportFactory.cxx
devices/FairMQMerger.cxx
devices/FairMQMultiplier.cxx
devices/FairMQProxy.cxx
devices/FairMQSplitter.cxx
Plugin.cxx
PluginManager.cxx
PluginServices.cxx

View File

@ -1,122 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQMerger.cxx
*
* @since 2012-12-06
* @author D. Klein, A. Rybalchenko
*/
#include "FairMQMerger.h"
#include "../FairMQLogger.h"
#include "../FairMQPoller.h"
using namespace std;
FairMQMerger::FairMQMerger()
: fMultipart(true)
, fInChannelName("data-in")
, fOutChannelName("data-out")
{
}
void FairMQMerger::RegisterChannelEndpoints()
{
RegisterChannelEndpoint(fInChannelName, 1, 10000);
RegisterChannelEndpoint(fOutChannelName, 1, 1);
PrintRegisteredChannels();
}
FairMQMerger::~FairMQMerger()
{
}
void FairMQMerger::InitTask()
{
fMultipart = fConfig->GetProperty<bool>("multipart");
fInChannelName = fConfig->GetProperty<string>("in-channel");
fOutChannelName = fConfig->GetProperty<string>("out-channel");
}
void FairMQMerger::Run()
{
int numInputs = fChannels.at(fInChannelName).size();
vector<FairMQChannel*> chans;
for (auto& chan : fChannels.at(fInChannelName))
{
chans.push_back(&chan);
}
FairMQPollerPtr poller(NewPoller(chans));
if (fMultipart)
{
while (!NewStatePending())
{
poller->Poll(100);
// Loop over the data input channels.
for (int i = 0; i < numInputs; ++i)
{
// Check if the channel has data ready to be received.
if (poller->CheckInput(i))
{
FairMQParts payload;
if (Receive(payload, fInChannelName, i) >= 0)
{
if (Send(payload, fOutChannelName) < 0)
{
LOG(debug) << "Transfer interrupted";
break;
}
}
else
{
LOG(debug) << "Transfer interrupted";
break;
}
}
}
}
}
else
{
while (!NewStatePending())
{
poller->Poll(100);
// Loop over the data input channels.
for (int i = 0; i < numInputs; ++i)
{
// Check if the channel has data ready to be received.
if (poller->CheckInput(i))
{
FairMQMessagePtr payload(fTransportFactory->CreateMessage());
if (Receive(payload, fInChannelName, i) >= 0)
{
if (Send(payload, fOutChannelName) < 0)
{
LOG(debug) << "Transfer interrupted";
break;
}
}
else
{
LOG(debug) << "Transfer interrupted";
break;
}
}
}
}
}
}

View File

@ -16,23 +16,100 @@
#define FAIRMQMERGER_H_
#include "FairMQDevice.h"
#include "../FairMQPoller.h"
#include "../FairMQLogger.h"
#include <string>
#include <vector>
class FairMQMerger : public FairMQDevice
{
public:
FairMQMerger();
virtual ~FairMQMerger();
FairMQMerger()
: fMultipart(true)
, fInChannelName("data-in")
, fOutChannelName("data-out")
{}
~FairMQMerger() {}
protected:
bool fMultipart;
std::string fInChannelName;
std::string fOutChannelName;
virtual void RegisterChannelEndpoints() override;
virtual void Run() override;
virtual void InitTask() override;
void InitTask() override
{
fMultipart = fConfig->GetProperty<bool>("multipart");
fInChannelName = fConfig->GetProperty<std::string>("in-channel");
fOutChannelName = fConfig->GetProperty<std::string>("out-channel");
}
void RegisterChannelEndpoints() override
{
RegisterChannelEndpoint(fInChannelName, 1, 10000);
RegisterChannelEndpoint(fOutChannelName, 1, 1);
PrintRegisteredChannels();
}
void Run() override
{
int numInputs = fChannels.at(fInChannelName).size();
std::vector<FairMQChannel*> chans;
for (auto& chan : fChannels.at(fInChannelName)) {
chans.push_back(&chan);
}
FairMQPollerPtr poller(NewPoller(chans));
if (fMultipart) {
while (!NewStatePending()) {
poller->Poll(100);
// Loop over the data input channels.
for (int i = 0; i < numInputs; ++i) {
// Check if the channel has data ready to be received.
if (poller->CheckInput(i)) {
FairMQParts payload;
if (Receive(payload, fInChannelName, i) >= 0) {
if (Send(payload, fOutChannelName) < 0) {
LOG(debug) << "Transfer interrupted";
break;
}
} else {
LOG(debug) << "Transfer interrupted";
break;
}
}
}
}
} else {
while (!NewStatePending()) {
poller->Poll(100);
// Loop over the data input channels.
for (int i = 0; i < numInputs; ++i) {
// Check if the channel has data ready to be received.
if (poller->CheckInput(i)) {
FairMQMessagePtr payload(fTransportFactory->CreateMessage());
if (Receive(payload, fInChannelName, i) >= 0) {
if (Send(payload, fOutChannelName) < 0) {
LOG(debug) << "Transfer interrupted";
break;
}
} else {
LOG(debug) << "Transfer interrupted";
break;
}
}
}
}
}
}
};
#endif /* FAIRMQMERGER_H_ */

View File

@ -1,110 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "FairMQMultiplier.h"
#include "../FairMQLogger.h"
using namespace std;
FairMQMultiplier::FairMQMultiplier()
: fMultipart(true)
, fNumOutputs(0)
, fInChannelName()
, fOutChannelNames()
{
}
FairMQMultiplier::~FairMQMultiplier()
{
}
void FairMQMultiplier::InitTask()
{
fMultipart = fConfig->GetProperty<bool>("multipart");
fInChannelName = fConfig->GetProperty<string>("in-channel");
fOutChannelNames = fConfig->GetProperty<vector<string>>("out-channel");
fNumOutputs = fChannels.at(fOutChannelNames.at(0)).size();
if (fMultipart)
{
OnData(fInChannelName, &FairMQMultiplier::HandleMultipartData);
}
else
{
OnData(fInChannelName, &FairMQMultiplier::HandleSingleData);
}
}
bool FairMQMultiplier::HandleSingleData(std::unique_ptr<FairMQMessage>& payload, int /*index*/)
{
for (unsigned int i = 0; i < fOutChannelNames.size() - 1; ++i) // all except last channel
{
for (unsigned int j = 0; j < fChannels.at(fOutChannelNames.at(i)).size(); ++j) // all subChannels in a channel
{
FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage());
msgCopy->Copy(*payload);
Send(msgCopy, fOutChannelNames.at(i), j);
}
}
unsigned int lastChannelSize = fChannels.at(fOutChannelNames.back()).size();
for (unsigned int i = 0; i < lastChannelSize - 1; ++i) // iterate over all except last subChannels of the last channel
{
FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage());
msgCopy->Copy(*payload);
Send(msgCopy, fOutChannelNames.back(), i);
}
Send(payload, fOutChannelNames.back(), lastChannelSize - 1); // send final message to last subChannel of last channel
return true;
}
bool FairMQMultiplier::HandleMultipartData(FairMQParts& payload, int /*index*/)
{
for (unsigned int i = 0; i < fOutChannelNames.size() - 1; ++i) // all except last channel
{
for (unsigned int j = 0; j < fChannels.at(fOutChannelNames.at(i)).size(); ++j) // all subChannels in a channel
{
FairMQParts parts;
for (int k = 0; k < payload.Size(); ++k)
{
FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage());
msgCopy->Copy(payload.AtRef(k));
parts.AddPart(std::move(msgCopy));
}
Send(parts, fOutChannelNames.at(i), j);
}
}
unsigned int lastChannelSize = fChannels.at(fOutChannelNames.back()).size();
for (unsigned int i = 0; i < lastChannelSize - 1; ++i) // iterate over all except last subChannels of the last channel
{
FairMQParts parts;
for (int k = 0; k < payload.Size(); ++k)
{
FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage());
msgCopy->Copy(payload.AtRef(k));
parts.AddPart(std::move(msgCopy));
}
Send(parts, fOutChannelNames.back(), i);
}
Send(payload, fOutChannelNames.back(), lastChannelSize - 1); // send final message to last subChannel of last channel
return true;
}

View File

@ -12,12 +12,18 @@
#include "FairMQDevice.h"
#include <string>
#include <vector>
class FairMQMultiplier : public FairMQDevice
{
public:
FairMQMultiplier();
virtual ~FairMQMultiplier();
FairMQMultiplier()
: fMultipart(true)
, fNumOutputs(0)
, fInChannelName()
, fOutChannelNames()
{}
~FairMQMultiplier() {}
protected:
bool fMultipart;
@ -25,10 +31,80 @@ class FairMQMultiplier : public FairMQDevice
std::string fInChannelName;
std::vector<std::string> fOutChannelNames;
virtual void InitTask();
void InitTask() override
{
fMultipart = fConfig->GetProperty<bool>("multipart");
fInChannelName = fConfig->GetProperty<std::string>("in-channel");
fOutChannelNames = fConfig->GetProperty<std::vector<std::string>>("out-channel");
fNumOutputs = fChannels.at(fOutChannelNames.at(0)).size();
bool HandleSingleData(std::unique_ptr<FairMQMessage>&, int);
bool HandleMultipartData(FairMQParts&, int);
if (fMultipart) {
OnData(fInChannelName, &FairMQMultiplier::HandleMultipartData);
} else {
OnData(fInChannelName, &FairMQMultiplier::HandleSingleData);
}
}
bool HandleSingleData(std::unique_ptr<FairMQMessage>& payload, int)
{
for (unsigned int i = 0; i < fOutChannelNames.size() - 1; ++i) { // all except last channel
for (unsigned int j = 0; j < fChannels.at(fOutChannelNames.at(i)).size(); ++j) { // all subChannels in a channel
FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage());
msgCopy->Copy(*payload);
Send(msgCopy, fOutChannelNames.at(i), j);
}
}
unsigned int lastChannelSize = fChannels.at(fOutChannelNames.back()).size();
for (unsigned int i = 0; i < lastChannelSize - 1; ++i) { // iterate over all except last subChannels of the last channel
FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage());
msgCopy->Copy(*payload);
Send(msgCopy, fOutChannelNames.back(), i);
}
Send(payload, fOutChannelNames.back(), lastChannelSize - 1); // send final message to last subChannel of last channel
return true;
}
bool HandleMultipartData(FairMQParts& payload, int)
{
for (unsigned int i = 0; i < fOutChannelNames.size() - 1; ++i) { // all except last channel
for (unsigned int j = 0; j < fChannels.at(fOutChannelNames.at(i)).size(); ++j) { // all subChannels in a channel
FairMQParts parts;
for (int k = 0; k < payload.Size(); ++k) {
FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage());
msgCopy->Copy(payload.AtRef(k));
parts.AddPart(std::move(msgCopy));
}
Send(parts, fOutChannelNames.at(i), j);
}
}
unsigned int lastChannelSize = fChannels.at(fOutChannelNames.back()).size();
for (unsigned int i = 0; i < lastChannelSize - 1; ++i) { // iterate over all except last subChannels of the last channel
FairMQParts parts;
for (int k = 0; k < payload.Size(); ++k) {
FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage());
msgCopy->Copy(payload.AtRef(k));
parts.AddPart(std::move(msgCopy));
}
Send(parts, fOutChannelNames.back(), i);
}
Send(payload, fOutChannelNames.back(), lastChannelSize - 1); // send final message to last subChannel of last channel
return true;
}
};
#endif /* FAIRMQMULTIPLIER_H_ */

View File

@ -1,81 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQProxy.cxx
*
* @since 2013-10-02
* @author A. Rybalchenko
*/
#include "FairMQProxy.h"
#include "../FairMQLogger.h"
using namespace std;
FairMQProxy::FairMQProxy()
: fMultipart(true)
, fInChannelName()
, fOutChannelName()
{
}
FairMQProxy::~FairMQProxy()
{
}
void FairMQProxy::InitTask()
{
fMultipart = fConfig->GetProperty<bool>("multipart");
fInChannelName = fConfig->GetProperty<string>("in-channel");
fOutChannelName = fConfig->GetProperty<string>("out-channel");
}
void FairMQProxy::Run()
{
if (fMultipart)
{
while (!NewStatePending())
{
FairMQParts payload;
if (Receive(payload, fInChannelName) >= 0)
{
if (Send(payload, fOutChannelName) < 0)
{
LOG(debug) << "Transfer interrupted";
break;
}
}
else
{
LOG(debug) << "Transfer interrupted";
break;
}
}
}
else
{
while (!NewStatePending())
{
unique_ptr<FairMQMessage> payload(fTransportFactory->CreateMessage());
if (Receive(payload, fInChannelName) >= 0)
{
if (Send(payload, fOutChannelName) < 0)
{
LOG(debug) << "Transfer interrupted";
break;
}
}
else
{
LOG(debug) << "Transfer interrupted";
break;
}
}
}
}

View File

@ -22,16 +22,55 @@
class FairMQProxy : public FairMQDevice
{
public:
FairMQProxy();
virtual ~FairMQProxy();
FairMQProxy()
: fMultipart(true)
, fInChannelName()
, fOutChannelName()
{}
~FairMQProxy() {}
protected:
bool fMultipart;
std::string fInChannelName;
std::string fOutChannelName;
virtual void Run();
virtual void InitTask();
void InitTask() override
{
fMultipart = fConfig->GetProperty<bool>("multipart");
fInChannelName = fConfig->GetProperty<std::string>("in-channel");
fOutChannelName = fConfig->GetProperty<std::string>("out-channel");
}
void Run() override
{
if (fMultipart) {
while (!NewStatePending()) {
FairMQParts payload;
if (Receive(payload, fInChannelName) >= 0) {
if (Send(payload, fOutChannelName) < 0) {
LOG(debug) << "Transfer interrupted";
break;
}
} else {
LOG(debug) << "Transfer interrupted";
break;
}
}
} else {
while (!NewStatePending()) {
FairMQMessagePtr payload(fTransportFactory->CreateMessage());
if (Receive(payload, fInChannelName) >= 0) {
if (Send(payload, fOutChannelName) < 0) {
LOG(debug) << "Transfer interrupted";
break;
}
} else {
LOG(debug) << "Transfer interrupted";
break;
}
}
}
}
};
#endif /* FAIRMQPROXY_H_ */

View File

@ -32,7 +32,7 @@ class FairMQSink : public FairMQDevice //, public OutputPolicy
, fInChannelName()
{}
virtual ~FairMQSink() {}
~FairMQSink() {}
protected:
bool fMultipart;
@ -40,14 +40,14 @@ class FairMQSink : public FairMQDevice //, public OutputPolicy
uint64_t fNumIterations;
std::string fInChannelName;
virtual void InitTask()
void InitTask() override
{
fMultipart = fConfig->GetProperty<bool>("multipart");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fInChannelName = fConfig->GetProperty<std::string>("in-channel");
}
virtual void Run()
void Run() override
{
// store the channel reference to avoid traversing the map on every loop iteration
FairMQChannel& dataInChannel = fChannels.at(fInChannelName).at(0);

View File

@ -1,74 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQSplitter.cxx
*
* @since 2012-12-06
* @author D. Klein, A. Rybalchenko
*/
#include "FairMQSplitter.h"
#include "../FairMQLogger.h"
using namespace std;
FairMQSplitter::FairMQSplitter()
: fMultipart(true)
, fNumOutputs(0)
, fDirection(0)
, fInChannelName()
, fOutChannelName()
{
}
FairMQSplitter::~FairMQSplitter()
{
}
void FairMQSplitter::InitTask()
{
fMultipart = fConfig->GetProperty<bool>("multipart");
fInChannelName = fConfig->GetProperty<string>("in-channel");
fOutChannelName = fConfig->GetProperty<string>("out-channel");
fNumOutputs = fChannels.at(fOutChannelName).size();
fDirection = 0;
if (fMultipart)
{
OnData(fInChannelName, &FairMQSplitter::HandleMultipartData);
}
else
{
OnData(fInChannelName, &FairMQSplitter::HandleSingleData);
}
}
bool FairMQSplitter::HandleSingleData(FairMQMessagePtr& payload, int /*index*/)
{
Send(payload, fOutChannelName, fDirection);
if (++fDirection >= fNumOutputs)
{
fDirection = 0;
}
return true;
}
bool FairMQSplitter::HandleMultipartData(FairMQParts& payload, int /*index*/)
{
Send(payload, fOutChannelName, fDirection);
if (++fDirection >= fNumOutputs)
{
fDirection = 0;
}
return true;
}

View File

@ -22,8 +22,14 @@
class FairMQSplitter : public FairMQDevice
{
public:
FairMQSplitter();
virtual ~FairMQSplitter();
FairMQSplitter()
: fMultipart(true)
, fNumOutputs(0)
, fDirection(0)
, fInChannelName()
, fOutChannelName()
{}
~FairMQSplitter() {}
protected:
bool fMultipart;
@ -32,10 +38,32 @@ class FairMQSplitter : public FairMQDevice
std::string fInChannelName;
std::string fOutChannelName;
virtual void InitTask();
void InitTask() override
{
fMultipart = fConfig->GetProperty<bool>("multipart");
fInChannelName = fConfig->GetProperty<std::string>("in-channel");
fOutChannelName = fConfig->GetProperty<std::string>("out-channel");
fNumOutputs = fChannels.at(fOutChannelName).size();
fDirection = 0;
bool HandleSingleData(std::unique_ptr<FairMQMessage>&, int);
bool HandleMultipartData(FairMQParts&, int);
if (fMultipart) {
OnData(fInChannelName, &FairMQSplitter::HandleData<FairMQParts>);
} else {
OnData(fInChannelName, &FairMQSplitter::HandleData<FairMQMessagePtr>);
}
}
template<typename T>
bool HandleData(T& payload, int)
{
Send(payload, fOutChannelName, fDirection);
if (++fDirection >= fNumOutputs) {
fDirection = 0;
}
return true;
}
};
#endif /* FAIRMQSPLITTER_H_ */