reduce noise in examples

This commit is contained in:
Alexey Rybalchenko
2021-05-13 22:22:49 +02:00
committed by Dennis Klein
parent a7dbeadd1c
commit aaf74ad93f
119 changed files with 1766 additions and 3143 deletions

View File

@@ -6,25 +6,14 @@
# copied verbatim in the file "LICENSE" #
################################################################################
add_library(ExampleDDSLib STATIC
"Sampler.cxx"
"Sampler.h"
"Processor.cxx"
"Processor.h"
"Sink.cxx"
"Sink.h"
)
add_executable(fairmq-ex-dds-sampler sampler.cxx)
target_link_libraries(fairmq-ex-dds-sampler PRIVATE FairMQ)
target_link_libraries(ExampleDDSLib PUBLIC FairMQ)
add_executable(fairmq-ex-dds-processor processor.cxx)
target_link_libraries(fairmq-ex-dds-processor PRIVATE FairMQ)
add_executable(fairmq-ex-dds-sampler runSampler.cxx)
target_link_libraries(fairmq-ex-dds-sampler PRIVATE ExampleDDSLib)
add_executable(fairmq-ex-dds-processor runProcessor.cxx)
target_link_libraries(fairmq-ex-dds-processor PRIVATE ExampleDDSLib)
add_executable(fairmq-ex-dds-sink runSink.cxx)
target_link_libraries(fairmq-ex-dds-sink PRIVATE ExampleDDSLib)
add_executable(fairmq-ex-dds-sink sink.cxx)
target_link_libraries(fairmq-ex-dds-sink PRIVATE FairMQ)
add_custom_target(ExampleDDS DEPENDS fairmq-ex-dds-sampler fairmq-ex-dds-processor fairmq-ex-dds-sink)

View File

@@ -1,52 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Processor.h"
#include "FairMQLogger.h"
using namespace std;
namespace example_dds
{
Processor::Processor()
{
OnData("data1", &Processor::HandleData);
}
bool Processor::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
LOG(info) << "Received data, processing...";
// Modify the received string
string* text = new std::string(static_cast<char*>(msg->GetData()), msg->GetSize());
*text += " (modified by " + fId + ")";
// create message object with a pointer to the data buffer,
// its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr msg2(NewMessage(const_cast<char*>(text->c_str()),
text->length(),
[](void* /*data*/, void* object) { delete static_cast<string*>(object); },
text));
// Send out the output message
if (Send(msg2, "data2") < 0)
{
return false;
}
return true;
}
Processor::~Processor()
{
}
} // namespace example_dds

View File

@@ -1,29 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQEXAMPLEDDSPROCESSOR_H
#define FAIRMQEXAMPLEDDSPROCESSOR_H
#include "FairMQDevice.h"
namespace example_dds
{
class Processor : public FairMQDevice
{
public:
Processor();
virtual ~Processor();
protected:
bool HandleData(FairMQMessagePtr&, int);
};
}
#endif /* FAIRMQEXAMPLEDDSPROCESSOR_H */

View File

@@ -1,64 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.cpp
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <thread> // this_thread::sleep_for
#include <chrono>
#include "Sampler.h"
using namespace std;
namespace example_dds
{
Sampler::Sampler()
{
}
void Sampler::InitTask()
{
fIterations = fConfig->GetValue<uint64_t>("iterations");
fCounter = 0;
}
bool Sampler::ConditionalRun()
{
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
FairMQMessagePtr msg(NewSimpleMessage("Data"));
LOG(info) << "Sending \"Data\"";
// in case of error or transfer interruption, return false to go to IDLE state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
if (Send(msg, "data1") < 0) {
return false;
}
if (fIterations > 0) {
++fCounter;
if (fCounter >= fIterations) {
LOG(info) << "Sent " << fCounter << " messages. Finished.";
return false;
}
}
return true;
}
Sampler::~Sampler()
{
}
}

View File

@@ -1,40 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLEDDSSAMPLER_H
#define FAIRMQEXAMPLEDDSSAMPLER_H
#include "FairMQDevice.h"
namespace example_dds
{
class Sampler : public FairMQDevice
{
public:
Sampler();
virtual ~Sampler();
void InitTask() override;
protected:
bool ConditionalRun() override;
private:
uint64_t fIterations;
uint64_t fCounter;
};
} // namespace example_dds
#endif /* FAIRMQEXAMPLEDDSSAMPLER_H */

View File

