reduce noise in examples

This commit is contained in:
Alexey Rybalchenko 2021-05-13 22:22:49 +02:00
parent ff89d2eca9
commit bf864413cc
119 changed files with 1766 additions and 3143 deletions

View File

@ -6,20 +6,11 @@
# copied verbatim in the file "LICENSE" #
################################################################################
add_library(Example11Lib STATIC
"Sampler.cxx"
"Sampler.h"
"Sink.cxx"
"Sink.h"
)
add_executable(fairmq-ex-1-1-sampler sampler.cxx)
target_link_libraries(fairmq-ex-1-1-sampler PRIVATE FairMQ)
target_link_libraries(Example11Lib PUBLIC FairMQ)
add_executable(fairmq-ex-1-1-sampler runSampler.cxx)
target_link_libraries(fairmq-ex-1-1-sampler PRIVATE Example11Lib)
add_executable(fairmq-ex-1-1-sink runSink.cxx)
target_link_libraries(fairmq-ex-1-1-sink PRIVATE Example11Lib)
add_executable(fairmq-ex-1-1-sink sink.cxx)
target_link_libraries(fairmq-ex-1-1-sink PRIVATE FairMQ)
add_custom_target(Example11 DEPENDS fairmq-ex-1-1-sampler fairmq-ex-1-1-sink)

View File

@ -3,7 +3,7 @@
A simple topology of two devices - **Sampler** and **Sink**. **Sampler** sends data to **Sink** via the **PUSH-PULL** pattern.
`runSampler.cxx` and `runSink.cxx` configure and run the devices.
`sampler.cxx` and `sink.cxx` configure and run the devices.
The executables take two command line parameters: `--id` and `--channel-config`. The value of `--id` should be a unique identifier and the value for `--channel-config` is the configuration of the communication channel. .

View File

@ -1,59 +0,0 @@
/********************************************************************************
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Sampler.h"
using namespace std;
namespace example_1_1
{
Sampler::Sampler()
: fText()
, fMaxIterations(0)
, fNumIterations(0)
{}
void Sampler::InitTask()
{
// Get the fText and fMaxIterations values from the command line options (via fConfig)
fText = fConfig->GetProperty<string>("text");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool Sampler::ConditionalRun()
{
// create a copy of the data with new(), that will be deleted after the transfer is complete
string* text = new string(fText);
// create message object with a pointer to the data buffer, its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr msg(NewMessage(
const_cast<char*>(text->c_str()),
text->length(),
[](void* /*data*/, void* object) { delete static_cast<string*>(object); },
text));
LOG(info) << "Sending \"" << fText << "\"";
// in case of error or transfer interruption, return false to go to the Ready state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
if (Send(msg, "data") < 0) {
return false;
} else if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
return true;
}
Sampler::~Sampler() {}
} // namespace example_1_1

View File

@ -1,42 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLE11SAMPLER_H
#define FAIRMQEXAMPLE11SAMPLER_H
#include <string>
#include "FairMQDevice.h"
namespace example_1_1
{
class Sampler : public FairMQDevice
{
public:
Sampler();
virtual ~Sampler();
protected:
std::string fText;
uint64_t fMaxIterations;
uint64_t fNumIterations;
void InitTask() override;
bool ConditionalRun() override;
};
} // namespace example_1_1
#endif /* FAIRMQEXAMPLE11SAMPLER_H */

View File

@ -1,54 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.cxx
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include "Sink.h"
using namespace std;
namespace example_1_1
{
Sink::Sink()
: fMaxIterations(0)
, fNumIterations(0)
{
// register a handler for data arriving on "data" channel
OnData("data", &Sink::HandleData);
}
void Sink::InitTask()
{
// Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
// handler is called whenever a message arrives on "data", with a reference to the message and a
// sub-channel index (here 0)
bool Sink::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
LOG(info) << "Received: \"" << string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
// return true if you want the handler to be called again (otherwise return false go to the
// Ready state)
return true;
}
Sink::~Sink() {}
} // namespace example_1_1

View File

@ -1,40 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLE11SINK_H
#define FAIRMQEXAMPLE11SINK_H
#include "FairMQDevice.h"
namespace example_1_1
{
class Sink : public FairMQDevice
{
public:
Sink();
virtual ~Sink();
protected:
virtual void InitTask();
bool HandleData(FairMQMessagePtr&, int);
private:
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
} // namespace example_1_1
#endif /* FAIRMQEXAMPLE11SINK_H */

View File

@ -1,24 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sampler.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("text", bpo::value<std::string>()->default_value("Hello"), "Text to send out")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_1_1::Sampler();
}

View File

@ -1,23 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sink.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_1_1::Sink();
}

75
examples/1-1/sampler.cxx Normal file
View File

@ -0,0 +1,75 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
namespace bpo = boost::program_options;
class Sampler : public FairMQDevice
{
public:
Sampler()
: fMaxIterations(0)
, fNumIterations(0)
{}
protected:
std::string fText;
uint64_t fMaxIterations;
uint64_t fNumIterations;
void InitTask() override
{
// Get the fText and fMaxIterations values from the command line options (via fConfig)
fText = fConfig->GetProperty<std::string>("text");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool ConditionalRun() override
{
// create a copy of the data with new(), that will be deleted after the transfer is complete
std::string* text = new std::string(fText);
// create message object with a pointer to the data buffer, its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr msg(NewMessage(
const_cast<char*>(text->c_str()),
text->length(),
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
text));
LOG(info) << "Sending \"" << fText << "\"";
// in case of error or transfer interruption, return false to go to the Ready state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
if (Send(msg, "data") < 0) {
return false;
} else if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
return true;
}
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("text", bpo::value<std::string>()->default_value("Hello"), "Text to send out")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sampler>();
}

62
examples/1-1/sink.cxx Normal file
View File

@ -0,0 +1,62 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
namespace bpo = boost::program_options;
class Sink : public FairMQDevice
{
public:
Sink()
: fMaxIterations(0)
, fNumIterations(0)
{
// register a handler for data arriving on "data" channel
OnData("data", &Sink::HandleData);
}
protected:
void InitTask() override
{
// Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool HandleData(FairMQMessagePtr& msg, int)
{
LOG(info) << "Received: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
// return true if you want the handler to be called again (otherwise return false go to the
// Ready state)
return true;
}
private:
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sink>();
}

View File

@ -6,25 +6,14 @@
# copied verbatim in the file "LICENSE" #
################################################################################
add_library(Example1N1Lib STATIC
"Sampler.cxx"
"Sampler.h"
"Processor.cxx"
"Processor.h"
"Sink.cxx"
"Sink.h"
)
add_executable(fairmq-ex-1-n-1-sampler sampler.cxx)
target_link_libraries(fairmq-ex-1-n-1-sampler PRIVATE FairMQ)
target_link_libraries(Example1N1Lib PUBLIC FairMQ)
add_executable(fairmq-ex-1-n-1-processor processor.cxx)
target_link_libraries(fairmq-ex-1-n-1-processor PRIVATE FairMQ)
add_executable(fairmq-ex-1-n-1-sampler runSampler.cxx)
target_link_libraries(fairmq-ex-1-n-1-sampler PRIVATE Example1N1Lib)
add_executable(fairmq-ex-1-n-1-processor runProcessor.cxx)
target_link_libraries(fairmq-ex-1-n-1-processor PRIVATE Example1N1Lib)
add_executable(fairmq-ex-1-n-1-sink runSink.cxx)
target_link_libraries(fairmq-ex-1-n-1-sink PRIVATE Example1N1Lib)
add_executable(fairmq-ex-1-n-1-sink sink.cxx)
target_link_libraries(fairmq-ex-1-n-1-sink PRIVATE FairMQ)
add_custom_target(Example1N1 DEPENDS fairmq-ex-1-n-1-sampler fairmq-ex-1-n-1-processor fairmq-ex-1-n-1-sink)

View File

@ -1,53 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Processor.h"
#include <string>
using namespace std;
namespace example_1_n_1
{
Processor::Processor()
{
OnData("data1", &Processor::HandleData);
}
bool Processor::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
LOG(info) << "Received data, processing...";
// Modify the received string
string* text = new std::string(static_cast<char*>(msg->GetData()), msg->GetSize());
*text += " (modified by " + fId + ")";
// create message object with a pointer to the data buffer,
// its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr msg2(NewMessage(const_cast<char*>(text->c_str()),
text->length(),
[](void* /*data*/, void* object) { delete static_cast<string*>(object); },
text));
// Send out the output message
if (Send(msg2, "data2") < 0)
{
return false;
}
return true;
}
Processor::~Processor()
{
}
} // namespace example_1_n_1

