Refactor the examples after move from FairRoot

This commit is contained in:
Alexey Rybalchenko
2018-04-26 23:06:01 +02:00
committed by Mohammad Al-Turany
parent 31cba0515e
commit bab7e13737
148 changed files with 1582 additions and 2027 deletions

View File

@@ -0,0 +1,62 @@
################################################################################
# Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
add_library(Example1N1Lib STATIC
"Sampler.cxx"
"Sampler.h"
"Processor.cxx"
"Processor.h"
"Sink.cxx"
"Sink.h"
)
target_link_libraries(Example1N1Lib PUBLIC FairMQ)
add_executable(fairmq-ex-1-n-1-sampler runSampler.cxx)
target_link_libraries(fairmq-ex-1-n-1-sampler PRIVATE Example1N1Lib)
add_executable(fairmq-ex-1-n-1-processor runProcessor.cxx)
target_link_libraries(fairmq-ex-1-n-1-processor PRIVATE Example1N1Lib)
add_executable(fairmq-ex-1-n-1-sink runSink.cxx)
target_link_libraries(fairmq-ex-1-n-1-sink PRIVATE Example1N1Lib)
add_custom_target(Example1N1 DEPENDS fairmq-ex-1-n-1-sampler fairmq-ex-1-n-1-processor fairmq-ex-1-n-1-sink)
install(
TARGETS
fairmq-ex-1-n-1-sampler
fairmq-ex-1-n-1-processor
fairmq-ex-1-n-1-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
set(EX_BIN_DIR ${CMAKE_CURRENT_BINARY_DIR})
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-1-n-1.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-n-1.sh)
set(EX_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR})
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-1-n-1.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-n-1.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-n-1.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-1-n-1.sh
)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ex-1-n-1.json ${CMAKE_CURRENT_BINARY_DIR}/ex-1-n-1.json)
install(
FILES ${CMAKE_CURRENT_BINARY_DIR}/ex-1-n-1.json
DESTINATION ${PROJECT_INSTALL_BINDIR} # TODO: install this in a more appropriate directory
)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-1-n-1.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-n-1.sh)
add_test(NAME Example-1-n-1 COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-n-1.sh)
set_tests_properties(Example-1-n-1 PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ")

View File

@@ -0,0 +1,53 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Processor.h"
#include <string>
using namespace std;
namespace example_1_n_1
{
Processor::Processor()
{
OnData("data1", &Processor::HandleData);
}
bool Processor::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
LOG(info) << "Received data, processing...";
// Modify the received string
string* text = new std::string(static_cast<char*>(msg->GetData()), msg->GetSize());
*text += " (modified by " + fId + ")";
// create message object with a pointer to the data buffer,
// its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr msg2(NewMessage(const_cast<char*>(text->c_str()),
text->length(),
[](void* /*data*/, void* object) { delete static_cast<string*>(object); },
text));
// Send out the output message
if (Send(msg2, "data2") < 0)
{
return false;
}
return true;
}
Processor::~Processor()
{
}
} // namespace example_1_n_1

View File

@@ -0,0 +1,29 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQEXAMPLE1N1PROCESSOR_H_
#define FAIRMQEXAMPLE1N1PROCESSOR_H_
#include "FairMQDevice.h"
namespace example_1_n_1
{
class Processor : public FairMQDevice
{
public:
Processor();
virtual ~Processor();
protected:
bool HandleData(FairMQMessagePtr&, int);
};
} // namespace example_1_n_1
#endif /* FAIRMQEXAMPLE1N1PROCESSOR_H_ */

12
examples/1-n-1/README.md Normal file
View File