@@ -1,54 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.cxx
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include "Sink.h"
using namespace std;
namespace example_dds
{
Sink::Sink()
{
// register a handler for data arriving on "data2" channel
OnData("data2", &Sink::HandleData);
}
void Sink::InitTask()
{
fIterations = fConfig->GetValue<uint64_t>("iterations");
fCounter = 0;
}
// handler is called whenever a message arrives on "data2", with a reference to the message and a sub-channel index (here 0)
bool Sink::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
LOG(info) << "Received: \"" << string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
if (fIterations > 0) {
++fCounter;
if (fCounter >= fIterations) {
LOG(info) << "Received " << fCounter << " messages. Finished.";
return false;
}
}
// return true if want to be called again (otherwise go to IDLE state)
return true;
}
Sink::~Sink()
{
}
} // namespace example_dds

View File

@@ -1,40 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLEDDSSINK_H
#define FAIRMQEXAMPLEDDSSINK_H
#include "FairMQDevice.h"
namespace example_dds
{
class Sink : public FairMQDevice
{
public:
Sink();
virtual ~Sink();
void InitTask() override;
protected:
bool HandleData(FairMQMessagePtr&, int);
private:
uint64_t fIterations;
uint64_t fCounter;
};
} // namespace example_dds
#endif /* FAIRMQEXAMPLEDDSSINK_H */

View File

@@ -0,0 +1,58 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
namespace bpo = boost::program_options;
class Processor : public FairMQDevice
{
public:
Processor()
{
OnData("data1", &Processor::HandleData);
}
protected:
bool HandleData(FairMQMessagePtr& msg, int)
{
LOG(info) << "Received data, processing...";
// Modify the received string
std::string* text = new std::string(static_cast<char*>(msg->GetData()), msg->GetSize());
*text += " (modified by " + fId + ")";
// create message object with a pointer to the data buffer,
// its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr msg2(NewMessage(const_cast<char*>(text->c_str()),
text->length(),
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
text));
// Send out the output message
if (Send(msg2, "data2") < 0) {
return false;
}
return true;
}
};
void addCustomOptions(bpo::options_description& /*options*/)
{
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Processor>();
}

View File

@@ -1,21 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Processor.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& /*options*/)
{
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_dds::Processor();
}

View File

@@ -1,25 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sampler.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()(
"iterations,i",
bpo::value<uint64_t>()->default_value(1000),
"Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_dds::Sampler();
}

View File

@@ -1,25 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sink.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()(
"iterations,i",
bpo::value<uint64_t>()->default_value(1000),
"Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_dds::Sink();
}

71
examples/dds/sampler.cxx Normal file
View File

@@ -0,0 +1,71 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <cstdint> // uint64_t
namespace bpo = boost::program_options;
class Sampler : public FairMQDevice
{
public:
Sampler()
: fIterations(0)
, fCounter(0)
{}
void InitTask() override
{
fIterations = fConfig->GetValue<uint64_t>("iterations");
}
protected:
bool ConditionalRun() override
{
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
FairMQMessagePtr msg(NewSimpleMessage("Data"));
LOG(info) << "Sending \"Data\"";
// in case of error or transfer interruption, return false to go to IDLE state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
if (Send(msg, "data1") < 0) {
return false;
}
if (fIterations > 0) {
++fCounter;
if (fCounter >= fIterations) {
LOG(info) << "Sent " << fCounter << " messages. Finished.";
return false;
}
}
return true;
}
private:
uint64_t fIterations;
uint64_t fCounter;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()(
"iterations,i",
bpo::value<uint64_t>()->default_value(1000),
"Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sampler>();
}

62
examples/dds/sink.cxx Normal file
View File

@@ -0,0 +1,62 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
namespace bpo = boost::program_options;
class Sink : public FairMQDevice
{
public:
Sink()
: fIterations(0)
, fCounter(0)
{
OnData("data2", &Sink::HandleData);
}
void InitTask() override
{
fIterations = fConfig->GetValue<uint64_t>("iterations");
}
protected:
bool HandleData(FairMQMessagePtr& msg, int)
{
LOG(info) << "Received: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
if (fIterations > 0) {
++fCounter;
if (fCounter >= fIterations) {
LOG(info) << "Received " << fCounter << " messages. Finished.";
return false;
}
}
// return true if want to be called again (otherwise go to IDLE state)
return true;
}
private:
uint64_t fIterations;
uint64_t fCounter;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()(
"iterations,i",
bpo::value<uint64_t>()->default_value(1000),
"Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sink>();
}