View File

@ -1,29 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQEXAMPLE1N1PROCESSOR_H_
#define FAIRMQEXAMPLE1N1PROCESSOR_H_
#include "FairMQDevice.h"
namespace example_1_n_1
{
class Processor : public FairMQDevice
{
public:
Processor();
virtual ~Processor();
protected:
bool HandleData(FairMQMessagePtr&, int);
};
} // namespace example_1_n_1
#endif /* FAIRMQEXAMPLE1N1PROCESSOR_H_ */

View File

@ -1,68 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.cpp
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <thread> // this_thread::sleep_for
#include <chrono>
#include "Sampler.h"
using namespace std;
namespace example_1_n_1
{
Sampler::Sampler()
: fText()
, fMaxIterations(0)
, fNumIterations(0)
{
}
void Sampler::InitTask()
{
// Get the fText and fMaxIterations values from the command line options (via fConfig)
fText = fConfig->GetProperty<string>("text");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool Sampler::ConditionalRun()
{
// Initializing message with NewStaticMessage will avoid copy
// but won't delete the data after the sending is completed.
FairMQMessagePtr msg(NewStaticMessage(fText));
LOG(info) << "Sending \"" << fText << "\"";
// in case of error or transfer interruption, return false to go to IDLE state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
if (Send(msg, "data1") < 0)
{
return false;
}
else if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
this_thread::sleep_for(chrono::seconds(1));
return true;
}
Sampler::~Sampler()
{
}
} // namespace example_1_n_1

View File

@ -1,42 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLE1N1SAMPLER_H_
#define FAIRMQEXAMPLE1N1SAMPLER_H_
#include <string>
#include "FairMQDevice.h"
namespace example_1_n_1
{
class Sampler : public FairMQDevice
{
public:
Sampler();
virtual ~Sampler();
protected:
std::string fText;
uint64_t fMaxIterations;
uint64_t fNumIterations;
virtual void InitTask();
virtual bool ConditionalRun();
};
} // namespace example_1_n_1
#endif /* FAIRMQEXAMPLE1N1SAMPLER_H_ */

View File

@ -1,55 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.cxx
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include "Sink.h"
using namespace std;
namespace example_1_n_1
{
Sink::Sink()
: fMaxIterations(0)
, fNumIterations(0)
{
// register a handler for data arriving on "data2" channel
OnData("data2", &Sink::HandleData);
}
void Sink::InitTask()
{
// Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
// handler is called whenever a message arrives on "data2", with a reference to the message and a sub-channel index (here 0)
bool Sink::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
LOG(info) << "Received: \"" << string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
// return true if want to be called again (otherwise return false go to IDLE state)
return true;
}
Sink::~Sink()
{
}
} // namespace example_1_n_1

View File

@ -1,40 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLE1N1SINK_H
#define FAIRMQEXAMPLE1N1SINK_H
#include "FairMQDevice.h"
namespace example_1_n_1
{
class Sink : public FairMQDevice
{
public:
Sink();
virtual ~Sink();
protected:
virtual void InitTask();
bool HandleData(FairMQMessagePtr&, int);
private:
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
} // namespace example_1_n_1
#endif /* FAIRMQEXAMPLE1N1SINK_H */

View File

@ -0,0 +1,58 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
namespace bpo = boost::program_options;
class Processor : public FairMQDevice
{
public:
Processor()
{
OnData("data1", &Processor::HandleData);
}
protected:
bool HandleData(FairMQMessagePtr& msg, int)
{
LOG(info) << "Received data, processing...";
// Modify the received string
std::string* text = new std::string(static_cast<char*>(msg->GetData()), msg->GetSize());
*text += " (modified by " + fId + ")";
// create message object with a pointer to the data buffer,
// its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr msg2(NewMessage(const_cast<char*>(text->c_str()),
text->length(),
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
text));
// Send out the output message
if (Send(msg2, "data2") < 0) {
return false;
}
return true;
}
};
void addCustomOptions(bpo::options_description& /*options*/)
{
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Processor>();
}

View File

@ -1,21 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Processor.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& /*options*/)
{
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_1_n_1::Processor();
}

View File

@ -1,24 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sampler.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("text", bpo::value<std::string>()->default_value("Hello"), "Text to send out")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_1_n_1::Sampler();
}

View File

@ -1,23 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sink.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_1_n_1::Sink();
}

View File

