Add first two simple FairMQ examples

fairmq/examples/1-sampler-sink
fairmq/examples/2-sampler-processor-sink
This commit is contained in:
Alexey Rybalchenko 2015-06-22 12:23:21 +02:00
parent 5f20a28b04
commit 2656d8098c
30 changed files with 1326 additions and 39 deletions

View File

@ -6,13 +6,18 @@
# copied verbatim in the file "LICENSE" #
################################################################################
configure_file(${CMAKE_SOURCE_DIR}/fairmq/options/ProgOptionTest/macro/bsampler-sink.json ${CMAKE_BINARY_DIR}/bin/bsampler-sink.json)
configure_file(${CMAKE_SOURCE_DIR}/fairmq/options/ProgOptionTest/macro/bsampler-sink.json ${CMAKE_BINARY_DIR}/bin/config/example-bsampler-sink.json)
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/1-sampler-sink/sampler-sink.json ${CMAKE_BINARY_DIR}/bin/config/ex1-sampler-sink.json)
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/2-sampler-processor-sink/sampler-processor-sink.json ${CMAKE_BINARY_DIR}/bin/config/ex2-sampler-processor-sink.json)
Set(INCLUDE_DIRECTORIES
${CMAKE_SOURCE_DIR}/fairmq
${CMAKE_SOURCE_DIR}/fairmq/devices
${CMAKE_SOURCE_DIR}/fairmq/tools
${CMAKE_SOURCE_DIR}/fairmq/options
${CMAKE_SOURCE_DIR}/fairmq/examples/1-sampler-sink
${CMAKE_SOURCE_DIR}/fairmq/examples/2-sampler-processor-sink
${CMAKE_SOURCE_DIR}/fairmq/examples/req-rep
)
Set(SYSTEM_INCLUDE_DIRECTORIES
@ -22,7 +27,6 @@ Set(SYSTEM_INCLUDE_DIRECTORIES
If(PROTOBUF_FOUND)
Set(INCLUDE_DIRECTORIES
${INCLUDE_DIRECTORIES}
${CMAKE_SOURCE_DIR}/fairmq/examples/req-rep
# # following directory is only for protobuf tests and is not essential part of FairMQ
#${CMAKE_SOURCE_DIR}/fairmq/prototest
)
@ -71,15 +75,25 @@ set(SRCS
"FairMQChannel.cxx"
"FairMQDevice.cxx"
"FairMQPoller.cxx"
"devices/FairMQBenchmarkSampler.cxx"
"devices/FairMQSink.cxx"
"devices/FairMQBuffer.cxx"
"devices/FairMQProxy.cxx"
"devices/FairMQSplitter.cxx"
"devices/FairMQMerger.cxx"
"options/FairProgOptions.cxx"
"options/FairMQProgOptions.cxx"
"options/FairMQParser.cxx"
"examples/1-sampler-sink/FairMQExample1Sampler.cxx"
"examples/1-sampler-sink/FairMQExample1Sink.cxx"
"examples/2-sampler-processor-sink/FairMQExample2Sampler.cxx"
"examples/2-sampler-processor-sink/FairMQExample2Processor.cxx"
"examples/2-sampler-processor-sink/FairMQExample2Sink.cxx"
"examples/req-rep/FairMQExampleClient.cxx"
"examples/req-rep/FairMQExampleServer.cxx"
)
@ -155,8 +169,13 @@ set(Exe_Names
splitter
merger
proxy
example_client
example_server
ex1-sampler
ex1-sink
ex2-sampler
ex2-processor
ex2-sink
example-client
example-server
)
# following executables are only for protobuf tests and are not essential part of FairMQ
@ -177,6 +196,11 @@ set(Exe_Source
run/runSplitter.cxx
run/runMerger.cxx
run/runProxy.cxx
examples/1-sampler-sink/runExample1Sampler.cxx
examples/1-sampler-sink/runExample1Sink.cxx
examples/2-sampler-processor-sink/runExample2Sampler.cxx
examples/2-sampler-processor-sink/runExample2Processor.cxx
examples/2-sampler-processor-sink/runExample2Sink.cxx
examples/req-rep/runExampleClient.cxx
examples/req-rep/runExampleServer.cxx
)

View File

@ -0,0 +1,93 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample1Sampler.cpp
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include "FairMQExample1Sampler.h"
#include "FairMQLogger.h"
FairMQExample1Sampler::FairMQExample1Sampler()
: fText()
{
}
void FairMQExample1Sampler::CustomCleanup(void *data, void *object)
{
delete (std::string*)object;
}
void FairMQExample1Sampler::Run()
{
while (GetCurrentState() == RUNNING)
{
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
std::string* text = new std::string(fText);
FairMQMessage* msg = fTransportFactory->CreateMessage(const_cast<char*>(text->c_str()), text->length(), CustomCleanup, text);
LOG(INFO) << "Sending \"" << fText << "\"";
fChannels["data-out"].at(0).Send(msg);
}
}
FairMQExample1Sampler::~FairMQExample1Sampler()
{
}
void FairMQExample1Sampler::SetProperty(const int key, const std::string& value)
{
switch (key)
{
case Text:
fText = value;
break;
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
std::string FairMQExample1Sampler::GetProperty(const int key, const std::string& default_ /*= ""*/)
{
switch (key)
{
case Text:
return fText;
break;
default:
return FairMQDevice::GetProperty(key, default_);
}
}
void FairMQExample1Sampler::SetProperty(const int key, const int value)
{
switch (key)
{
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
int FairMQExample1Sampler::GetProperty(const int key, const int default_ /*= 0*/)
{
switch (key)
{
default:
return FairMQDevice::GetProperty(key, default_);
}
}

View File

@ -0,0 +1,46 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample1Sampler.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLESAMPLER_H_
#define FAIRMQEXAMPLESAMPLER_H_
#include <string>
#include "FairMQDevice.h"
class FairMQExample1Sampler : public FairMQDevice
{
public:
enum
{
Text = FairMQDevice::Last,
Last
};
FairMQExample1Sampler();
virtual ~FairMQExample1Sampler();
static void CustomCleanup(void* data, void* hint);
virtual void SetProperty(const int key, const std::string& value);
virtual std::string GetProperty(const int key, const std::string& default_ = "");
virtual void SetProperty(const int key, const int value);
virtual int GetProperty(const int key, const int default_ = 0);
protected:
std::string fText;
virtual void Run();
};
#endif /* FAIRMQEXAMPLE1SAMPLER_H_ */

View File

@ -0,0 +1,45 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample1Sink.cxx
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include "FairMQExample1Sink.h"
#include "FairMQLogger.h"
using namespace std;
FairMQExample1Sink::FairMQExample1Sink()
{
}
void FairMQExample1Sink::Run()
{
while (GetCurrentState() == RUNNING)
{
FairMQMessage* msg = fTransportFactory->CreateMessage();
fChannels["data-in"].at(0).Receive(msg);
LOG(INFO) << "Received message: \""
<< string(static_cast<char*>(msg->GetData()), msg->GetSize())
<< "\"";
delete msg;
}
}
FairMQExample1Sink::~FairMQExample1Sink()
{
}

View File

@ -0,0 +1,30 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample1Sink.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLE1SINK_H_
#define FAIRMQEXAMPLE1SINK_H_
#include "FairMQDevice.h"
class FairMQExample1Sink : public FairMQDevice
{
public:
FairMQExample1Sink();
virtual ~FairMQExample1Sink();
protected:
virtual void Run();
};
#endif /* FAIRMQEXAMPLE1SINK_H_ */

View File

@ -0,0 +1,125 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runExampleSampler.cxx
*
* @since 2013-04-23
* @author D. Klein, A. Rybalchenko
*/
#include <iostream>
#include <csignal>
#include "boost/program_options.hpp"
#include "FairMQLogger.h"
#include "FairMQParser.h"
#include "FairMQProgOptions.h"
#include "FairMQExample1Sampler.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
using namespace boost::program_options;
FairMQExample1Sampler sampler;
static void s_signal_handler(int signal)
{
LOG(INFO) << "Caught signal " << signal;
sampler.ChangeState("END");
LOG(INFO) << "Shutdown complete.";
exit(1);
}
static void s_catch_signals(void)
{
struct sigaction action;
action.sa_handler = s_signal_handler;
action.sa_flags = 0;
sigemptyset(&action.sa_mask);
sigaction(SIGINT, &action, NULL);
sigaction(SIGTERM, &action, NULL);
}
int main(int argc, char** argv)
{
s_catch_signals();
FairMQProgOptions config;
try
{
std::string text;
options_description samplerOptions("Sampler options");
samplerOptions.add_options()
("text", value<std::string>(&text)->default_value("Hello"), "Text to send out");
config.AddToCmdLineOptions(samplerOptions);
if (config.ParseAll(argc, argv))
{
return 0;
}
std::string filename = config.GetValue<std::string>("config-json-file");
std::string id = config.GetValue<std::string>("id");
config.UserParser<FairMQParser::JSON>(filename, id);
sampler.fChannels = config.GetFairMQMap();
LOG(INFO) << "PID: " << getpid();
#ifdef NANOMSG
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
sampler.SetTransport(transportFactory);
sampler.SetProperty(FairMQExample1Sampler::Id, id);
sampler.SetProperty(FairMQExample1Sampler::Text, text);
sampler.ChangeState("INIT_DEVICE");
sampler.WaitForEndOfState("INIT_DEVICE");
sampler.ChangeState("INIT_TASK");
sampler.WaitForEndOfState("INIT_TASK");
sampler.ChangeState("RUN");
sampler.WaitForEndOfState("RUN");
sampler.ChangeState("STOP");
sampler.ChangeState("RESET_TASK");
sampler.WaitForEndOfState("RESET_TASK");
sampler.ChangeState("RESET_DEVICE");
sampler.WaitForEndOfState("RESET_DEVICE");
sampler.ChangeState("END");
}
catch (std::exception& e)
{
LOG(ERROR) << e.what();
LOG(INFO) << "Command line options are the following: ";
config.PrintHelp();
return 1;
}
return 0;
}

View File

@ -0,0 +1,116 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runExampleSink.cxx
*
* @since 2013-04-23
* @author D. Klein, A. Rybalchenko
*/
#include <iostream>
#include <csignal>
#include "boost/program_options.hpp"
#include "FairMQLogger.h"
#include "FairMQParser.h"
#include "FairMQProgOptions.h"
#include "FairMQExample1Sink.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
using namespace boost::program_options;
FairMQExample1Sink sink;
static void s_signal_handler(int signal)
{
LOG(INFO) << "Caught signal " << signal;
sink.ChangeState(FairMQExample1Sink::END);
LOG(INFO) << "Shutdown complete.";
exit(1);
}
static void s_catch_signals(void)
{
struct sigaction action;
action.sa_handler = s_signal_handler;
action.sa_flags = 0;
sigemptyset(&action.sa_mask);
sigaction(SIGINT, &action, NULL);
sigaction(SIGTERM, &action, NULL);
}
int main(int argc, char** argv)
{
s_catch_signals();
FairMQProgOptions config;
try
{
if (config.ParseAll(argc, argv))
{
return 0;
}
std::string filename = config.GetValue<std::string>("config-json-file");
std::string id = config.GetValue<std::string>("id");
config.UserParser<FairMQParser::JSON>(filename, id);
sink.fChannels = config.GetFairMQMap();
LOG(INFO) << "PID: " << getpid();
#ifdef NANOMSG
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
sink.SetTransport(transportFactory);
sink.SetProperty(FairMQExample1Sink::Id, id);
sink.ChangeState("INIT_DEVICE");
sink.WaitForEndOfState("INIT_DEVICE");
sink.ChangeState("INIT_TASK");
sink.WaitForEndOfState("INIT_TASK");
sink.ChangeState("RUN");
sink.WaitForEndOfState("RUN");
sink.ChangeState("STOP");
sink.ChangeState("RESET_TASK");
sink.WaitForEndOfState("RESET_TASK");
sink.ChangeState("RESET_DEVICE");
sink.WaitForEndOfState("RESET_DEVICE");
sink.ChangeState("END");
}
catch (std::exception& e)
{
LOG(ERROR) << e.what();
LOG(INFO) << "Command line options are the following: ";
config.PrintHelp();
return 1;
}
return 0;
}

View File

@ -0,0 +1,41 @@
{
"fairMQOptions":
{
"device":
{
"id": "sampler1",
"channel":
{
"name": "data-out",
"socket":
{
"type": "push",
"method": "bind",
"address": "tcp://*:5555",
"sndBufSize": "1000",
"rcvBufSize": "1000",
"rateLogging": "0"
}
}
},
"device":
{
"id": "sink1",
"channel":
{
"name": "data-in",
"socket":
{
"type": "pull",
"method": "connect",
"address": "tcp://localhost:5556",
"sndBufSize": "1000",
"rcvBufSize": "1000",
"rateLogging": "0"
}
}
}
}
}

View File

@ -0,0 +1,52 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample2Processor.cpp
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include "FairMQExample2Processor.h"
#include "FairMQLogger.h"
FairMQExample2Processor::FairMQExample2Processor()
: fText()
{
}
void FairMQExample2Processor::CustomCleanup(void *data, void *object)
{
delete (std::string*)object;
}
void FairMQExample2Processor::Run()
{
while (GetCurrentState() == RUNNING)
{
FairMQMessage* input = fTransportFactory->CreateMessage();
fChannels["data-in"].at(0).Receive(input);
LOG(INFO) << "Received data, processing...";
std::string* text = new std::string(static_cast<char*>(input->GetData()), input->GetSize());
*text += " (modified by " + fId + ")";
delete input;
FairMQMessage* msg = fTransportFactory->CreateMessage(const_cast<char*>(text->c_str()), text->length(), CustomCleanup, text);
fChannels["data-out"].at(0).Send(msg);
}
}
FairMQExample2Processor::~FairMQExample2Processor()
{
}

View File

@ -0,0 +1,41 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample2Processor.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLE2PROCESSOR_H_
#define FAIRMQEXAMPLE2PROCESSOR_H_
#include <string>
#include "FairMQDevice.h"
class FairMQExample2Processor : public FairMQDevice
{
public:
enum
{
Text = FairMQDevice::Last,
Last
};
FairMQExample2Processor();
virtual ~FairMQExample2Processor();
static void CustomCleanup(void* data, void* hint);
protected:
std::string fText;
virtual void Run();
};
#endif /* FAIRMQEXAMPLE2PROCESSOR_H_ */

View File

@ -0,0 +1,93 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample2Sampler.cpp
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include "FairMQExample2Sampler.h"
#include "FairMQLogger.h"
FairMQExample2Sampler::FairMQExample2Sampler()
: fText()
{
}
void FairMQExample2Sampler::CustomCleanup(void *data, void *object)
{
delete (std::string*)object;
}
void FairMQExample2Sampler::Run()
{
while (GetCurrentState() == RUNNING)
{
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
std::string* text = new std::string(fText);
FairMQMessage* msg = fTransportFactory->CreateMessage(const_cast<char*>(text->c_str()), text->length(), CustomCleanup, text);
LOG(INFO) << "Sending \"" << fText << "\"";
fChannels["data-out"].at(0).Send(msg);
}
}
FairMQExample2Sampler::~FairMQExample2Sampler()
{
}
void FairMQExample2Sampler::SetProperty(const int key, const std::string& value)
{
switch (key)
{
case Text:
fText = value;
break;
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
std::string FairMQExample2Sampler::GetProperty(const int key, const std::string& default_ /*= ""*/)
{
switch (key)
{
case Text:
return fText;
break;
default:
return FairMQDevice::GetProperty(key, default_);
}
}
void FairMQExample2Sampler::SetProperty(const int key, const int value)
{
switch (key)
{
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
int FairMQExample2Sampler::GetProperty(const int key, const int default_ /*= 0*/)
{
switch (key)
{
default:
return FairMQDevice::GetProperty(key, default_);
}
}

View File

@ -0,0 +1,46 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample2Sampler.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLE2SAMPLER_H_
#define FAIRMQEXAMPLE2SAMPLER_H_
#include <string>
#include "FairMQDevice.h"
class FairMQExample2Sampler : public FairMQDevice
{
public:
enum
{
Text = FairMQDevice::Last,
Last
};
FairMQExample2Sampler();
virtual ~FairMQExample2Sampler();
static void CustomCleanup(void* data, void* hint);
virtual void SetProperty(const int key, const std::string& value);
virtual std::string GetProperty(const int key, const std::string& default_ = "");
virtual void SetProperty(const int key, const int value);
virtual int GetProperty(const int key, const int default_ = 0);
protected:
std::string fText;
virtual void Run();
};
#endif /* FAIRMQEXAMPLE2SAMPLER_H_ */

View File

@ -0,0 +1,45 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample2Sink.cxx
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include "FairMQExample2Sink.h"
#include "FairMQLogger.h"
using namespace std;
FairMQExample2Sink::FairMQExample2Sink()
{
}
void FairMQExample2Sink::Run()
{
while (GetCurrentState() == RUNNING)
{
FairMQMessage* msg = fTransportFactory->CreateMessage();
fChannels["data-in"].at(0).Receive(msg);
LOG(INFO) << "Received message: \""
<< string(static_cast<char*>(msg->GetData()), msg->GetSize())
<< "\"";
delete msg;
}
}
FairMQExample2Sink::~FairMQExample2Sink()
{
}

View File

@ -0,0 +1,30 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample2Sink.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLE2SINK_H_
#define FAIRMQEXAMPLE2SINK_H_
#include "FairMQDevice.h"
class FairMQExample2Sink : public FairMQDevice
{
public:
FairMQExample2Sink();
virtual ~FairMQExample2Sink();
protected:
virtual void Run();
};
#endif /* FAIRMQEXAMPLE2SINK_H_ */

View File

@ -0,0 +1,116 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runExample2Processor.cxx
*
* @since 2013-04-23
* @author D. Klein, A. Rybalchenko
*/
#include <iostream>
#include <csignal>
#include "boost/program_options.hpp"
#include "FairMQLogger.h"
#include "FairMQParser.h"
#include "FairMQProgOptions.h"
#include "FairMQExample2Processor.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
using namespace boost::program_options;
FairMQExample2Processor processor;
static void s_signal_handler(int signal)
{
LOG(INFO) << "Caught signal " << signal;
processor.ChangeState("END");
LOG(INFO) << "Shutdown complete.";
exit(1);
}
static void s_catch_signals(void)
{
struct sigaction action;
action.sa_handler = s_signal_handler;
action.sa_flags = 0;
sigemptyset(&action.sa_mask);
sigaction(SIGINT, &action, NULL);
sigaction(SIGTERM, &action, NULL);
}
int main(int argc, char** argv)
{
s_catch_signals();
FairMQProgOptions config;
try
{
if (config.ParseAll(argc, argv))
{
return 0;
}
std::string filename = config.GetValue<std::string>("config-json-file");
std::string id = config.GetValue<std::string>("id");
config.UserParser<FairMQParser::JSON>(filename, id);
processor.fChannels = config.GetFairMQMap();
LOG(INFO) << "PID: " << getpid();
#ifdef NANOMSG
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
processor.SetTransport(transportFactory);
processor.SetProperty(FairMQExample2Processor::Id, id);
processor.ChangeState("INIT_DEVICE");
processor.WaitForEndOfState("INIT_DEVICE");
processor.ChangeState("INIT_TASK");
processor.WaitForEndOfState("INIT_TASK");
processor.ChangeState("RUN");
processor.WaitForEndOfState("RUN");
processor.ChangeState("STOP");
processor.ChangeState("RESET_TASK");
processor.WaitForEndOfState("RESET_TASK");
processor.ChangeState("RESET_DEVICE");
processor.WaitForEndOfState("RESET_DEVICE");
processor.ChangeState("END");
}
catch (std::exception& e)
{
LOG(ERROR) << e.what();
LOG(INFO) << "Command line options are the following: ";
config.PrintHelp();
return 1;
}
return 0;
}

View File

@ -0,0 +1,125 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runExample2Sampler.cxx
*
* @since 2013-04-23
* @author D. Klein, A. Rybalchenko
*/
#include <iostream>
#include <csignal>
#include "boost/program_options.hpp"
#include "FairMQLogger.h"
#include "FairMQParser.h"
#include "FairMQProgOptions.h"
#include "FairMQExample2Sampler.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
using namespace boost::program_options;
FairMQExample2Sampler sampler;
static void s_signal_handler(int signal)
{
LOG(INFO) << "Caught signal " << signal;
sampler.ChangeState("END");
LOG(INFO) << "Shutdown complete.";
exit(1);
}
static void s_catch_signals(void)
{
struct sigaction action;
action.sa_handler = s_signal_handler;
action.sa_flags = 0;
sigemptyset(&action.sa_mask);
sigaction(SIGINT, &action, NULL);
sigaction(SIGTERM, &action, NULL);
}
int main(int argc, char** argv)
{
s_catch_signals();
FairMQProgOptions config;
try
{
std::string text;
options_description samplerOptions("Sampler options");
samplerOptions.add_options()
("text", value<std::string>(&text)->default_value("Hello"), "Text to send out");
config.AddToCmdLineOptions(samplerOptions);
if (config.ParseAll(argc, argv))
{
return 0;
}
std::string filename = config.GetValue<std::string>("config-json-file");
std::string id = config.GetValue<std::string>("id");
config.UserParser<FairMQParser::JSON>(filename, id);
sampler.fChannels = config.GetFairMQMap();
LOG(INFO) << "PID: " << getpid();
#ifdef NANOMSG
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
sampler.SetTransport(transportFactory);
sampler.SetProperty(FairMQExample2Sampler::Id, id);
sampler.SetProperty(FairMQExample2Sampler::Text, text);
sampler.ChangeState("INIT_DEVICE");
sampler.WaitForEndOfState("INIT_DEVICE");
sampler.ChangeState("INIT_TASK");
sampler.WaitForEndOfState("INIT_TASK");
sampler.ChangeState("RUN");
sampler.WaitForEndOfState("RUN");
sampler.ChangeState("STOP");
sampler.ChangeState("RESET_TASK");
sampler.WaitForEndOfState("RESET_TASK");
sampler.ChangeState("RESET_DEVICE");
sampler.WaitForEndOfState("RESET_DEVICE");
sampler.ChangeState("END");
}
catch (std::exception& e)
{
LOG(ERROR) << e.what();
LOG(INFO) << "Command line options are the following: ";
config.PrintHelp();
return 1;
}
return 0;
}

View File

@ -0,0 +1,116 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runExample2Sink.cxx
*
* @since 2013-04-23
* @author D. Klein, A. Rybalchenko
*/
#include <iostream>
#include <csignal>
#include "boost/program_options.hpp"
#include "FairMQLogger.h"
#include "FairMQParser.h"
#include "FairMQProgOptions.h"
#include "FairMQExample2Sink.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
using namespace boost::program_options;
FairMQExample2Sink sink;
static void s_signal_handler(int signal)
{
LOG(INFO) << "Caught signal " << signal;
sink.ChangeState(FairMQExample2Sink::END);
LOG(INFO) << "Shutdown complete.";
exit(1);
}
static void s_catch_signals(void)
{
struct sigaction action;
action.sa_handler = s_signal_handler;
action.sa_flags = 0;
sigemptyset(&action.sa_mask);
sigaction(SIGINT, &action, NULL);
sigaction(SIGTERM, &action, NULL);
}
int main(int argc, char** argv)
{
s_catch_signals();
FairMQProgOptions config;
try
{
if (config.ParseAll(argc, argv))
{
return 0;
}
std::string filename = config.GetValue<std::string>("config-json-file");
std::string id = config.GetValue<std::string>("id");
config.UserParser<FairMQParser::JSON>(filename, id);
sink.fChannels = config.GetFairMQMap();
LOG(INFO) << "PID: " << getpid();
#ifdef NANOMSG
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
sink.SetTransport(transportFactory);
sink.SetProperty(FairMQExample2Sink::Id, id);
sink.ChangeState("INIT_DEVICE");
sink.WaitForEndOfState("INIT_DEVICE");
sink.ChangeState("INIT_TASK");
sink.WaitForEndOfState("INIT_TASK");
sink.ChangeState("RUN");
sink.WaitForEndOfState("RUN");
sink.ChangeState("STOP");
sink.ChangeState("RESET_TASK");
sink.WaitForEndOfState("RESET_TASK");
sink.ChangeState("RESET_DEVICE");
sink.WaitForEndOfState("RESET_DEVICE");
sink.ChangeState("END");
}
catch (std::exception& e)
{
LOG(ERROR) << e.what();
LOG(INFO) << "Command line options are the following: ";
config.PrintHelp();
return 1;
}
return 0;
}

View File

@ -0,0 +1,102 @@
{
"fairMQOptions":
{
"device":
{
"id": "sampler1",
"channel":
{
"name": "data-out",
"socket":
{
"type": "push",
"method": "bind",
"address": "tcp://*:5555",
"sndBufSize": "1000",
"rcvBufSize": "1000",
"rateLogging": "0"
}
}
},
"device":
{
"id": "processor1",
"channel":
{
"name": "data-in",
"socket":
{
"type": "pull",
"method": "connect",
"address": "tcp://localhost:5555",
"sndBufSize": "1000",
"rcvBufSize": "1000",
"rateLogging": "0"
}
},
"channel":
{
"name": "data-out",
"socket":
{
"type": "push",
"method": "connect",
"address": "tcp://localhost:5556",
"sndBufSize": "1000",
"rcvBufSize": "1000",
"rateLogging": "0"
}
}
},
"device":
{
"id": "processor2",
"channel":
{
"name": "data-in",
"socket":
{
"type": "pull",
"method": "connect",
"address": "tcp://localhost:5555",
"sndBufSize": "1000",
"rcvBufSize": "1000",
"rateLogging": "0"
}
},
"channel":
{
"name": "data-out",
"socket":
{
"type": "push",
"method": "connect",
"address": "tcp://localhost:5556",
"sndBufSize": "1000",
"rcvBufSize": "1000",
"rateLogging": "0"
}
}
},
"device":
{
"id": "sink1",
"channel":
{
"name": "data-in",
"socket":
{
"type": "pull",
"method": "bind",
"address": "tcp://*:5556",
"sndBufSize": "1000",
"rcvBufSize": "1000",
"rateLogging": "0"
}
}
}
}
}

View File

@ -41,18 +41,18 @@ int FairMQProgOptions::ParseAll(const int argc, char** argv, bool AllowUnregiste
if (fUseConfigFile)
{
fCmdline_options.add_options()
("device-id", po::value< std::string >(), "Device ID");
("id", po::value< std::string >(), "Device ID");
fConfig_file_options.add_options()
("device-id", po::value< std::string >()->required(), "Device ID");
("id", po::value< std::string >()->required(), "Device ID");
}
else
{
fCmdline_options.add_options()
("device-id", po::value< std::string >()->required(), "Device ID");
("id", po::value< std::string >()->required(), "Device ID");
}
fVisible_options.add_options()
("device-id", po::value< std::string >()->required(), "Device ID (required value)");
("id", po::value< std::string >()->required(), "Device ID (required value)");
// parse command line
if (ParseCmdLine(argc,argv,fCmdline_options,fvarmap,AllowUnregistered))
@ -101,12 +101,12 @@ int FairMQProgOptions::NotifySwitchOption()
void FairMQProgOptions::InitOptionDescription()
{
fMQParserOptions.add_options()
("config-xml-string", po::value< std::vector<std::string> >()->multitoken(), "XML input as command line string.")
("config-xml-filename", po::value< std::string >(), "XML input as file.")
("config-json-string", po::value< std::vector<std::string> >()->multitoken(), "JSON input as command line string.")
("config-json-filename", po::value< std::string >(), "JSON input as file.")
("config-xml-string", po::value< std::vector<std::string> >()->multitoken(), "XML input as command line string.")
("config-xml-file", po::value< std::string >(), "XML input as file.")
("config-json-string", po::value< std::vector<std::string> >()->multitoken(), "JSON input as command line string.")
("config-json-file", po::value< std::string >(), "JSON input as file.")
// ("ini.config.string", po::value< std::vector<std::string> >()->multitoken(), "INI input as command line string.")
// ("ini.config.filename", po::value< std::string >(), "INI input as file.")
// ("ini.config.string", po::value< std::vector<std::string> >()->multitoken(), "INI input as command line string.")
// ("ini.config.file", po::value< std::string >(), "INI input as file.")
;
}

View File

@ -30,6 +30,7 @@ protected:
public:
FairMQProgOptions();
virtual ~FairMQProgOptions();
virtual int ParseAll(const int argc, char** argv, bool AllowUnregistered = false);
// external parser, store function

View File

@ -9,8 +9,8 @@
# xml.config.node.root = fairMQOptions
#----------------------------------------------------
config-json-filename = @CMAKE_BINARY_DIR@/bin/testJSON.json
config-xml-filename = @CMAKE_BINARY_DIR@/bin/testXML.xml
config-json-file = @CMAKE_BINARY_DIR@/bin/testJSON.json
config-xml-file = @CMAKE_BINARY_DIR@/bin/testXML.xml
#
device-id = merger

View File

@ -5,14 +5,14 @@ JSONFILE="@CMAKE_BINARY_DIR@/bin/bsampler-sink.json"
# Note: device-id value must correspond to the device id given in the json file
BSAMPLER="runOptTestSampler"
BSAMPLER+=" --config-sjson-filename $JSONFILE"
BSAMPLER+=" --device-id bsampler1"
BSAMPLER+=" --config-json-file $JSONFILE"
BSAMPLER+=" --id bsampler1"
xterm -geometry 150x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/$BSAMPLER &
SINK="runOptTestSink"
SINK+=" --config-json-filename $JSONFILE"
SINK+=" --device-id sink1"
SINK+=" --config-json-file $JSONFILE"
SINK+=" --id sink1"
xterm -geometry 150x23+0+350 -hold -e @CMAKE_BINARY_DIR@/bin/$SINK &

View File

@ -6,6 +6,6 @@ if [ "$#" -gt 0 ]; then
RUN_TEST+=" $*"
fi
RUN_TEST+=" --config-xml-filename @CMAKE_BINARY_DIR@/bin/testXML.xml"
RUN_TEST+=" --config-xml-file @CMAKE_BINARY_DIR@/bin/testXML.xml"
@CMAKE_BINARY_DIR@/bin/$RUN_TEST

View File

@ -6,6 +6,6 @@ if [ "$#" -gt 0 ]; then
RUN_TEST+=" $*"
fi
RUN_TEST+=" --config-json-filename @CMAKE_BINARY_DIR@/bin/testJSON.json"
RUN_TEST+=" --config-json-file @CMAKE_BINARY_DIR@/bin/testJSON.json"
@CMAKE_BINARY_DIR@/bin/$RUN_TEST

View File

@ -55,8 +55,8 @@ int main(int argc, char** argv)
return 0;
// keys defined in FairMQProgOptions
string filename=config.GetValue<string>("config-json-filename");
string deviceID=config.GetValue<string>("device-id");
string filename=config.GetValue<string>("config-json-file");
string deviceID=config.GetValue<string>("id");
// //////////////////////////////////////////////////////////////
// User defined parsing method.

View File

@ -49,8 +49,8 @@ int main(int argc, char** argv)
return 0;
// keys defined in FairMQProgOptions
string filename=config.GetValue<string>("config-json-filename");
string deviceID=config.GetValue<string>("device-id");
string filename=config.GetValue<string>("config-json-file");
string deviceID=config.GetValue<string>("id");
// //////////////////////////////////////////////////////////////
// User defined parsing method.

View File

@ -23,9 +23,9 @@ int testXML1(FairMQProgOptions* config)
std::string filename;
std::string XMLrootNode;
filename=config->GetValue<std::string>("config-xml-filename");
filename=config->GetValue<std::string>("config-xml-file");
XMLrootNode=config->GetValue<std::string>("xml.config.node.root");
std::string id=config->GetValue<std::string>("device-id");
std::string id=config->GetValue<std::string>("id");
config->UserParser<FairMQParser::XML>(filename,id,XMLrootNode);
// other xml parser test
config->UserParser<FairMQParser::MQXML2>(filename);
@ -42,7 +42,7 @@ int testXML2(FairMQProgOptions* config)
LOG(INFO)<<"--------- test XML2 ---------\n";
std::string XML;
std::string XMLrootNode;
std::string id=config->GetValue<std::string>("device-id");
std::string id=config->GetValue<std::string>("id");
XMLrootNode=config->GetValue<std::string>("xml.config.node.root");
// Note: convert the vector<string> into one string with GetStringValue(key)
@ -62,9 +62,9 @@ int testJSON1(FairMQProgOptions* config)
LOG(INFO)<<"--------- test JSON1 ---------\n";
std::string filename;
std::string JSONrootNode;
std::string id=config->GetValue<std::string>("device-id");
std::string id=config->GetValue<std::string>("id");
filename=config->GetValue<std::string>("config-json-filename");
filename=config->GetValue<std::string>("config-json-file");
JSONrootNode=config->GetValue<std::string>("json.config.node.root");
config->UserParser<FairMQParser::JSON>(filename,id,JSONrootNode);
@ -79,7 +79,7 @@ int testJSON2(FairMQProgOptions* config)
LOG(INFO)<<"--------- test JSON2 ---------\n";
std::string JSON;
std::string JSONrootNode;
std::string id=config->GetValue<std::string>("device-id");
std::string id=config->GetValue<std::string>("id");
JSONrootNode=config->GetValue<std::string>("json.config.node.root");
// Note: convert the vector<string> into one string with GetStringValue(key)
@ -122,13 +122,13 @@ int main(int argc, char** argv)
// Parse xml or json from cmd line or file
if(config->GetVarMap().count("config-xml-filename"))
if(config->GetVarMap().count("config-xml-file"))
testXML1(config);
if(config->GetVarMap().count("config-xml-string"))
testXML2(config);
if(config->GetVarMap().count("config-json-filename"))
if(config->GetVarMap().count("config-json-file"))
testJSON1(config);
if(config->GetVarMap().count("config-json-string"))

View File

@ -57,9 +57,9 @@ int main(int argc, char** argv)
std::string filename;
std::string XMLrootNode;
filename=config.GetValue<std::string>("config-xml-filename");
filename=config.GetValue<std::string>("config-xml-file");
XMLrootNode=config.GetValue<std::string>("xml.config.node.root");
std::string id=config.GetValue<std::string>("device-id");
std::string id=config.GetValue<std::string>("id");
config.UserParser<FairMQParser::XML>(filename,id,XMLrootNode);

View File

@ -9,7 +9,7 @@
* runBenchmarkSampler.cxx
*
* @since 2013-04-23
* @author: D. Klein, A. Rybalchenko
* @author D. Klein, A. Rybalchenko
*/
#include <iostream>
@ -80,7 +80,7 @@ int main(int argc, char** argv)
}
string filename = config.GetValue<string>("config-json-filename");
string id = config.GetValue<string>("device-id");
string id = config.GetValue<string>("id");
config.UserParser<JSON>(filename, id);

View File

@ -76,7 +76,7 @@ int main(int argc, char** argv)
}
string filename = config.GetValue<string>("config-json-filename");
string id = config.GetValue<string>("device-id");
string id = config.GetValue<string>("id");
config.UserParser<JSON>(filename, id);