@@ -0,0 +1,12 @@
Example 2: Sampler -> Processor -> Sink
===============
A simple topology of three devices - **Sampler**, **Processor** and **Sink**. **Sampler** sends data to one or more **Processor**s, who modify the data and send it to one **Sink**. Transport with the **PUSH-PULL** pattern.
For this example both processor devices share same configuration, and can therefore use same setting from the JSON file. But since their ID still has to be unique, additional command line argument must be used to allow them to share configuration. This parameter is `--config-key`.
In this example the Sampler is configured to **bind** its output and the Sink is configured to also **bind** its input. This allows us run any number of processors with the same configuration, because they all connect to same Sampler and Sink addresses. Furthermore, it allows adding of processors dynamically during run-time. The PUSH and PULL sockets will handle the data distribution to/from the new devices according to their distribution strategies ([Round-robin output for PUSH](http://api.zeromq.org/4-0:zmq-socket#toc14) and [Fair-queued input for PULL](http://api.zeromq.org/4-0:zmq-socket#toc15)).
The Sampler sends out a simple text string (its content configurable with `--text` command line parameter, defaul is "Hello"). Each Processor modifies the string by appending its ID to it and send it to the Sink.
The provided configuration file contains two Processors. To add more Processors, you can either extend the configuration file, or create a separate file only for new processors.

View File

@@ -0,0 +1,68 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.cpp
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <thread> // this_thread::sleep_for
#include <chrono>
#include "Sampler.h"
using namespace std;
namespace example_1_n_1
{
Sampler::Sampler()
: fText()
, fMaxIterations(0)
, fNumIterations(0)
{
}
void Sampler::InitTask()
{
// Get the fText and fMaxIterations values from the command line options (via fConfig)
fText = fConfig->GetValue<string>("text");
fMaxIterations = fConfig->GetValue<uint64_t>("max-iterations");
}
bool Sampler::ConditionalRun()
{
// Initializing message with NewStaticMessage will avoid copy
// but won't delete the data after the sending is completed.
FairMQMessagePtr msg(NewStaticMessage(fText));
LOG(info) << "Sending \"" << fText << "\"";
// in case of error or transfer interruption, return false to go to IDLE state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
if (Send(msg, "data1") < 0)
{
return false;
}
else if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
this_thread::sleep_for(chrono::seconds(1));
return true;
}
Sampler::~Sampler()
{
}
} // namespace example_1_n_1

42
examples/1-n-1/Sampler.h Normal file
View File

@@ -0,0 +1,42 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLE1N1SAMPLER_H_
#define FAIRMQEXAMPLE1N1SAMPLER_H_
#include <string>
#include "FairMQDevice.h"
namespace example_1_n_1
{
class Sampler : public FairMQDevice
{
public:
Sampler();
virtual ~Sampler();
protected:
std::string fText;
uint64_t fMaxIterations;
uint64_t fNumIterations;
virtual void InitTask();
virtual bool ConditionalRun();
};
} // namespace example_1_n_1
#endif /* FAIRMQEXAMPLE1N1SAMPLER_H_ */

55
examples/1-n-1/Sink.cxx Normal file
View File

@@ -0,0 +1,55 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.cxx
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include "Sink.h"
using namespace std;
namespace example_1_n_1
{
Sink::Sink()
: fMaxIterations(0)
, fNumIterations(0)
{
// register a handler for data arriving on "data2" channel
OnData("data2", &Sink::HandleData);
}
void Sink::InitTask()
{
// Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetValue<uint64_t>("max-iterations");
}
// handler is called whenever a message arrives on "data2", with a reference to the message and a sub-channel index (here 0)
bool Sink::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
LOG(info) << "Received: \"" << string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
// return true if want to be called again (otherwise return false go to IDLE state)
return true;
}
Sink::~Sink()
{
}
} // namespace example_1_n_1

40
examples/1-n-1/Sink.h Normal file
View File

@@ -0,0 +1,40 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLE1N1SINK_H
#define FAIRMQEXAMPLE1N1SINK_H
#include "FairMQDevice.h"
namespace example_1_n_1
{
class Sink : public FairMQDevice
{
public:
Sink();
virtual ~Sink();
protected:
virtual void InitTask();
bool HandleData(FairMQMessagePtr&, int);
private:
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
} // namespace example_1_n_1
#endif /* FAIRMQEXAMPLE1N1SINK_H */

View File

@@ -0,0 +1,73 @@
{
"fairMQOptions": {
"devices": [
{
"id": "sampler1",
"channels": [
{
"name": "data1",
"sockets": [
{
"type": "push",
"method": "bind",
"address": "tcp://*:5555",
"sndBufSize": 1000,
"rcvBufSize": 1000,
"rateLogging": 0
}
]
}
]
},
{
"key": "processor",
"channels": [
{
"name": "data1",
"sockets": [
{
"type": "pull",
"method": "connect",
"address": "tcp://localhost:5555",
"sndBufSize": 1000,
"rcvBufSize": 1000,
"rateLogging": 0
}
]
},
{
"name": "data2",
"sockets": [
{
"type": "push",
"method": "connect",
"address": "tcp://localhost:5556",
"sndBufSize": 1000,
"rcvBufSize": 1000,
"rateLogging": 0
}
]
}
]
},
{
"id": "sink1",
"channels": [
{
"name": "data2",
"sockets": [
{
"type": "pull",
"method": "bind",
"address": "tcp://*:5556",
"sndBufSize": 1000,
"rcvBufSize": 1000,
"rateLogging": 0
}
]
}
]
}
]
}
}

View File

@@ -0,0 +1,25 @@
#!/bin/bash
ex2config="@EX_BIN_DIR@/ex-1-n-1.json"
SAMPLER="fairmq-ex-1-n-1-sampler"
SAMPLER+=" --id sampler1"
SAMPLER+=" --mq-config $ex2config"
xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
PROCESSOR1="fairmq-ex-1-n-1-processor"
PROCESSOR1+=" --id processor1"
PROCESSOR1+=" --mq-config $ex2config"
PROCESSOR1+=" --config-key processor"
xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$PROCESSOR1 &
PROCESSOR2="fairmq-ex-1-n-1-processor"
PROCESSOR2+=" --id processor2"
PROCESSOR2+=" --mq-config $ex2config"
PROCESSOR2+=" --config-key processor"
xterm -geometry 80x23+500+330 -hold -e @EX_BIN_DIR@/$PROCESSOR2 &
SINK="fairmq-ex-1-n-1-sink"
SINK+=" --id sink1"
SINK+=" --mq-config $ex2config"
xterm -geometry 80x23+1000+0 -hold -e @EX_BIN_DIR@/$SINK &

View File

@@ -0,0 +1,21 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Processor.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& /*options*/)
{
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{
return new example_1_n_1::Processor();
}

View File

@@ -0,0 +1,24 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sampler.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("text", bpo::value<std::string>()->default_value("Hello"), "Text to send out")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{
return new example_1_n_1::Sampler();
}

View File

@@ -0,0 +1,23 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sink.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{
return new example_1_n_1::Sink();
}

View File

@@ -0,0 +1,54 @@
#!/bin/bash
ex2config="@CMAKE_CURRENT_BINARY_DIR@/ex-1-n-1.json"
# setup a trap to kill everything if the test fails/timeouts
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; kill -TERM $PROCESSOR1_PID; kill -TERM $PROCESSOR2_PID; wait $SAMPLER_PID; wait $SINK_PID; wait $PROCESSOR1_PID; wait $PROCESSOR2_PID;' TERM
SAMPLER="fairmq-ex-1-n-1-sampler"
SAMPLER+=" --id sampler1"
SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --control static --color false"
SAMPLER+=" --max-iterations 2"
SAMPLER+=" --mq-config $ex2config"
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
SAMPLER_PID=$!
PROCESSOR1="fairmq-ex-1-n-1-processor"
PROCESSOR1+=" --id processor1"
PROCESSOR1+=" --verbosity veryhigh"
PROCESSOR1+=" --control static --color false"
PROCESSOR1+=" --mq-config $ex2config"
PROCESSOR1+=" --config-key processor"
@CMAKE_CURRENT_BINARY_DIR@/$PROCESSOR1 &
PROCESSOR1_PID=$!
PROCESSOR2="fairmq-ex-1-n-1-processor"
PROCESSOR2+=" --id processor2"
PROCESSOR2+=" --verbosity veryhigh"
PROCESSOR2+=" --control static --color false"
PROCESSOR2+=" --mq-config $ex2config"
PROCESSOR2+=" --config-key processor"
@CMAKE_CURRENT_BINARY_DIR@/$PROCESSOR2 &
PROCESSOR2_PID=$!
SINK="fairmq-ex-1-n-1-sink"
SINK+=" --id sink1"
SINK+=" --verbosity veryhigh"
SINK+=" --control static --color false"
SINK+=" --max-iterations 2"
SINK+=" --mq-config $ex2config"
@CMAKE_CURRENT_BINARY_DIR@/$SINK &
SINK_PID=$!
# wait for sampler and sink to finish
wait $SAMPLER_PID
wait $SINK_PID
# stop processors
kill -SIGINT $PROCESSOR1_PID
kill -SIGINT $PROCESSOR2_PID
# wait for everything to finish
wait $PROCESSOR1_PID
wait $PROCESSOR2_PID