@ -0,0 +1,71 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
#include <thread> // this_thread::sleep_for
#include <chrono>
namespace bpo = boost::program_options;
class Sampler : public FairMQDevice
{
public:
Sampler()
: fMaxIterations(0)
, fNumIterations(0)
{}
protected:
std::string fText;
uint64_t fMaxIterations;
uint64_t fNumIterations;
void InitTask() override
{
// Get the fText and fMaxIterations values from the command line options (via fConfig)
fText = fConfig->GetProperty<std::string>("text");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool ConditionalRun() override
{
// Initializing message with NewStaticMessage will avoid copy
// but won't delete the data after the sending is completed.
FairMQMessagePtr msg(NewStaticMessage(fText));
LOG(info) << "Sending \"" << fText << "\"";
// in case of error or transfer interruption, return false to go to IDLE state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
if (Send(msg, "data1") < 0) {
return false;
} else if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
return true;
}
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("text", bpo::value<std::string>()->default_value("Hello"), "Text to send out")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sampler>();
}

61
examples/1-n-1/sink.cxx Normal file
View File

@ -0,0 +1,61 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
namespace bpo = boost::program_options;
class Sink : public FairMQDevice
{
public:
Sink()
: fMaxIterations(0)
, fNumIterations(0)
{
// register a handler for data arriving on "data2" channel
OnData("data2", &Sink::HandleData);
}
protected:
virtual void InitTask() override
{
// Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool HandleData(FairMQMessagePtr& msg, int)
{
LOG(info) << "Received: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
// return true if want to be called again (otherwise return false go to IDLE state)
return true;
}
private:
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sink>();
}

View File

@ -6,21 +6,12 @@
# copied verbatim in the file "LICENSE" #
################################################################################
add_library(ExampleCopyPushLib STATIC
"Sampler.cxx"
"Sampler.h"
"Sink.cxx"
"Sink.h"
)
target_link_libraries(ExampleCopyPushLib PUBLIC FairMQ)
add_executable(fairmq-ex-copypush-sampler sampler.cxx)
target_link_libraries(fairmq-ex-copypush-sampler PRIVATE FairMQ)
add_executable(fairmq-ex-copypush-sampler runSampler.cxx)
target_link_libraries(fairmq-ex-copypush-sampler PRIVATE ExampleCopyPushLib)
add_executable(fairmq-ex-copypush-sink runSink.cxx)
target_link_libraries(fairmq-ex-copypush-sink PRIVATE ExampleCopyPushLib)
add_executable(fairmq-ex-copypush-sink sink.cxx)
target_link_libraries(fairmq-ex-copypush-sink PRIVATE FairMQ)
add_custom_target(ExampleCopyPush DEPENDS fairmq-ex-copypush-sampler fairmq-ex-copypush-sink)

View File

@ -1,68 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.cpp
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <thread> // this_thread::sleep_for
#include <chrono>
#include "Sampler.h"
using namespace std;
namespace example_copypush
{
Sampler::Sampler()
: fNumDataChannels(0)
, fCounter(0)
, fMaxIterations(0)
, fNumIterations(0)
{
}
void Sampler::InitTask()
{
fNumDataChannels = fChannels.at("data").size();
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool Sampler::ConditionalRun()
{
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
FairMQMessagePtr msg(NewSimpleMessage(fCounter++));
for (int i = 0; i < fNumDataChannels - 1; ++i)
{
FairMQMessagePtr msgCopy(NewMessage());
msgCopy->Copy(*msg);
Send(msgCopy, "data", i);
}
Send(msg, "data", fNumDataChannels - 1);
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
this_thread::sleep_for(chrono::seconds(1));
return true;
}
Sampler::~Sampler()
{
}
} // namespace example_copypush

View File

@ -1,43 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLECOPYPUSHSAMPLER_H
#define FAIRMQEXAMPLECOPYPUSHSAMPLER_H
#include "FairMQDevice.h"
#include <stdint.h> // uint64_t
namespace example_copypush
{
class Sampler : public FairMQDevice
{
public:
Sampler();
virtual ~Sampler();
protected:
virtual void InitTask();
virtual bool ConditionalRun();
int fNumDataChannels;
uint64_t fCounter;
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
} // namespace example_copypush
#endif /* FAIRMQEXAMPLECOPYPUSHSAMPLER_H */

View File

@ -1,51 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.cxx
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include "Sink.h"
namespace example_copypush
{
Sink::Sink()
: fMaxIterations(0)
, fNumIterations(0)
{
OnData("data", &Sink::HandleData);
}
void Sink::InitTask()
{
// Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool Sink::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
LOG(info) << "Received message: \"" << *(static_cast<uint64_t*>(msg->GetData())) << "\"";
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
// return true if want to be called again (otherwise go to IDLE state)
return true;
}
Sink::~Sink()
{
}
} // namespace example_copypush

View File

@ -1,42 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLECOPYPUSHSINK_H
#define FAIRMQEXAMPLECOPYPUSHSINK_H
#include "FairMQDevice.h"
#include <stdint.h> // uint64_t
namespace example_copypush
{
class Sink : public FairMQDevice
{
public:
Sink();
virtual ~Sink();
protected:
virtual void InitTask();
bool HandleData(FairMQMessagePtr&, int);
private:
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
} // namespace example_copypush
#endif /* FAIRMQEXAMPLECOPYPUSHSINK_H */

View File

@ -1,23 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sampler.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_copypush::Sampler();
}

View File

@ -1,23 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sink.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_copypush::Sink();
}

View File

@ -0,0 +1,74 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <chrono>
#include <cstdint> // uint64_t
#include <thread> // this_thread::sleep_for
namespace bpo = boost::program_options;
class Sampler : public FairMQDevice
{
public:
Sampler()
: fNumDataChannels(0)
, fCounter(0)
, fMaxIterations(0)
, fNumIterations(0)
{}
protected:
void InitTask() override
{
fNumDataChannels = fChannels.at("data").size();
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool ConditionalRun() override
{
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
FairMQMessagePtr msg(NewSimpleMessage(fCounter++));
for (int i = 0; i < fNumDataChannels - 1; ++i) {
FairMQMessagePtr msgCopy(NewMessage());
msgCopy->Copy(*msg);
Send(msgCopy, "data", i);
}
Send(msg, "data", fNumDataChannels - 1);
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
return true;
}
int fNumDataChannels;
uint64_t fCounter;
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sampler>();
}

View File

@ -0,0 +1,60 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <cstdint> // uint64_t
namespace bpo = boost::program_options;
class Sink : public FairMQDevice
{
public:
Sink()
: fMaxIterations(0)
, fNumIterations(0)
{
OnData("data", &Sink::HandleData);
}
protected:
void InitTask() override
{
// Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool HandleData(FairMQMessagePtr& msg, int)
{
LOG(info) << "Received message: \"" << *(static_cast<uint64_t*>(msg->GetData())) << "\"";
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
// return true if want to be called again (otherwise go to IDLE state)
return true;
}
private:
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sink>();
}

View File

@ -6,25 +6,14 @@
# copied verbatim in the file "LICENSE" #
################################################################################
add_library(ExampleDDSLib STATIC
"Sampler.cxx"
"Sampler.h"
"Processor.cxx"
"Processor.h"
"Sink.cxx"
"Sink.h"
)
add_executable(fairmq-ex-dds-sampler sampler.cxx)
target_link_libraries(fairmq-ex-dds-sampler PRIVATE FairMQ)
target_link_libraries(ExampleDDSLib PUBLIC FairMQ)
add_executable(fairmq-ex-dds-processor processor.cxx)
target_link_libraries(fairmq-ex-dds-processor PRIVATE FairMQ)
add_executable(fairmq-ex-dds-sampler runSampler.cxx)
target_link_libraries(fairmq-ex-dds-sampler PRIVATE ExampleDDSLib)
add_executable(fairmq-ex-dds-processor runProcessor.cxx)
target_link_libraries(fairmq-ex-dds-processor PRIVATE ExampleDDSLib)
add_executable(fairmq-ex-dds-sink runSink.cxx)
target_link_libraries(fairmq-ex-dds-sink PRIVATE ExampleDDSLib)
add_executable(fairmq-ex-dds-sink sink.cxx)
target_link_libraries(fairmq-ex-dds-sink PRIVATE FairMQ)
add_custom_target(ExampleDDS DEPENDS fairmq-ex-dds-sampler fairmq-ex-dds-processor fairmq-ex-dds-sink)

View File

@ -1,52 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Processor.h"
#include "FairMQLogger.h"
using namespace std;
namespace example_dds
{
Processor::Processor()
{
OnData("data1", &Processor::HandleData);
}
bool Processor::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
LOG(info) << "Received data, processing...";
// Modify the received string
string* text = new std::string(static_cast<char*>(msg->GetData()), msg->GetSize());
*text += " (modified by " + fId + ")";
// create message object with a pointer to the data buffer,
// its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr msg2(NewMessage(const_cast<char*>(text->c_str()),
text->length(),
[](void* /*data*/, void* object) { delete static_cast<string*>(object); },
text));
// Send out the output message
if (Send(msg2, "data2") < 0)
{
return false;
}
return true;
}
Processor::~Processor()
{
}
} // namespace example_dds

View File

@ -1,29 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQEXAMPLEDDSPROCESSOR_H
#define FAIRMQEXAMPLEDDSPROCESSOR_H
#include "FairMQDevice.h"
namespace example_dds
{
class Processor : public FairMQDevice
{
public:
Processor();
virtual ~Processor();
protected:
bool HandleData(FairMQMessagePtr&, int);
};
}
#endif /* FAIRMQEXAMPLEDDSPROCESSOR_H */

View File

@ -1,64 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.cpp
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <thread> // this_thread::sleep_for
#include <chrono>
#include "Sampler.h"
using namespace std;
namespace example_dds
{
Sampler::Sampler()
{
}
void Sampler::InitTask()
{
fIterations = fConfig->GetValue<uint64_t>("iterations");
fCounter = 0;
}
bool Sampler::ConditionalRun()
{
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
FairMQMessagePtr msg(NewSimpleMessage("Data"));
LOG(info) << "Sending \"Data\"";
// in case of error or transfer interruption, return false to go to IDLE state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
if (Send(msg, "data1") < 0) {
return false;
}
if (fIterations > 0) {
++fCounter;
if (fCounter >= fIterations) {
LOG(info) << "Sent " << fCounter << " messages. Finished.";
return false;
}
}
return true;
}
Sampler::~Sampler()
{
}
}

View File

@ -1,40 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLEDDSSAMPLER_H
#define FAIRMQEXAMPLEDDSSAMPLER_H
#include "FairMQDevice.h"
namespace example_dds
{
class Sampler : public FairMQDevice
{
public:
Sampler();
virtual ~Sampler();
void InitTask() override;
protected:
bool ConditionalRun() override;
private:
uint64_t fIterations;
uint64_t fCounter;
};
} // namespace example_dds
#endif /* FAIRMQEXAMPLEDDSSAMPLER_H */

View File

@ -1,54 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.cxx
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include "Sink.h"
using namespace std;
namespace example_dds
{
Sink::Sink()
{
// register a handler for data arriving on "data2" channel
OnData("data2", &Sink::HandleData);
}
void Sink::InitTask()
{
fIterations = fConfig->GetValue<uint64_t>("iterations");
fCounter = 0;
}
// handler is called whenever a message arrives on "data2", with a reference to the message and a sub-channel index (here 0)
bool Sink::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
LOG(info) << "Received: \"" << string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
if (fIterations > 0) {
++fCounter;
if (fCounter >= fIterations) {
LOG(info) << "Received " << fCounter << " messages. Finished.";
return false;
}
}
// return true if want to be called again (otherwise go to IDLE state)
return true;
}
Sink::~Sink()
{
}
} // namespace example_dds

View File

@ -1,40 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLEDDSSINK_H
#define FAIRMQEXAMPLEDDSSINK_H
#include "FairMQDevice.h"
namespace example_dds
{
class Sink : public FairMQDevice
{
public:
Sink();
virtual ~Sink();
void InitTask() override;
protected:
bool HandleData(FairMQMessagePtr&, int);
private:
uint64_t fIterations;
uint64_t fCounter;
};
} // namespace example_dds
#endif /* FAIRMQEXAMPLEDDSSINK_H */

View File

@ -0,0 +1,58 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
namespace bpo = boost::program_options;
class Processor : public FairMQDevice
{
public:
Processor()
{
OnData("data1", &Processor::HandleData);
}
protected:
bool HandleData(FairMQMessagePtr& msg, int)
{
LOG(info) << "Received data, processing...";
// Modify the received string
std::string* text = new std::string(static_cast<char*>(msg->GetData()), msg->GetSize());
*text += " (modified by " + fId + ")";
// create message object with a pointer to the data buffer,
// its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr msg2(NewMessage(const_cast<char*>(text->c_str()),
text->length(),
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
text));
// Send out the output message
if (Send(msg2, "data2") < 0) {
return false;
}
return true;
}
};
void addCustomOptions(bpo::options_description& /*options*/)
{
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Processor>();
}

View File

@ -1,21 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Processor.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& /*options*/)
{
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_dds::Processor();
}

View File

@ -1,25 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sampler.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()(
"iterations,i",
bpo::value<uint64_t>()->default_value(1000),
"Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_dds::Sampler();
}

View File

@ -1,25 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sink.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()(
"iterations,i",
bpo::value<uint64_t>()->default_value(1000),
"Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_dds::Sink();
}

71
examples/dds/sampler.cxx Normal file
View File

@ -0,0 +1,71 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <cstdint> // uint64_t
namespace bpo = boost::program_options;
class Sampler : public FairMQDevice
{
public:
Sampler()
: fIterations(0)
, fCounter(0)
{}
void InitTask() override
{
fIterations = fConfig->GetValue<uint64_t>("iterations");
}
protected:
bool ConditionalRun() override
{
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
FairMQMessagePtr msg(NewSimpleMessage("Data"));
LOG(info) << "Sending \"Data\"";
// in case of error or transfer interruption, return false to go to IDLE state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
if (Send(msg, "data1") < 0) {
return false;
}
if (fIterations > 0) {
++fCounter;
if (fCounter >= fIterations) {
LOG(info) << "Sent " << fCounter << " messages. Finished.";
return false;
}
}
return true;
}
private:
uint64_t fIterations;
uint64_t fCounter;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()(
"iterations,i",
bpo::value<uint64_t>()->default_value(1000),
"Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sampler>();
}

62
examples/dds/sink.cxx Normal file
View File

@ -0,0 +1,62 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
namespace bpo = boost::program_options;
class Sink : public FairMQDevice
{
public:
Sink()
: fIterations(0)
, fCounter(0)
{
OnData("data2", &Sink::HandleData);
}
void InitTask() override
{
fIterations = fConfig->GetValue<uint64_t>("iterations");
}
protected:
bool HandleData(FairMQMessagePtr& msg, int)
{
LOG(info) << "Received: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
if (fIterations > 0) {
++fCounter;
if (fCounter >= fIterations) {
LOG(info) << "Received " << fCounter << " messages. Finished.";
return false;
}
}
// return true if want to be called again (otherwise go to IDLE state)
return true;
}
private:
uint64_t fIterations;
uint64_t fCounter;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()(
"iterations,i",
bpo::value<uint64_t>()->default_value(1000),
"Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sink>();
}

View File

@ -6,20 +6,11 @@
# copied verbatim in the file "LICENSE" #
################################################################################
add_library(ExampleMultipartLib STATIC
"Sampler.cxx"
"Sampler.h"
"Sink.cxx"
"Sink.h"
)
add_executable(fairmq-ex-multipart-sampler sampler.cxx)
target_link_libraries(fairmq-ex-multipart-sampler PRIVATE FairMQ)
target_link_libraries(ExampleMultipartLib PUBLIC FairMQ)
add_executable(fairmq-ex-multipart-sampler runSampler.cxx)
target_link_libraries(fairmq-ex-multipart-sampler PRIVATE ExampleMultipartLib)
add_executable(fairmq-ex-multipart-sink runSink.cxx)
target_link_libraries(fairmq-ex-multipart-sink PRIVATE ExampleMultipartLib)
add_executable(fairmq-ex-multipart-sink sink.cxx)
target_link_libraries(fairmq-ex-multipart-sink PRIVATE FairMQ)
add_custom_target(ExampleMultipart DEPENDS fairmq-ex-multipart-sampler fairmq-ex-multipart-sink)

View File

@ -9,6 +9,8 @@
#ifndef FAIRMQEXAMPLEMULTIPARTHEADER_H
#define FAIRMQEXAMPLEMULTIPARTHEADER_H
#include <cstdint>
namespace example_multipart
{

View File

@ -1,86 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.cpp
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <thread> // this_thread::sleep_for
#include <chrono>
#include "Sampler.h"
#include "Header.h"
using namespace std;
namespace example_multipart
{
Sampler::Sampler()
: fMaxIterations(5)
, fNumIterations(0)
{
}
void Sampler::InitTask()
{
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool Sampler::ConditionalRun()
{
Header header;
header.stopFlag = 0;
// Set stopFlag to 1 for last message.
if (fMaxIterations > 0 && fNumIterations == fMaxIterations - 1) {
header.stopFlag = 1;
}
LOG(info) << "Sending header with stopFlag: " << header.stopFlag;
FairMQParts parts;
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
parts.AddPart(NewSimpleMessage(header));
parts.AddPart(NewMessage(1000));
// create more data parts, testing the FairMQParts in-place constructor
FairMQParts auxData{ NewMessage(500), NewMessage(600), NewMessage(700) };
assert(auxData.Size() == 3);
parts.AddPart(std::move(auxData));
assert(auxData.Size() == 0);
assert(parts.Size() == 5);
parts.AddPart(NewMessage());
assert(parts.Size() == 6);
parts.AddPart(NewMessage(100));
assert(parts.Size() == 7);
LOG(info) << "Sending body of size: " << parts.At(1)->GetSize();
Send(parts, "data");
// Go out of the sending loop if the stopFlag was sent.
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
// Wait a second to keep the output readable.
this_thread::sleep_for(chrono::seconds(1));
return true;
}
} // namespace example_multipart

View File

@ -1,40 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLEDDSSAMPLER_H
#define FAIRMQEXAMPLEDDSSAMPLER_H
#include "FairMQDevice.h"
namespace example_multipart
{
class Sampler : public FairMQDevice
{
public:
Sampler();
virtual ~Sampler() {}
protected:
virtual void InitTask();
virtual bool ConditionalRun();
private:
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
} // namespace example_multipart
#endif /* FAIRMQEXAMPLEDDSSAMPLER_H */

View File

@ -1,52 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.cxx
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include "Sink.h"
#include "Header.h"
using namespace std;
namespace example_multipart
{
bool Sink::HandleData(FairMQParts& parts, int /*index*/)
{
LOG(info) << "Received message with " << parts.Size() << " parts";
Header header;
header.stopFlag = (static_cast<Header*>(parts.At(0)->GetData()))->stopFlag;
LOG(info) << "Received part 1 (header) with stopFlag: " << header.stopFlag;
LOG(info) << "Received part 2 of size: " << parts.At(1)->GetSize();
assert(parts.At(1)->GetSize() == 1000);
LOG(info) << "Received part 3 of size: " << parts.At(2)->GetSize();
assert(parts.At(2)->GetSize() == 500);
LOG(info) << "Received part 4 of size: " << parts.At(3)->GetSize();
assert(parts.At(3)->GetSize() == 600);
LOG(info) << "Received part 5 of size: " << parts.At(4)->GetSize();
assert(parts.At(4)->GetSize() == 700);
LOG(info) << "Received part 6 of size: " << parts.At(5)->GetSize();
assert(parts.At(5)->GetSize() == 0);
LOG(info) << "Received part 7 of size: " << parts.At(6)->GetSize();
assert(parts.At(6)->GetSize() == 100);
if (header.stopFlag == 1) {
LOG(info) << "stopFlag is 1, going IDLE";
return false;
}
return true;
}
}

View File

@ -1,34 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLEMULTIPARTSINK_H
#define FAIRMQEXAMPLEMULTIPARTSINK_H
#include "FairMQDevice.h"
namespace example_multipart
{
class Sink : public FairMQDevice
{
public:
Sink() { OnData("data", &Sink::HandleData); }
virtual ~Sink() {}
protected:
bool HandleData(FairMQParts&, int);
};
}
#endif /* FAIRMQEXAMPLEMULTIPARTSINK_H */

View File

@ -1,23 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sampler.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(5), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_multipart::Sampler();
}

View File

@ -1,21 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sink.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& /*options*/)
{
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_multipart::Sink();
}

View File

@ -0,0 +1,97 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Header.h"
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <chrono>
#include <cstdint>
#include <thread> // this_thread::sleep_for
namespace bpo = boost::program_options;
class Sampler : public FairMQDevice
{
public:
Sampler()
: fMaxIterations(5)
, fNumIterations(0)
{}
protected:
void InitTask() override
{
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool ConditionalRun() override
{
example_multipart::Header header;
header.stopFlag = 0;
// Set stopFlag to 1 for last message.
if (fMaxIterations > 0 && fNumIterations == fMaxIterations - 1) {
header.stopFlag = 1;
}
LOG(info) << "Sending header with stopFlag: " << header.stopFlag;
FairMQParts parts;
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
parts.AddPart(NewSimpleMessage(header));
parts.AddPart(NewMessage(1000));
// create more data parts, testing the FairMQParts in-place constructor
FairMQParts auxData{ NewMessage(500), NewMessage(600), NewMessage(700) };
assert(auxData.Size() == 3);
parts.AddPart(std::move(auxData));
assert(auxData.Size() == 0);
assert(parts.Size() == 5);
parts.AddPart(NewMessage());
assert(parts.Size() == 6);
parts.AddPart(NewMessage(100));
assert(parts.Size() == 7);
LOG(info) << "Sending body of size: " << parts.At(1)->GetSize();
Send(parts, "data");
// Go out of the sending loop if the stopFlag was sent.
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
// Wait a second to keep the output readable.
std::this_thread::sleep_for(std::chrono::seconds(1));
return true;
}
private:
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(5), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sampler>();
}

View File

@ -0,0 +1,63 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Header.h"
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
namespace bpo = boost::program_options;
class Sink : public FairMQDevice
{
public:
Sink()
{
OnData("data", &Sink::HandleData);
}
protected:
bool HandleData(FairMQParts& parts, int)
{
LOG(info) << "Received message with " << parts.Size() << " parts";
example_multipart::Header header;
header.stopFlag = (static_cast<example_multipart::Header*>(parts.At(0)->GetData()))->stopFlag;
LOG(info) << "Received part 1 (header) with stopFlag: " << header.stopFlag;
LOG(info) << "Received part 2 of size: " << parts.At(1)->GetSize();
assert(parts.At(1)->GetSize() == 1000);
LOG(info) << "Received part 3 of size: " << parts.At(2)->GetSize();
assert(parts.At(2)->GetSize() == 500);
LOG(info) << "Received part 4 of size: " << parts.At(3)->GetSize();
assert(parts.At(3)->GetSize() == 600);
LOG(info) << "Received part 5 of size: " << parts.At(4)->GetSize();
assert(parts.At(4)->GetSize() == 700);
LOG(info) << "Received part 6 of size: " << parts.At(5)->GetSize();
assert(parts.At(5)->GetSize() == 0);
LOG(info) << "Received part 7 of size: " << parts.At(6)->GetSize();
assert(parts.At(6)->GetSize() == 100);
if (header.stopFlag == 1) {
LOG(info) << "stopFlag is 1, going IDLE";
return false;
}
return true;
}
};
void addCustomOptions(bpo::options_description& /*options*/)
{
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sink>();
}

View File

@ -1,48 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Broadcaster.cpp
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <thread> // this_thread::sleep_for
#include <chrono>
#include "Broadcaster.h"
using namespace std;
namespace example_multiple_channels
{
Broadcaster::Broadcaster()
{
}
bool Broadcaster::ConditionalRun()
{
this_thread::sleep_for(chrono::seconds(1));
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
FairMQMessagePtr msg(NewSimpleMessage("OK"));
LOG(info) << "Sending OK";
Send(msg, "broadcast");
return true;
}
Broadcaster::~Broadcaster()
{
}
}

View File

@ -1,35 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Broadcaster.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLEMULTIPLECHANNELSBROADCASTER_H
#define FAIRMQEXAMPLEMULTIPLECHANNELSBROADCASTER_H
#include "FairMQDevice.h"
namespace example_multiple_channels
{
class Broadcaster : public FairMQDevice
{
public:
Broadcaster();
virtual ~Broadcaster();
protected:
virtual bool ConditionalRun();
};
}
#endif /* FAIRMQEXAMPLEMULTIPLECHANNELSBROADCASTER_H */

View File

@ -6,25 +6,14 @@
# copied verbatim in the file "LICENSE" #
################################################################################
add_library(ExampleMultipleChannelsLib STATIC
"Sampler.cxx"
"Sampler.h"
"Broadcaster.cxx"
"Broadcaster.h"
"Sink.cxx"
"Sink.h"
)
add_executable(fairmq-ex-multiple-channels-sampler sampler.cxx)
target_link_libraries(fairmq-ex-multiple-channels-sampler PRIVATE FairMQ)
target_link_libraries(ExampleMultipleChannelsLib PUBLIC FairMQ)
add_executable(fairmq-ex-multiple-channels-broadcaster broadcaster.cxx)
target_link_libraries(fairmq-ex-multiple-channels-broadcaster PRIVATE FairMQ)
add_executable(fairmq-ex-multiple-channels-sampler runSampler.cxx)
target_link_libraries(fairmq-ex-multiple-channels-sampler PRIVATE ExampleMultipleChannelsLib)
add_executable(fairmq-ex-multiple-channels-broadcaster runBroadcaster.cxx)
target_link_libraries(fairmq-ex-multiple-channels-broadcaster PRIVATE ExampleMultipleChannelsLib)
add_executable(fairmq-ex-multiple-channels-sink runSink.cxx)
target_link_libraries(fairmq-ex-multiple-channels-sink PRIVATE ExampleMultipleChannelsLib)
add_executable(fairmq-ex-multiple-channels-sink sink.cxx)
target_link_libraries(fairmq-ex-multiple-channels-sink PRIVATE FairMQ)
add_custom_target(ExampleMultipleChannels DEPENDS fairmq-ex-multiple-channels-sampler fairmq-ex-multiple-channels-broadcaster fairmq-ex-multiple-channels-sink)

View File

@ -1,82 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.cpp
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <memory> // unique_ptr
#include <thread> // this_thread::sleep_for
#include <chrono>
#include "Sampler.h"
#include "FairMQPoller.h"
using namespace std;
namespace example_multiple_channels
{
Sampler::Sampler()
: fText()
, fMaxIterations(0)
, fNumIterations(0)
{
}
void Sampler::InitTask()
{
fText = fConfig->GetProperty<string>("text");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
void Sampler::Run()
{
FairMQPollerPtr poller(NewPoller("data", "broadcast"));
while (!NewStatePending())
{
poller->Poll(100);
if (poller->CheckInput("broadcast", 0))
{
FairMQMessagePtr msg(NewMessage());
if (Receive(msg, "broadcast") > 0)
{
LOG(info) << "Received broadcast: \"" << string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
}
}
if (poller->CheckOutput("data", 0))
{
this_thread::sleep_for(chrono::seconds(1));
FairMQMessagePtr msg(NewSimpleMessage(fText));
if (Send(msg, "data") > 0)
{
LOG(info) << "Sent \"" << fText << "\"";
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
break;
}
}
}
}
}
Sampler::~Sampler()
{
}
}

View File

@ -1,42 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLEMULTIPLECHANNELSSAMPLER_H
#define FAIRMQEXAMPLEMULTIPLECHANNELSSAMPLER_H
#include <string>
#include "FairMQDevice.h"
namespace example_multiple_channels
{
class Sampler : public FairMQDevice
{
public:
Sampler();
virtual ~Sampler();
protected:
std::string fText;
uint64_t fMaxIterations;
uint64_t fNumIterations;
virtual void Run();
virtual void InitTask();
};
}
#endif /* FAIRMQEXAMPLEMULTIPLECHANNELSSAMPLER_H */

View File

@ -1,71 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.cxx
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include "Sink.h"
using namespace std;
namespace example_multiple_channels
{
Sink::Sink()
: fReceivedData(false)
, fReceivedBroadcast(false)
, fMaxIterations(0)
, fNumIterations(0)
{
OnData("broadcast", &Sink::HandleBroadcast);
OnData("data", &Sink::HandleData);
}
void Sink::InitTask()
{
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool Sink::HandleBroadcast(FairMQMessagePtr& msg, int /*index*/)
{
LOG(info) << "Received broadcast: \"" << string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
fReceivedBroadcast = true;
return CheckIterations();
}
bool Sink::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
LOG(info) << "Received message: \"" << string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
fReceivedData = true;
return CheckIterations();
}
bool Sink::CheckIterations()
{
if (fMaxIterations > 0)
{
if (fReceivedData && fReceivedBroadcast && ++fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached & Received messages from both sources. Leaving RUNNING state.";
return false;
}
}
return true;
}
Sink::~Sink()
{
}
}

View File

@ -1,44 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLEMULTIPLECHANNELSSINK_H
#define FAIRMQEXAMPLEMULTIPLECHANNELSSINK_H
#include "FairMQDevice.h"
namespace example_multiple_channels
{
class Sink : public FairMQDevice
{
public:
Sink();
virtual ~Sink();
protected:
bool HandleBroadcast(FairMQMessagePtr&, int);
bool HandleData(FairMQMessagePtr&, int);
bool CheckIterations();
virtual void InitTask();
private:
bool fReceivedData;
bool fReceivedBroadcast;
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
}
#endif /* FAIRMQEXAMPLEMULTIPLECHANNELSSINK_H */

View File

@ -0,0 +1,46 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <thread> // this_thread::sleep_for
#include <chrono>
namespace bpo = boost::program_options;
class Broadcaster : public FairMQDevice
{
public:
Broadcaster() {}
protected:
bool ConditionalRun() override
{
std::this_thread::sleep_for(std::chrono::seconds(1));
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
FairMQMessagePtr msg(NewSimpleMessage("OK"));
LOG(info) << "Sending OK";
Send(msg, "broadcast");
return true;
}
};
void addCustomOptions(bpo::options_description& /*options*/)
{
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Broadcaster>();
}

View File

@ -1,21 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Broadcaster.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& /*options*/)
{
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_multiple_channels::Broadcaster();
}

View File

@ -1,24 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sampler.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("text", bpo::value<std::string>()->default_value("Hello"), "Text to send out")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_multiple_channels::Sampler();
}

View File

@ -1,23 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sink.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_multiple_channels::Sink();
}

View File

@ -0,0 +1,81 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <FairMQPoller.h>
#include <fairmq/runDevice.h>
#include <chrono>
#include <string>
#include <thread> // this_thread::sleep_for
namespace bpo = boost::program_options;
class Sampler : public FairMQDevice
{
public:
Sampler()
: fMaxIterations(0)
, fNumIterations(0)
{}
void InitTask() override
{
fText = fConfig->GetProperty<std::string>("text");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
void Run() override
{
FairMQPollerPtr poller(NewPoller("data", "broadcast"));
while (!NewStatePending()) {
poller->Poll(100);
if (poller->CheckInput("broadcast", 0)) {
FairMQMessagePtr msg(NewMessage());
if (Receive(msg, "broadcast") > 0) {
LOG(info) << "Received broadcast: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
}
}
if (poller->CheckOutput("data", 0)) {
std::this_thread::sleep_for(std::chrono::seconds(1));
FairMQMessagePtr msg(NewSimpleMessage(fText));
if (Send(msg, "data") > 0) {
LOG(info) << "Sent \"" << fText << "\"";
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
break;
}
}
}
}
}
protected:
std::string fText;
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("text", bpo::value<std::string>()->default_value("Hello"), "Text to send out")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sampler>();
}

View File

@ -0,0 +1,79 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <cstdint> // uint64_t
#include <string>
namespace bpo = boost::program_options;
class Sink : public FairMQDevice
{
public:
Sink()
: fReceivedData(false)
, fReceivedBroadcast(false)
, fMaxIterations(0)
, fNumIterations(0)
{
OnData("broadcast", &Sink::HandleBroadcast);
OnData("data", &Sink::HandleData);
}
void InitTask() override
{
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool HandleBroadcast(FairMQMessagePtr& msg, int /*index*/)
{
LOG(info) << "Received broadcast: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
fReceivedBroadcast = true;
return CheckIterations();
}
bool HandleData(FairMQMessagePtr& msg, int /*index*/)
{
LOG(info) << "Received message: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
fReceivedData = true;
return CheckIterations();
}
bool CheckIterations()
{
if (fMaxIterations > 0) {
if (fReceivedData && fReceivedBroadcast && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached & Received messages from both sources. Leaving RUNNING state.";
return false;
}
}
return true;
}
private:
bool fReceivedData;
bool fReceivedBroadcast;
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sink>();
}

View File

@ -6,25 +6,14 @@
# copied verbatim in the file "LICENSE" #
################################################################################
add_library(ExampleMultipleTransportsLib STATIC
"Sampler1.cxx"
"Sampler1.h"
"Sampler2.cxx"
"Sampler2.h"
"Sink.cxx"
"Sink.h"
)
add_executable(fairmq-ex-multiple-transports-sampler1 sampler1.cxx)
target_link_libraries(fairmq-ex-multiple-transports-sampler1 PRIVATE FairMQ)
target_link_libraries(ExampleMultipleTransportsLib PUBLIC FairMQ)
add_executable(fairmq-ex-multiple-transports-sampler2 sampler2.cxx)
target_link_libraries(fairmq-ex-multiple-transports-sampler2 PRIVATE FairMQ)
add_executable(fairmq-ex-multiple-transports-sampler1 runSampler1.cxx)
target_link_libraries(fairmq-ex-multiple-transports-sampler1 PRIVATE ExampleMultipleTransportsLib)
add_executable(fairmq-ex-multiple-transports-sampler2 runSampler2.cxx)
target_link_libraries(fairmq-ex-multiple-transports-sampler2 PRIVATE ExampleMultipleTransportsLib)
add_executable(fairmq-ex-multiple-transports-sink runSink.cxx)
target_link_libraries(fairmq-ex-multiple-transports-sink PRIVATE ExampleMultipleTransportsLib)
add_executable(fairmq-ex-multiple-transports-sink sink.cxx)
target_link_libraries(fairmq-ex-multiple-transports-sink PRIVATE FairMQ)
add_custom_target(ExampleMultipleTransports DEPENDS fairmq-ex-multiple-transports-sampler1 fairmq-ex-multiple-transports-sampler2 fairmq-ex-multiple-transports-sink)

View File

@ -1,80 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Sampler1.h"
using namespace std;
namespace example_multiple_transports
{
Sampler1::Sampler1()
: fAckListener()
, fMaxIterations(0)
, fNumIterations(0)
{
}
void Sampler1::InitTask()
{
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
void Sampler1::PreRun()
{
fAckListener = thread(&Sampler1::ListenForAcks, this);
}
bool Sampler1::ConditionalRun()
{
// Creates a message using the transport of channel data1
FairMQMessagePtr msg(NewMessageFor("data1", 0, 1000000));
// in case of error or transfer interruption, return false to go to IDLE state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
if (Send(msg, "data1") < 0)
{
return false;
}
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
return true;
}
void Sampler1::PostRun()
{
fAckListener.join();
}
void Sampler1::ListenForAcks()
{
uint64_t numAcks = 0;
while (!NewStatePending())
{
FairMQMessagePtr ack(NewMessageFor("ack", 0));
if (Receive(ack, "ack") < 0)
{
break;
}
++numAcks;
}
LOG(info) << "Acknowledged " << numAcks << " messages";
}
Sampler1::~Sampler1()
{
}
} // namespace example_multiple_transports

View File

@ -1,39 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQEXAMPLEMULTIPLETRANSPORTSSAMPLER1_H
#define FAIRMQEXAMPLEMULTIPLETRANSPORTSSAMPLER1_H
#include <thread>
#include "FairMQDevice.h"
namespace example_multiple_transports
{
class Sampler1 : public FairMQDevice
{
public:
Sampler1();
virtual ~Sampler1();
protected:
virtual void InitTask();
virtual void PreRun();
virtual bool ConditionalRun();
virtual void PostRun();
void ListenForAcks();
std::thread fAckListener;
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
} // namespace example_multiple_transports
#endif /* FAIRMQEXAMPLEMULTIPLETRANSPORTSSAMPLER1_H */

View File

@ -1,51 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Sampler2.h"
using namespace std;
namespace example_multiple_transports
{
Sampler2::Sampler2()
: fMaxIterations(0)
, fNumIterations(0)
{
}
void Sampler2::InitTask()
{
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool Sampler2::ConditionalRun()
{
FairMQMessagePtr msg(NewMessage(1000));
// in case of error or transfer interruption, return false to go to IDLE state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
if (Send(msg, "data2") < 0)
{
return false;
}
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
return true;
}
Sampler2::~Sampler2()
{
}
} // namespace example_multiple_transports

View File

@ -1,34 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQEXAMPLEMULTIPLETRANSPORTSSAMPLER2_H
#define FAIRMQEXAMPLEMULTIPLETRANSPORTSSAMPLER2_H
#include "FairMQDevice.h"
namespace example_multiple_transports
{
class Sampler2 : public FairMQDevice
{
public:
Sampler2();
virtual ~Sampler2();
protected:
virtual void InitTask();
virtual bool ConditionalRun();
private:
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
} // namespace example_multiple_transports
#endif /* FAIRMQEXAMPLEMULTIPLETRANSPORTSSAMPLER2_H */

View File

@ -1,78 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.cxx
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include "Sink.h"
using namespace std;
namespace example_multiple_transports
{
Sink::Sink()
: fMaxIterations(0)
, fNumIterations1(0)
, fNumIterations2(0)
{
// register a handler for data arriving on "data" channel
OnData("data1", &Sink::HandleData1);
OnData("data2", &Sink::HandleData2);
}
void Sink::InitTask()
{
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool Sink::HandleData1(FairMQMessagePtr& /*msg*/, int /*index*/)
{
fNumIterations1++;
// Creates a message using the transport of channel ack
FairMQMessagePtr ack(NewMessageFor("ack", 0));
if (Send(ack, "ack") < 0)
{
return false;
}
// return true if want to be called again (otherwise go to IDLE state)
return CheckIterations();
}
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool Sink::HandleData2(FairMQMessagePtr& /*msg*/, int /*index*/)
{
fNumIterations2++;
// return true if want to be called again (otherwise go to IDLE state)
return CheckIterations();
}
bool Sink::CheckIterations()
{
if (fMaxIterations > 0)
{
if (fNumIterations1 >= fMaxIterations && fNumIterations2 >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached & Received messages from both sources. Leaving RUNNING state.";
return false;
}
}
return true;
}
Sink::~Sink()
{
}
} // namespace example_multiple_transports

View File

@ -1,43 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLEMULTIPLETRANSPORTSSINK_H
#define FAIRMQEXAMPLEMULTIPLETRANSPORTSSINK_H
#include "FairMQDevice.h"
namespace example_multiple_transports
{
class Sink : public FairMQDevice
{
public:
Sink();
virtual ~Sink();
protected:
virtual void InitTask();
bool HandleData1(FairMQMessagePtr&, int);
bool HandleData2(FairMQMessagePtr&, int);
bool CheckIterations();
private:
uint64_t fMaxIterations;
uint64_t fNumIterations1;
uint64_t fNumIterations2;
};
} // namespace example_multiple_transports
#endif /* FAIRMQEXAMPLEMULTIPLETRANSPORTSSINK_H */

View File

@ -1,23 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sampler1.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_multiple_transports::Sampler1();
}

View File

@ -1,23 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sampler2.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_multiple_transports::Sampler2();
}

View File

@ -1,23 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sink.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_multiple_transports::Sink();
}

View File

@ -0,0 +1,89 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <thread>
namespace bpo = boost::program_options;
class Sampler1 : public FairMQDevice
{
public:
Sampler1()
: fAckListener()
, fMaxIterations(0)
, fNumIterations(0)
{}
protected:
void InitTask() override
{
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
void PreRun() override
{
fAckListener = std::thread(&Sampler1::ListenForAcks, this);
}
bool ConditionalRun() override
{
// Creates a message using the transport of channel data1
FairMQMessagePtr msg(NewMessageFor("data1", 0, 1000000));
// in case of error or transfer interruption, return false to go to IDLE state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
if (Send(msg, "data1") < 0) {
return false;
}
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
return true;
}
void PostRun() override
{
fAckListener.join();
}
void ListenForAcks()
{
uint64_t numAcks = 0;
while (!NewStatePending()) {
FairMQMessagePtr ack(NewMessageFor("ack", 0));
if (Receive(ack, "ack") < 0) {
break;
}
++numAcks;
}
LOG(info) << "Acknowledged " << numAcks << " messages";
}
std::thread fAckListener;
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sampler1>();
}

View File

@ -0,0 +1,63 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <cstdint> // uint64_t
namespace bpo = boost::program_options;
class Sampler2 : public FairMQDevice
{
public:
Sampler2()
: fMaxIterations(0)
, fNumIterations(0)
{}
protected:
void InitTask() override
{
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool ConditionalRun() override
{
FairMQMessagePtr msg(NewMessage(1000));
// in case of error or transfer interruption, return false to go to IDLE state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
if (Send(msg, "data2") < 0) {
return false;
}
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
return true;
}
private:
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sampler2>();
}

View File

@ -0,0 +1,82 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
namespace bpo = boost::program_options;
class Sink : public FairMQDevice
{
public:
Sink()
: fMaxIterations(0)
, fNumIterations1(0)
, fNumIterations2(0)
{
// register a handler for data arriving on "data" channel
OnData("data1", &Sink::HandleData1);
OnData("data2", &Sink::HandleData2);
}
protected:
void InitTask() override
{
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool HandleData1(FairMQMessagePtr& /*msg*/, int /*index*/)
{
fNumIterations1++;
// Creates a message using the transport of channel ack
FairMQMessagePtr ack(NewMessageFor("ack", 0));
if (Send(ack, "ack") < 0) {
return false;
}
// return true if want to be called again (otherwise go to IDLE state)
return CheckIterations();
}
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool HandleData2(FairMQMessagePtr& /*msg*/, int /*index*/)
{
fNumIterations2++;
// return true if want to be called again (otherwise go to IDLE state)
return CheckIterations();
}
bool CheckIterations()
{
if (fMaxIterations > 0) {
if (fNumIterations1 >= fMaxIterations && fNumIterations2 >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached & Received messages from both sources. Leaving RUNNING state.";
return false;
}
}
return true;
}
private:
uint64_t fMaxIterations;
uint64_t fNumIterations1;
uint64_t fNumIterations2;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sink>();
}

View File

@ -6,13 +6,13 @@
# copied verbatim in the file "LICENSE" #
################################################################################
add_executable(fairmq-ex-n-m-synchronizer runSynchronizer.cxx)
add_executable(fairmq-ex-n-m-synchronizer synchronizer.cxx)
target_link_libraries(fairmq-ex-n-m-synchronizer PRIVATE FairMQ)
add_executable(fairmq-ex-n-m-sender runSender.cxx)
add_executable(fairmq-ex-n-m-sender sender.cxx)
target_link_libraries(fairmq-ex-n-m-sender PRIVATE FairMQ)
add_executable(fairmq-ex-n-m-receiver runReceiver.cxx)
add_executable(fairmq-ex-n-m-receiver receiver.cxx)
target_link_libraries(fairmq-ex-n-m-receiver PRIVATE FairMQ)
add_custom_target(ExampleNM DEPENDS fairmq-ex-n-m-synchronizer fairmq-ex-n-m-sender fairmq-ex-n-m-receiver)

View File

@ -8,8 +8,8 @@
#include "Header.h"
#include <FairMQDevice.h>
#include <runFairMQDevice.h>
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
#include <iomanip>
@ -116,4 +116,7 @@ void addCustomOptions(bpo::options_description& options)
("max-timeframes", bpo::value<int>()->default_value(0), "Maximum number of timeframes to receive (0 - unlimited)");
}
FairMQDevice* getDevice(const FairMQProgOptions& /* config */) { return new Receiver(); }
std::unique_ptr<fair::mq::Device> getDevice(FairMQProgOptions& /* config */)
{
return std::make_unique<Receiver>();
}

View File

@ -8,8 +8,8 @@
#include "Header.h"
#include <FairMQDevice.h>
#include <runFairMQDevice.h>
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
@ -76,4 +76,7 @@ void addCustomOptions(bpo::options_description& options)
("subtimeframe-size", bpo::value<int>()->default_value(1000), "Subtimeframe size in bytes")
("num-receivers", bpo::value<int>()->required(), "Number of EPNs");
}
FairMQDevice* getDevice(const FairMQProgOptions& /* config */) { return new Sender(); }
std::unique_ptr<fair::mq::Device> getDevice(FairMQProgOptions& /* config */)
{
return std::make_unique<Sender>();
}

View File

@ -6,8 +6,8 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <FairMQDevice.h>
#include <runFairMQDevice.h>
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
#include <cstdint>
@ -43,4 +43,7 @@ class Synchronizer : public FairMQDevice
};
void addCustomOptions(bpo::options_description& /* options */) {}
FairMQDevice* getDevice(const FairMQProgOptions& /* config */) { return new Synchronizer(); }
std::unique_ptr<fair::mq::Device> getDevice(FairMQProgOptions& /* config */)
{
return std::make_unique<Synchronizer>();
}

View File

@ -6,16 +6,16 @@
# copied verbatim in the file "LICENSE" #
################################################################################
add_executable(fairmq-ex-qc-sampler runSampler.cxx)
add_executable(fairmq-ex-qc-sampler sampler.cxx)
target_link_libraries(fairmq-ex-qc-sampler PRIVATE FairMQ)
add_executable(fairmq-ex-qc-dispatcher runQCDispatcher.cxx)
add_executable(fairmq-ex-qc-dispatcher qCDispatcher.cxx)
target_link_libraries(fairmq-ex-qc-dispatcher PRIVATE FairMQ)
add_executable(fairmq-ex-qc-task runQCTask.cxx)
add_executable(fairmq-ex-qc-task qCTask.cxx)
target_link_libraries(fairmq-ex-qc-task PRIVATE FairMQ)
add_executable(fairmq-ex-qc-sink runSink.cxx)
add_executable(fairmq-ex-qc-sink sink.cxx)
target_link_libraries(fairmq-ex-qc-sink PRIVATE FairMQ)
add_custom_target(ExampleQC DEPENDS fairmq-ex-qc-sampler fairmq-ex-qc-dispatcher fairmq-ex-qc-task fairmq-ex-qc-sink)

View File

@ -6,8 +6,8 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "FairMQDevice.h"
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
class QCDispatcher : public FairMQDevice
{
@ -56,4 +56,7 @@ class QCDispatcher : public FairMQDevice
};
void addCustomOptions(boost::program_options::options_description& /*options*/) {}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/) { return new QCDispatcher(); }
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<QCDispatcher>();
}

View File

@ -6,8 +6,8 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "FairMQDevice.h"
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
class QCTask : public FairMQDevice
{
@ -23,4 +23,7 @@ class QCTask : public FairMQDevice
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& /*options*/) {}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/) { return new QCTask(); }
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<QCTask>();
}

View File

@ -6,8 +6,8 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "FairMQDevice.h"
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <thread> // this_thread::sleep_for
#include <chrono>
@ -33,4 +33,7 @@ class Sampler : public FairMQDevice
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description&) {}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/) { return new Sampler(); }
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sampler>();
}

View File

@ -6,8 +6,8 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "FairMQDevice.h"
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
@ -22,4 +22,7 @@ class Sink : public FairMQDevice
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description&) {}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/) { return new Sink(); }
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sink>();
}

View File

@ -6,19 +6,19 @@
# copied verbatim in the file "LICENSE" #
################################################################################
add_executable(fairmq-ex-readout-readout runReadout.cxx)
add_executable(fairmq-ex-readout-readout readout.cxx)
target_link_libraries(fairmq-ex-readout-readout PRIVATE FairMQ)
add_executable(fairmq-ex-readout-builder runBuilder.cxx)
add_executable(fairmq-ex-readout-builder builder.cxx)
target_link_libraries(fairmq-ex-readout-builder PRIVATE FairMQ)
add_executable(fairmq-ex-readout-processor runProcessor.cxx)
add_executable(fairmq-ex-readout-processor processor.cxx)
target_link_libraries(fairmq-ex-readout-processor PRIVATE FairMQ)
add_executable(fairmq-ex-readout-sender runSender.cxx)
add_executable(fairmq-ex-readout-sender sender.cxx)
target_link_libraries(fairmq-ex-readout-sender PRIVATE FairMQ)
add_executable(fairmq-ex-readout-receiver runReceiver.cxx)
add_executable(fairmq-ex-readout-receiver receiver.cxx)
target_link_libraries(fairmq-ex-readout-receiver PRIVATE FairMQ)
set(EX_BIN_DIR ${CMAKE_CURRENT_BINARY_DIR})

View File

@ -5,15 +5,13 @@
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQEXAMPLEREGIONBUILDER_H
#define FAIRMQEXAMPLEREGIONBUILDER_H
#include "FairMQDevice.h"
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
namespace example_readout
{
namespace bpo = boost::program_options;
class Builder : public FairMQDevice
{
@ -41,6 +39,13 @@ class Builder : public FairMQDevice
std::string fOutputChannelName;
};
} // namespace example_readout
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("output-name", bpo::value<std::string>()->default_value("bs"), "Output channel name");
}
#endif /* FAIRMQEXAMPLEREGIONBUILDER_H */
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Builder>();
}

View File

@ -5,13 +5,11 @@
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQEXAMPLEREGIONPROCESSOR_H
#define FAIRMQEXAMPLEREGIONPROCESSOR_H
#include "FairMQDevice.h"
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
namespace example_readout
{
namespace bpo = boost::program_options;
class Processor : public FairMQDevice
{
@ -32,6 +30,10 @@ class Processor : public FairMQDevice
}
};
} // namespace example_readout
void addCustomOptions(bpo::options_description& /* options */)
{}
#endif /* FAIRMQEXAMPLEREGIONPROCESSOR_H */
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Processor>();
}

View File

@ -5,17 +5,15 @@
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQEXAMPLEREADOUTREADOUT_H
#define FAIRMQEXAMPLEREADOUTREADOUT_H
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <atomic>
#include <thread>
#include <chrono>
#include "FairMQDevice.h"
namespace example_readout
{
namespace bpo = boost::program_options;
class Readout : public FairMQDevice
{
@ -86,6 +84,14 @@ class Readout : public FairMQDevice
std::atomic<uint64_t> fNumUnackedMsgs;
};
} // namespace example_readout
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
#endif /* FAIRMQEXAMPLEREADOUTREADOUT_H */
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Readout>();
}

View File

@ -5,13 +5,11 @@
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQEXAMPLEREGIONRECEIVER_H
#define FAIRMQEXAMPLEREGIONRECEIVER_H
#include "FairMQDevice.h"
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
namespace example_readout
{
namespace bpo = boost::program_options;
class Receiver : public FairMQDevice
{
@ -49,6 +47,13 @@ class Receiver : public FairMQDevice
uint64_t fNumIterations;
};
} // namespace example_readout
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
#endif /* FAIRMQEXAMPLEREGIONRECEIVER_H */
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Receiver>();
}

View File

@ -1,23 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Builder.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("output-name", bpo::value<std::string>()->default_value("bs"), "Output channel name");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_readout::Builder();
}

View File

@ -1,20 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Processor.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& /* options */)
{}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_readout::Processor();
}

View File

@ -1,24 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Readout.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_readout::Readout();
}

View File

@ -1,23 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Receiver.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_readout::Receiver();
}

View File

@ -1,23 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sender.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("input-name", bpo::value<std::string>()->default_value("bs"), "Input channel name");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_readout::Sender();
}

Some files were not shown because too many files have changed in this diff Show More