From f1abb9ecdde29243d49b0a850733e7020d0eb085 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 13 Jan 2016 17:21:24 +0100 Subject: [PATCH] Remove compile time transport interface switch - Remove the compile time check of the transport implementation. The transport (zeromq/nanomsg) can be chosen at run time with: `device.SetTransport("zeromq"); // possible values are "zeromq" and "nanomsg"`. For devices that use FairMQProgOptions, the transport can be configured via cmd option: `--transport zeromq` or `--transport nanomsg`. Default values is "zeromq". The device receives the configured value with: `device.SetTransport(config.GetValue("transport"));` Old method of setting transport still works. But the NANOMSG constant is not defined. - Remove old `fairmq/prototest` directory. It was only used as a test for protobuf. The protobuf part of Tutorial3 does the same (with different values). - Fix a bug in FairMQPollerNN, where the `revents` value was not initialized. This caused the `poller->CheckOutput()` to trigger when it should not. --- fairmq/CMakeLists.txt | 75 ++---------- fairmq/FairMQDevice.cxx | 37 ++++++ fairmq/FairMQDevice.h | 7 +- fairmq/devices/GenericFileSink.h | 5 +- fairmq/devices/GenericMerger.h | 5 +- fairmq/devices/GenericProcessor.h | 5 +- fairmq/devices/GenericSampler.h | 7 +- fairmq/devices/GenericSampler.tpl | 6 - fairmq/nanomsg/FairMQPollerNN.cxx | 10 +- fairmq/options/FairMQProgOptions.cxx | 15 ++- fairmq/prototest/FairMQBinSampler.cxx | 144 ---------------------- fairmq/prototest/FairMQBinSampler.h | 58 --------- fairmq/prototest/FairMQBinSink.cxx | 49 -------- fairmq/prototest/FairMQBinSink.h | 39 ------ fairmq/prototest/FairMQProtoSampler.cxx | 147 ----------------------- fairmq/prototest/FairMQProtoSampler.h | 49 -------- fairmq/prototest/FairMQProtoSink.cxx | 52 -------- fairmq/prototest/FairMQProtoSink.h | 39 ------ fairmq/prototest/payload.proto | 13 -- fairmq/prototest/runBinSampler.cxx | 153 ------------------------ fairmq/prototest/runBinSink.cxx | 139 --------------------- fairmq/prototest/runProtoSampler.cxx | 153 ------------------------ fairmq/prototest/runProtoSink.cxx | 139 --------------------- fairmq/prototest/startBin.sh.in | 18 --- fairmq/prototest/startProto.sh.in | 18 --- fairmq/run/runBenchmarkSampler.cxx | 12 +- fairmq/run/runBuffer.cxx | 60 +++------- fairmq/run/runMerger.cxx | 62 +++------- fairmq/run/runProxy.cxx | 60 +++------- fairmq/run/runSink.cxx | 14 +-- fairmq/run/runSplitter.cxx | 64 +++------- fairmq/test/CMakeLists.txt | 24 +--- fairmq/test/pub-sub/runTestPub.cxx | 15 +-- fairmq/test/pub-sub/runTestSub.cxx | 13 +- fairmq/test/push-pull/runTestPull.cxx | 13 +- fairmq/test/push-pull/runTestPush.cxx | 13 +- fairmq/test/req-rep/runTestRep.cxx | 13 +- fairmq/test/req-rep/runTestReq.cxx | 13 +- fairmq/test/runTransferTimeoutTest.cxx | 13 +- fairmq/tools/runSimpleMQStateMachine.h | 23 +--- fairmq/zeromq/FairMQPollerZMQ.h | 2 +- 41 files changed, 162 insertions(+), 1634 deletions(-) delete mode 100644 fairmq/prototest/FairMQBinSampler.cxx delete mode 100644 fairmq/prototest/FairMQBinSampler.h delete mode 100644 fairmq/prototest/FairMQBinSink.cxx delete mode 100644 fairmq/prototest/FairMQBinSink.h delete mode 100644 fairmq/prototest/FairMQProtoSampler.cxx delete mode 100644 fairmq/prototest/FairMQProtoSampler.h delete mode 100644 fairmq/prototest/FairMQProtoSink.cxx delete mode 100644 fairmq/prototest/FairMQProtoSink.h delete mode 100644 fairmq/prototest/payload.proto delete mode 100644 fairmq/prototest/runBinSampler.cxx delete mode 100644 fairmq/prototest/runBinSink.cxx delete mode 100644 fairmq/prototest/runProtoSampler.cxx delete mode 100644 fairmq/prototest/runProtoSink.cxx delete mode 100755 fairmq/prototest/startBin.sh.in delete mode 100755 fairmq/prototest/startProto.sh.in diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 20909f98..7e600385 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -8,9 +8,6 @@ configure_file(${CMAKE_SOURCE_DIR}/fairmq/run/startBenchmark.sh.in ${CMAKE_BINARY_DIR}/bin/startBenchmark.sh) configure_file(${CMAKE_SOURCE_DIR}/fairmq/run/benchmark.json ${CMAKE_BINARY_DIR}/bin/config/benchmark.json) -# following scripts are only for protobuf tests and are not essential part of FairMQ -# configure_file(${CMAKE_SOURCE_DIR}/examples/advanced/Tutorial3/MQ/run/startBin.sh.in ${CMAKE_BINARY_DIR}/bin/startBin.sh) -# configure_file(${CMAKE_SOURCE_DIR}/examples/advanced/Tutorial3/MQ/run/startProto.sh.in ${CMAKE_BINARY_DIR}/bin/startProto.sh) add_subdirectory(logger) add_subdirectory(test) @@ -30,19 +27,8 @@ Set(SYSTEM_INCLUDE_DIRECTORIES ${ZMQ_INCLUDE_DIR} ) -If(PROTOBUF_FOUND) - Set(INCLUDE_DIRECTORIES - ${INCLUDE_DIRECTORIES} - # following directory is only for protobuf tests and is not essential part of FairMQ - # ${CMAKE_SOURCE_DIR}/fairmq/prototest - ) - Set(SYSTEM_INCLUDE_DIRECTORIES - ${SYSTEM_INCLUDE_DIRECTORIES} - ${PROTOBUF_INCLUDE_DIR} - ) -EndIf(PROTOBUF_FOUND) - If(NANOMSG_FOUND) + add_definitions(-DNANOMSG_FOUND) Set(INCLUDE_DIRECTORIES ${INCLUDE_DIRECTORIES} ${CMAKE_SOURCE_DIR}/fairmq/nanomsg @@ -91,31 +77,6 @@ Set(SRCS "options/FairMQParser.cxx" ) -If(PROTOBUF_FOUND) - # following source files are only for protobuf tests and are not essential part of FairMQ - # add_custom_command( - # OUTPUT - # ${CMAKE_CURRENT_BINARY_DIR}/payload.pb.h - # ${CMAKE_CURRENT_BINARY_DIR}/payload.pb.cc - # COMMAND - # ${SIMPATH}/bin/protoc -I=. --cpp_out=${CMAKE_CURRENT_BINARY_DIR} payload.proto - # WORKING_DIRECTORY - # ${CMAKE_SOURCE_DIR}/fairmq/prototest - # ) - # set(SRCS - # ${SRCS} - # ${CMAKE_CURRENT_BINARY_DIR}/payload.pb.cc - # "prototest/FairMQProtoSampler.cxx" - # "prototest/FairMQBinSampler.cxx" - # "prototest/FairMQBinSink.cxx" - # "prototest/FairMQProtoSink.cxx" - # ) - Set(DEPENDENCIES - ${DEPENDENCIES} - ${PROTOBUF_LIBRARY} - ) -Endif(PROTOBUF_FOUND) - If(NANOMSG_FOUND) Set(SRCS ${SRCS} @@ -124,12 +85,9 @@ If(NANOMSG_FOUND) "nanomsg/FairMQSocketNN.cxx" "nanomsg/FairMQPollerNN.cxx" ) - Set(DEPENDENCIES - ${DEPENDENCIES} - ${NANOMSG_LIBRARY_SHARED} - ) EndIf(NANOMSG_FOUND) + # to copy src that are header-only files (e.g. c++ template) for FairRoot external installation # manual install (globbing add not recommended) Set(FAIRMQHEADERS @@ -163,6 +121,13 @@ Set(DEPENDENCIES boost_regex ) +If(NANOMSG_FOUND) + Set(DEPENDENCIES + ${DEPENDENCIES} + ${NANOMSG_LIBRARY_SHARED} + ) +EndIf(NANOMSG_FOUND) + Set(LIBRARY_NAME FairMQ) GENERATE_LIBRARY() @@ -176,17 +141,6 @@ Set(Exe_Names proxy ) -# following executables are only for protobuf tests and are not essential part of FairMQ -# if(PROTOBUF_FOUND) -# set(Exe_Names -# ${Exe_Names} -# binsampler -# protosampler -# binsink -# protosink -# ) -# endif(PROTOBUF_FOUND) - Set(Exe_Source run/runBenchmarkSampler.cxx run/runSink.cxx @@ -196,17 +150,6 @@ Set(Exe_Source run/runProxy.cxx ) -# following source files are only for protobuf tests and are not essential part of FairMQ -# if(PROTOBUF_FOUND) -# set(Exe_Source -# ${Exe_Source} -# prototest/runBinSampler.cxx -# prototest/runProtoSampler.cxx -# prototest/runBinSink.cxx -# prototest/runProtoSink.cxx -# ) -# endif(PROTOBUF_FOUND) - list(LENGTH Exe_Names _length) math(EXPR _length ${_length}-1) diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index c74ccdc3..483cf5ed 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -27,6 +27,11 @@ #include "FairMQDevice.h" #include "FairMQLogger.h" +#include "FairMQTransportFactoryZMQ.h" +#ifdef NANOMSG_FOUND +#include "FairMQTransportFactoryNN.h" +#endif + using namespace std; // boost::function and a wrapper to catch the signals @@ -85,6 +90,12 @@ void FairMQDevice::SignalHandler(int signal) void FairMQDevice::InitWrapper() { + if (!fTransportFactory) + { + LOG(ERROR) << "Transport not initialized. Did you call SetTransport()?"; + exit(EXIT_FAILURE); + } + if (!fCmdSocket) { fCmdSocket = fTransportFactory->CreateSocket("pub", "device-commands", fNumIoThreads); @@ -454,6 +465,32 @@ void FairMQDevice::SetTransport(FairMQTransportFactory* factory) fTransportFactory = factory; } +void FairMQDevice::SetTransport(const string& transport) +{ + if (transport == "zeromq") + { + fTransportFactory = new FairMQTransportFactoryZMQ(); + } +#ifdef NANOMSG_FOUND + else if (transport == "nanomsg") + { + fTransportFactory = new FairMQTransportFactoryNN(); + } +#endif + else + { + LOG(ERROR) << "Unknown transport implementation requested: " + << transport + << ". Supported are " + << "\"zeromq\"" +#ifdef NANOMSG_FOUND + << ", \"nanomsg\"" +#endif + << ". Exiting."; + exit(EXIT_FAILURE); + } +} + void FairMQDevice::LogSocketRates() { timestamp_t t0; diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 3a819990..37c5bd3a 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -102,9 +102,12 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// Print all properties of this and the parent class to LOG(INFO) virtual void ListProperties(); - /// Configures the device with a transport factory + /// Configures the device with a transport factory (DEPRECATED) /// @param factory Pointer to the transport factory object - virtual void SetTransport(FairMQTransportFactory* factory); + void SetTransport(FairMQTransportFactory* factory); + /// Configures the device with a transport factory + /// @param transport Transport string ("zeromq"/"nanomsg") + void SetTransport(const std::string& transport = "zeromq"); /// Implements the sort algorithm used in SortChannel() /// @param lhs Right hand side value for comparison diff --git a/fairmq/devices/GenericFileSink.h b/fairmq/devices/GenericFileSink.h index 04b2c977..125c6a9d 100644 --- a/fairmq/devices/GenericFileSink.h +++ b/fairmq/devices/GenericFileSink.h @@ -53,9 +53,10 @@ class GenericFileSink : public FairMQDevice, public T, public U virtual ~GenericFileSink() {} - void SetTransport(FairMQTransportFactory* transport) + template + void SetTransport(Args... args) { - FairMQDevice::SetTransport(transport); + FairMQDevice::SetTransport(std::forward(args)...); } template diff --git a/fairmq/devices/GenericMerger.h b/fairmq/devices/GenericMerger.h index 21c65a2c..ba06d631 100644 --- a/fairmq/devices/GenericMerger.h +++ b/fairmq/devices/GenericMerger.h @@ -28,9 +28,10 @@ class GenericMerger : public FairMQDevice, public MergerPolicy, public InputPoli virtual ~GenericMerger() {} - void SetTransport(FairMQTransportFactory* transport) + template + void SetTransport(Args... args) { - FairMQDevice::SetTransport(transport); + FairMQDevice::SetTransport(std::forward(args)...); } protected: diff --git a/fairmq/devices/GenericProcessor.h b/fairmq/devices/GenericProcessor.h index 1ee97aeb..3510bab9 100644 --- a/fairmq/devices/GenericProcessor.h +++ b/fairmq/devices/GenericProcessor.h @@ -60,9 +60,10 @@ class GenericProcessor : public FairMQDevice, public T, public U, public V // the four following methods ensure // that the correct policy method is called - void SetTransport(FairMQTransportFactory* transport) + template + void SetTransport(Args... args) { - FairMQDevice::SetTransport(transport); + FairMQDevice::SetTransport(std::forward(args)...); } template diff --git a/fairmq/devices/GenericSampler.h b/fairmq/devices/GenericSampler.h index a6ae6656..76ae4d1a 100644 --- a/fairmq/devices/GenericSampler.h +++ b/fairmq/devices/GenericSampler.h @@ -84,7 +84,12 @@ class base_GenericSampler : public FairMQDevice, public T, public U }; */ - virtual void SetTransport(FairMQTransportFactory* factory); + template + void SetTransport(Args... args) + { + FairMQDevice::SetTransport(std::forward(args)...); + } + void ResetEventCounter(); template diff --git a/fairmq/devices/GenericSampler.tpl b/fairmq/devices/GenericSampler.tpl index 07d049f6..97f15fa1 100644 --- a/fairmq/devices/GenericSampler.tpl +++ b/fairmq/devices/GenericSampler.tpl @@ -22,12 +22,6 @@ base_GenericSampler::~base_GenericSampler() { } -template -void base_GenericSampler::SetTransport(FairMQTransportFactory* factory) -{ - FairMQDevice::SetTransport(factory); -} - template void base_GenericSampler::InitTask() { diff --git a/fairmq/nanomsg/FairMQPollerNN.cxx b/fairmq/nanomsg/FairMQPollerNN.cxx index 564ab214..933f8afd 100644 --- a/fairmq/nanomsg/FairMQPollerNN.cxx +++ b/fairmq/nanomsg/FairMQPollerNN.cxx @@ -127,13 +127,15 @@ FairMQPollerNN::FairMQPollerNN(FairMQSocket& cmdSocket, FairMQSocket& dataSocket items[0].fd = cmdSocket.GetSocket(1); items[0].events = NN_POLLIN; + items[0].revents = 0; + + items[1].fd = dataSocket.GetSocket(1); + items[1].revents = 0; int type = 0; size_t sz = sizeof(type); nn_getsockopt(dataSocket.GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz); - items[1].fd = dataSocket.GetSocket(1); - if (type == NN_REQ || type == NN_REP || type == NN_PAIR) { items[1].events = NN_POLLIN|NN_POLLOUT; @@ -153,7 +155,7 @@ FairMQPollerNN::FairMQPollerNN(FairMQSocket& cmdSocket, FairMQSocket& dataSocket } } -void FairMQPollerNN::Poll(int timeout) +void FairMQPollerNN::Poll(const int timeout) { if (nn_poll(items, fNumItems, timeout) < 0) { @@ -192,7 +194,7 @@ bool FairMQPollerNN::CheckInput(const string channelKey, const int index) { try { - if (items[fOffsetMap.at(channelKey) + index].revents & NN_POLLIN) + if (items[fOffsetMap.at(channelKey) + index].revents & (NN_POLLIN | NN_POLLOUT)) { return true; } diff --git a/fairmq/options/FairMQProgOptions.cxx b/fairmq/options/FairMQProgOptions.cxx index 9ba33277..76c621d8 100644 --- a/fairmq/options/FairMQProgOptions.cxx +++ b/fairmq/options/FairMQProgOptions.cxx @@ -131,18 +131,21 @@ void FairMQProgOptions::InitOptionDescription() if (fUseConfigFile) { fMQOptionsInCmd.add_options() - ("id", po::value(), "Device ID (required argument).") - ("io-threads", po::value()->default_value(1), "Number of I/O threads."); + ("id", po::value(), "Device ID (required argument).") + ("io-threads", po::value()->default_value(1), "Number of I/O threads.") + ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg')."); fMQOptionsInCfg.add_options() - ("id", po::value()->required(), "Device ID (required argument).") - ("io-threads", po::value()->default_value(1), "Number of I/O threads."); + ("id", po::value()->required(), "Device ID (required argument).") + ("io-threads", po::value()->default_value(1), "Number of I/O threads.") + ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg')."); } else { fMQOptionsInCmd.add_options() - ("id", po::value()->required(), "Device ID (required argument)") - ("io-threads", po::value()->default_value(1), "Number of I/O threads"); + ("id", po::value()->required(), "Device ID (required argument)") + ("io-threads", po::value()->default_value(1), "Number of I/O threads") + ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg')."); } fMQParserOptions.add_options() diff --git a/fairmq/prototest/FairMQBinSampler.cxx b/fairmq/prototest/FairMQBinSampler.cxx deleted file mode 100644 index 9f50957f..00000000 --- a/fairmq/prototest/FairMQBinSampler.cxx +++ /dev/null @@ -1,144 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQBinSampler.cpp - * - * @since 2013-04-23 - * @author D. Klein, A. Rybalchenko - */ - -#include -#include /* srand, rand */ -#include /* time */ - -#include -#include - -#include "FairMQBinSampler.h" -#include "FairMQLogger.h" - -using namespace std; - -FairMQBinSampler::FairMQBinSampler() - : fEventSize(10000) - , fEventRate(1) - , fEventCounter(0) -{ -} - -FairMQBinSampler::~FairMQBinSampler() -{ -} - -void FairMQBinSampler::Run() -{ - boost::thread resetEventCounter(boost::bind(&FairMQBinSampler::ResetEventCounter, this)); - - srand(time(NULL)); - - LOG(DEBUG) << "Message size: " << fEventSize * sizeof(Content) << " bytes."; - - const FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0); - - while (CheckCurrentState(RUNNING)) - { - Content* payload = new Content[fEventSize]; - - for (int i = 0; i < fEventSize; ++i) - { - (&payload[i])->x = rand() % 100 + 1; - (&payload[i])->y = rand() % 100 + 1; - (&payload[i])->z = rand() % 100 + 1; - (&payload[i])->a = (rand() % 100 + 1) / (rand() % 100 + 1); - (&payload[i])->b = (rand() % 100 + 1) / (rand() % 100 + 1); - // LOG(INFO) << (&payload[i])->x << " " << (&payload[i])->y << " " << (&payload[i])->z << " " << (&payload[i])->a << " " << (&payload[i])->b; - } - - FairMQMessage* msg = fTransportFactory->CreateMessage(fEventSize * sizeof(Content)); - memcpy(msg->GetData(), payload, fEventSize * sizeof(Content)); - - dataOutChannel.Send(msg); - - --fEventCounter; - - while (fEventCounter == 0) - { - boost::this_thread::sleep(boost::posix_time::milliseconds(1)); - } - - delete[] payload; - delete msg; - } - - resetEventCounter.interrupt(); - resetEventCounter.join(); -} - -void FairMQBinSampler::ResetEventCounter() -{ - while (GetCurrentState() == RUNNING) - { - try - { - fEventCounter = fEventRate / 100; - boost::this_thread::sleep(boost::posix_time::milliseconds(10)); - } - catch (boost::thread_interrupted&) - { - break; - } - } -} - -void FairMQBinSampler::SetProperty(const int key, const string& value) -{ - switch (key) - { - default: - FairMQDevice::SetProperty(key, value); - break; - } -} - -string FairMQBinSampler::GetProperty(const int key, const string& default_ /*= ""*/) -{ - switch (key) - { - default: - return FairMQDevice::GetProperty(key, default_); - } -} - -void FairMQBinSampler::SetProperty(const int key, const int value) -{ - switch (key) - { - case EventSize: - fEventSize = value; - break; - case EventRate: - fEventRate = value; - break; - default: - FairMQDevice::SetProperty(key, value); - break; - } -} - -int FairMQBinSampler::GetProperty(const int key, const int default_ /*= 0*/) -{ - switch (key) - { - case EventSize: - return fEventSize; - case EventRate: - return fEventRate; - default: - return FairMQDevice::GetProperty(key, default_); - } -} diff --git a/fairmq/prototest/FairMQBinSampler.h b/fairmq/prototest/FairMQBinSampler.h deleted file mode 100644 index 31c30ebb..00000000 --- a/fairmq/prototest/FairMQBinSampler.h +++ /dev/null @@ -1,58 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQBinSampler.h - * - * @since 2014-02-24 - * @author A. Rybalchenko - */ - -#ifndef FAIRMQBINSAMPLER_H_ -#define FAIRMQBINSAMPLER_H_ - -#include - -#include "FairMQDevice.h" - -struct Content -{ - double a; - double b; - int x; - int y; - int z; -}; - -class FairMQBinSampler : public FairMQDevice -{ - public: - enum - { - EventRate = FairMQDevice::Last, - EventSize, - Last - }; - FairMQBinSampler(); - virtual ~FairMQBinSampler(); - - void ResetEventCounter(); - - virtual void SetProperty(const int key, const std::string& value); - virtual std::string GetProperty(const int key, const std::string& default_ = ""); - virtual void SetProperty(const int key, const int value); - virtual int GetProperty(const int key, const int default_ = 0); - - protected: - int fEventSize; - int fEventRate; - int fEventCounter; - - virtual void Run(); -}; - -#endif /* FAIRMQBINSAMPLER_H_ */ diff --git a/fairmq/prototest/FairMQBinSink.cxx b/fairmq/prototest/FairMQBinSink.cxx deleted file mode 100644 index e1061f45..00000000 --- a/fairmq/prototest/FairMQBinSink.cxx +++ /dev/null @@ -1,49 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQBinSink.cxx - * - * @since 2013-01-09 - * @author D. Klein, A. Rybalchenko - */ - -#include -#include - -#include "FairMQBinSink.h" -#include "FairMQLogger.h" - -FairMQBinSink::FairMQBinSink() -{ -} - -void FairMQBinSink::Run() -{ - const FairMQChannel& dataInChannel = fChannels.at("data-in").at(0); - - while (CheckCurrentState(RUNNING)) - { - FairMQMessage* msg = fTransportFactory->CreateMessage(); - - dataInChannel.Receive(msg); - - int inputSize = msg->GetSize(); - // int numInput = inputSize / sizeof(Content); - // Content* input = reinterpret_cast(msg->GetData()); - - // for (int i = 0; i < numInput; ++i) { - // LOG(INFO) << (&input[i])->x << " " << (&input[i])->y << " " << (&input[i])->z << " " << (&input[i])->a << " " << (&input[i])->b; - // } - - delete msg; - } -} - -FairMQBinSink::~FairMQBinSink() -{ -} diff --git a/fairmq/prototest/FairMQBinSink.h b/fairmq/prototest/FairMQBinSink.h deleted file mode 100644 index 030b1403..00000000 --- a/fairmq/prototest/FairMQBinSink.h +++ /dev/null @@ -1,39 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQBinSink.h - * - * @since 2013-01-09 - * @author D. Klein, A. Rybalchenko - */ - -#ifndef FAIRMQPROTOSINK_H_ -#define FAIRMQPROTOSINK_H_ - -#include "FairMQDevice.h" - -struct Content -{ - double a; - double b; - int x; - int y; - int z; -}; - -class FairMQBinSink : public FairMQDevice -{ - public: - FairMQBinSink(); - virtual ~FairMQBinSink(); - - protected: - virtual void Run(); -}; - -#endif /* FAIRMQPROTOSINK_H_ */ diff --git a/fairmq/prototest/FairMQProtoSampler.cxx b/fairmq/prototest/FairMQProtoSampler.cxx deleted file mode 100644 index 6c85ac46..00000000 --- a/fairmq/prototest/FairMQProtoSampler.cxx +++ /dev/null @@ -1,147 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQProtoSampler.cpp - * - * @since 2013-04-23 - * @author D. Klein, A. Rybalchenko - */ - -#include - -#include -#include - -#include "FairMQProtoSampler.h" -#include "FairMQLogger.h" - -#include "payload.pb.h" - -using namespace std; - -FairMQProtoSampler::FairMQProtoSampler() - : fEventSize(10000) - , fEventRate(1) - , fEventCounter(0) -{ -} - -FairMQProtoSampler::~FairMQProtoSampler() -{ -} - -void FairMQProtoSampler::Run() -{ - boost::thread resetEventCounter(boost::bind(&FairMQProtoSampler::ResetEventCounter, this)); - - srand(time(NULL)); - - const FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0); - - while (CheckCurrentState(RUNNING)) - { - sampler::Payload p; - - for (int i = 0; i < fEventSize; ++i) - { - sampler::Content* content = p.add_data(); - - content->set_x(rand() % 100 + 1); - content->set_y(rand() % 100 + 1); - content->set_z(rand() % 100 + 1); - content->set_a((rand() % 100 + 1) / (rand() % 100 + 1)); - content->set_b((rand() % 100 + 1) / (rand() % 100 + 1)); - // LOG(INFO) << content->x() << " " << content->y() << " " << content->z() << " " << content->a() << " " << content->b(); - } - - string str; - p.SerializeToString(&str); - size_t size = str.length(); - - FairMQMessage* msg = fTransportFactory->CreateMessage(size); - memcpy(msg->GetData(), str.c_str(), size); - - dataOutChannel.Send(msg); - - --fEventCounter; - - while (fEventCounter == 0) - { - boost::this_thread::sleep(boost::posix_time::milliseconds(1)); - } - - delete msg; - } - - resetEventCounter.interrupt(); - resetEventCounter.join(); -} - -void FairMQProtoSampler::ResetEventCounter() -{ - while (true) - { - try - { - fEventCounter = fEventRate / 100; - boost::this_thread::sleep(boost::posix_time::milliseconds(10)); - } - catch (boost::thread_interrupted&) - { - break; - } - } -} - -void FairMQProtoSampler::SetProperty(const int key, const string& value) -{ - switch (key) - { - default: - FairMQDevice::SetProperty(key, value); - break; - } -} - -string FairMQProtoSampler::GetProperty(const int key, const string& default_ /*= ""*/) -{ - switch (key) - { - default: - return FairMQDevice::GetProperty(key, default_); - } -} - -void FairMQProtoSampler::SetProperty(const int key, const int value) -{ - switch (key) - { - case EventSize: - fEventSize = value; - break; - case EventRate: - fEventRate = value; - break; - default: - FairMQDevice::SetProperty(key, value); - break; - } -} - -int FairMQProtoSampler::GetProperty(const int key, const int default_ /*= 0*/) -{ - switch (key) - { - case EventSize: - return fEventSize; - case EventRate: - return fEventRate; - default: - return FairMQDevice::GetProperty(key, default_); - } -} diff --git a/fairmq/prototest/FairMQProtoSampler.h b/fairmq/prototest/FairMQProtoSampler.h deleted file mode 100644 index 57c7a1ac..00000000 --- a/fairmq/prototest/FairMQProtoSampler.h +++ /dev/null @@ -1,49 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQProtoSampler.h - * - * @since 2013-04-23 - * @author D. Klein, A. Rybalchenko - */ - -#ifndef FAIRMQPROTOSAMPLER_H_ -#define FAIRMQPROTOSAMPLER_H_ - -#include - -#include "FairMQDevice.h" - -class FairMQProtoSampler : public FairMQDevice -{ - public: - enum - { - EventRate = FairMQDevice::Last, - EventSize, - Last - }; - FairMQProtoSampler(); - virtual ~FairMQProtoSampler(); - - void ResetEventCounter(); - - virtual void SetProperty(const int key, const std::string& value); - virtual std::string GetProperty(const int key, const std::string& default_ = ""); - virtual void SetProperty(const int key, const int value); - virtual int GetProperty(const int key, const int default_ = 0); - - protected: - int fEventSize; - int fEventRate; - int fEventCounter; - - virtual void Run(); -}; - -#endif /* FAIRMQPROTOSAMPLER_H_ */ diff --git a/fairmq/prototest/FairMQProtoSink.cxx b/fairmq/prototest/FairMQProtoSink.cxx deleted file mode 100644 index 29e03810..00000000 --- a/fairmq/prototest/FairMQProtoSink.cxx +++ /dev/null @@ -1,52 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQProtoSink.cxx - * - * @since 2013-01-09 - * @author D. Klein, A. Rybalchenko - */ - -#include -#include - -#include "FairMQProtoSink.h" -#include "FairMQLogger.h" - -#include "payload.pb.h" - -FairMQProtoSink::FairMQProtoSink() -{ -} - -void FairMQProtoSink::Run() -{ - const FairMQChannel& dataInChannel = fChannels.at("data-in").at(0); - - while (CheckCurrentState(RUNNING)) - { - FairMQMessage* msg = fTransportFactory->CreateMessage(); - - dataInChannel.Receive(msg); - - sampler::Payload p; - - p.ParseFromArray(msg->GetData(), msg->GetSize()); - - // for (int i = 0; i < p.data_size(); ++i) { - // const sampler::Payload::Content& content = p.data(i); - // LOG(INFO) << content.x() << " " << content.y() << " " << content.z() << " " << content.a() << " " << content.b(); - // } - - delete msg; - } -} - -FairMQProtoSink::~FairMQProtoSink() -{ -} diff --git a/fairmq/prototest/FairMQProtoSink.h b/fairmq/prototest/FairMQProtoSink.h deleted file mode 100644 index 99f01b82..00000000 --- a/fairmq/prototest/FairMQProtoSink.h +++ /dev/null @@ -1,39 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQProtoSink.h - * - * @since 2013-01-09 - * @author D. Klein, A. Rybalchenko - */ - -#ifndef FAIRMQPROTOSINK_H_ -#define FAIRMQPROTOSINK_H_ - -#include "FairMQDevice.h" - -struct Content -{ - double a; - double b; - int x; - int y; - int z; -}; - -class FairMQProtoSink : public FairMQDevice -{ - public: - FairMQProtoSink(); - virtual ~FairMQProtoSink(); - - protected: - virtual void Run(); -}; - -#endif /* FAIRMQPROTOSINK_H_ */ diff --git a/fairmq/prototest/payload.proto b/fairmq/prototest/payload.proto deleted file mode 100644 index 1ecfbfdd..00000000 --- a/fairmq/prototest/payload.proto +++ /dev/null @@ -1,13 +0,0 @@ -package sampler; - -message Content { - optional double a = 1; - optional double b = 2; - optional int32 x = 3; - optional int32 y = 4; - optional int32 z = 5; -} - -message Payload { - repeated Content data = 1; -} diff --git a/fairmq/prototest/runBinSampler.cxx b/fairmq/prototest/runBinSampler.cxx deleted file mode 100644 index d65e02ce..00000000 --- a/fairmq/prototest/runBinSampler.cxx +++ /dev/null @@ -1,153 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * runBenchmarkSampler.cxx - * - * @since 2013-04-23 - * @author D. Klein, A. Rybalchenko - */ - -#include - -#include "boost/program_options.hpp" - -#include "FairMQLogger.h" -#include "FairMQBinSampler.h" - -#ifdef NANOMSG -#include "FairMQTransportFactoryNN.h" -#else -#include "FairMQTransportFactoryZMQ.h" -#endif - -using namespace std; - -typedef struct DeviceOptions -{ - DeviceOptions() : - id(), eventSize(0), eventRate(0), ioThreads(0), - outputSocketType(), outputBufSize(0), outputMethod(), outputAddress() {} - - string id; - int eventSize; - int eventRate; - int ioThreads; - string outputSocketType; - int outputBufSize; - string outputMethod; - string outputAddress; -} DeviceOptions_t; - -inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) -{ - if (_options == NULL) - throw runtime_error("Internal error: options' container is empty."); - - namespace bpo = boost::program_options; - bpo::options_description desc("Options"); - desc.add_options() - ("id", bpo::value()->required(), "Device ID") - ("event-size", bpo::value()->default_value(1000), "Event size in bytes") - ("event-rate", bpo::value()->default_value(0), "Event rate limit in maximum number of events per second") - ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") - ("output-socket-type", bpo::value()->required(), "Output socket type: pub/push") - ("output-buff-size", bpo::value()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") - ("output-method", bpo::value()->required(), "Output method: bind/connect") - ("output-address", bpo::value()->required(), "Output address, e.g.: \"tcp://*:5555\"") - ("help", "Print help messages"); - - bpo::variables_map vm; - bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); - - if ( vm.count("help") ) - { - LOG(INFO) << "FairMQ Bin Sampler" << endl << desc; - return false; - } - - bpo::notify(vm); - - if ( vm.count("id") ) - _options->id = vm["id"].as(); - - if ( vm.count("event-size") ) - _options->eventSize = vm["event-size"].as(); - - if ( vm.count("event-rate") ) - _options->eventRate = vm["event-rate"].as(); - - if ( vm.count("io-threads") ) - _options->ioThreads = vm["io-threads"].as(); - - if ( vm.count("output-socket-type") ) - _options->outputSocketType = vm["output-socket-type"].as(); - - if ( vm.count("output-buff-size") ) - _options->outputBufSize = vm["output-buff-size"].as(); - - if ( vm.count("output-method") ) - _options->outputMethod = vm["output-method"].as(); - - if ( vm.count("output-address") ) - _options->outputAddress = vm["output-address"].as(); - - return true; -} - -int main(int argc, char** argv) -{ - FairMQBinSampler sampler; - sampler.CatchSignals(); - - DeviceOptions_t options; - try - { - if (!parse_cmd_line(argc, argv, &options)) - return 0; - } - catch (exception& e) - { - LOG(ERROR) << e.what(); - return 1; - } - - LOG(INFO) << "PID: " << getpid(); - LOG(INFO) << "CONFIG: " << "id: " << options.id << ", event size: " << options.eventSize << ", event rate: " << options.eventRate << ", I/O threads: " << options.ioThreads; - LOG(INFO) << "OUTPUT: " << options.outputSocketType << " " << options.outputBufSize << " " << options.outputMethod << " " << options.outputAddress; - -#ifdef NANOMSG - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); -#else - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); -#endif - - sampler.SetTransport(transportFactory); - - FairMQChannel outputChannel(options.outputSocketType, options.outputMethod, options.outputAddress); - outputChannel.UpdateSndBufSize(options.outputBufSize); - outputChannel.UpdateRcvBufSize(options.outputBufSize); - outputChannel.UpdateRateLogging(1); - - sampler.fChannels["data-out"].push_back(outputChannel); - - sampler.SetProperty(FairMQBinSampler::Id, options.id); - sampler.SetProperty(FairMQBinSampler::EventSize, options.eventSize); - sampler.SetProperty(FairMQBinSampler::EventRate, options.eventRate); - sampler.SetProperty(FairMQBinSampler::NumIoThreads, options.ioThreads); - - sampler.ChangeState("INIT_DEVICE"); - sampler.WaitForEndOfState("INIT_DEVICE"); - - sampler.ChangeState("INIT_TASK"); - sampler.WaitForEndOfState("INIT_TASK"); - - sampler.ChangeState("RUN"); - sampler.InteractiveStateLoop(); - - return 0; -} diff --git a/fairmq/prototest/runBinSink.cxx b/fairmq/prototest/runBinSink.cxx deleted file mode 100644 index 5ae89062..00000000 --- a/fairmq/prototest/runBinSink.cxx +++ /dev/null @@ -1,139 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * runSink.cxx - * - * @since 2013-01-21 - * @author D. Klein, A. Rybalchenko - */ - -#include - -#include "boost/program_options.hpp" - -#include "FairMQLogger.h" -#include "FairMQBinSink.h" - -#ifdef NANOMSG -#include "FairMQTransportFactoryNN.h" -#else -#include "FairMQTransportFactoryZMQ.h" -#endif - -using namespace std; - -typedef struct DeviceOptions -{ - DeviceOptions() : - id(), ioThreads(0), - inputSocketType(), inputBufSize(0), inputMethod(), inputAddress() {} - - string id; - int ioThreads; - string inputSocketType; - int inputBufSize; - string inputMethod; - string inputAddress; -} DeviceOptions_t; - -inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) -{ - if (_options == NULL) - throw runtime_error("Internal error: options' container is empty."); - - namespace bpo = boost::program_options; - bpo::options_description desc("Options"); - desc.add_options() - ("id", bpo::value()->required(), "Device ID") - ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") - ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") - ("input-buff-size", bpo::value()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") - ("input-method", bpo::value()->required(), "Input method: bind/connect") - ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://*:5555\"") - ("help", "Print help messages"); - - bpo::variables_map vm; - bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); - - if ( vm.count("help") ) - { - LOG(INFO) << "FairMQ Bin Sink" << endl << desc; - return false; - } - - bpo::notify(vm); - - if ( vm.count("id") ) - _options->id = vm["id"].as(); - - if ( vm.count("io-threads") ) - _options->ioThreads = vm["io-threads"].as(); - - if ( vm.count("input-socket-type") ) - _options->inputSocketType = vm["input-socket-type"].as(); - - if ( vm.count("input-buff-size") ) - _options->inputBufSize = vm["input-buff-size"].as(); - - if ( vm.count("input-method") ) - _options->inputMethod = vm["input-method"].as(); - - if ( vm.count("input-address") ) - _options->inputAddress = vm["input-address"].as(); - - return true; -} - -int main(int argc, char** argv) -{ - FairMQBinSink sink; - sink.CatchSignals(); - - DeviceOptions_t options; - try - { - if (!parse_cmd_line(argc, argv, &options)) - return 0; - } - catch (exception& e) - { - LOG(ERROR) << e.what(); - return 1; - } - - LOG(INFO) << "PID: " << getpid(); - -#ifdef NANOMSG - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); -#else - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); -#endif - - sink.SetTransport(transportFactory); - - FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress); - inputChannel.UpdateSndBufSize(options.inputBufSize); - inputChannel.UpdateRcvBufSize(options.inputBufSize); - inputChannel.UpdateRateLogging(1); - - sink.fChannels["data-in"].push_back(inputChannel); - - sink.SetProperty(FairMQBinSink::Id, options.id); - sink.SetProperty(FairMQBinSink::NumIoThreads, options.ioThreads); - - sink.ChangeState("INIT_DEVICE"); - sink.WaitForEndOfState("INIT_DEVICE"); - - sink.ChangeState("INIT_TASK"); - sink.WaitForEndOfState("INIT_TASK"); - - sink.ChangeState("RUN"); - sink.InteractiveStateLoop(); - - return 0; -} diff --git a/fairmq/prototest/runProtoSampler.cxx b/fairmq/prototest/runProtoSampler.cxx deleted file mode 100644 index a5b3082d..00000000 --- a/fairmq/prototest/runProtoSampler.cxx +++ /dev/null @@ -1,153 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * runBenchmarkSampler.cxx - * - * @since 2013-04-23 - * @author D. Klein, A. Rybalchenko - */ - -#include - -#include "boost/program_options.hpp" - -#include "FairMQLogger.h" -#include "FairMQProtoSampler.h" - -#ifdef NANOMSG -#include "FairMQTransportFactoryNN.h" -#else -#include "FairMQTransportFactoryZMQ.h" -#endif - -using namespace std; - -typedef struct DeviceOptions -{ - DeviceOptions() : - id(), eventSize(0), eventRate(0), ioThreads(0), - outputSocketType(), outputBufSize(0), outputMethod(), outputAddress() {} - - string id; - int eventSize; - int eventRate; - int ioThreads; - string outputSocketType; - int outputBufSize; - string outputMethod; - string outputAddress; -} DeviceOptions_t; - -inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) -{ - if (_options == NULL) - throw runtime_error("Internal error: options' container is empty."); - - namespace bpo = boost::program_options; - bpo::options_description desc("Options"); - desc.add_options() - ("id", bpo::value()->required(), "Device ID") - ("event-size", bpo::value()->default_value(1000), "Event size in bytes") - ("event-rate", bpo::value()->default_value(0), "Event rate limit in maximum number of events per second") - ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") - ("output-socket-type", bpo::value()->required(), "Output socket type: pub/push") - ("output-buff-size", bpo::value()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") - ("output-method", bpo::value()->required(), "Output method: bind/connect") - ("output-address", bpo::value()->required(), "Output address, e.g.: \"tcp://*:5555\"") - ("help", "Print help messages"); - - bpo::variables_map vm; - bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); - - if ( vm.count("help") ) - { - LOG(INFO) << "FairMQ Proto Sampler" << endl << desc; - return false; - } - - bpo::notify(vm); - - if ( vm.count("id") ) - _options->id = vm["id"].as(); - - if ( vm.count("event-size") ) - _options->eventSize = vm["event-size"].as(); - - if ( vm.count("event-rate") ) - _options->eventRate = vm["event-rate"].as(); - - if ( vm.count("io-threads") ) - _options->ioThreads = vm["io-threads"].as(); - - if ( vm.count("output-socket-type") ) - _options->outputSocketType = vm["output-socket-type"].as(); - - if ( vm.count("output-buff-size") ) - _options->outputBufSize = vm["output-buff-size"].as(); - - if ( vm.count("output-method") ) - _options->outputMethod = vm["output-method"].as(); - - if ( vm.count("output-address") ) - _options->outputAddress = vm["output-address"].as(); - - return true; -} - -int main(int argc, char** argv) -{ - FairMQProtoSampler sampler; - sampler.CatchSignals(); - - DeviceOptions_t options; - try - { - if (!parse_cmd_line(argc, argv, &options)) - return 0; - } - catch (exception& e) - { - LOG(ERROR) << e.what(); - return 1; - } - - LOG(INFO) << "PID: " << getpid(); - LOG(INFO) << "CONFIG: " << "id: " << options.id << ", event size: " << options.eventSize << ", event rate: " << options.eventRate << ", I/O threads: " << options.ioThreads; - LOG(INFO) << "OUTPUT: " << options.outputSocketType << " " << options.outputBufSize << " " << options.outputMethod << " " << options.outputAddress; - -#ifdef NANOMSG - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); -#else - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); -#endif - - sampler.SetTransport(transportFactory); - - FairMQChannel outputChannel(options.outputSocketType, options.outputMethod, options.outputAddress); - outputChannel.UpdateSndBufSize(options.outputBufSize); - outputChannel.UpdateRcvBufSize(options.outputBufSize); - outputChannel.UpdateRateLogging(1); - - sampler.fChannels["data-out"].push_back(outputChannel); - - sampler.SetProperty(FairMQProtoSampler::Id, options.id); - sampler.SetProperty(FairMQProtoSampler::EventSize, options.eventSize); - sampler.SetProperty(FairMQProtoSampler::EventRate, options.eventRate); - sampler.SetProperty(FairMQProtoSampler::NumIoThreads, options.ioThreads); - - sampler.ChangeState("INIT_DEVICE"); - sampler.WaitForEndOfState("INIT_DEVICE"); - - sampler.ChangeState("INIT_TASK"); - sampler.WaitForEndOfState("INIT_TASK"); - - sampler.ChangeState("RUN"); - sampler.InteractiveStateLoop(); - - return 0; -} diff --git a/fairmq/prototest/runProtoSink.cxx b/fairmq/prototest/runProtoSink.cxx deleted file mode 100644 index 96e84139..00000000 --- a/fairmq/prototest/runProtoSink.cxx +++ /dev/null @@ -1,139 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * runSink.cxx - * - * @since 2013-01-21 - * @author D. Klein, A. Rybalchenko - */ - -#include - -#include "boost/program_options.hpp" - -#include "FairMQLogger.h" -#include "FairMQProtoSink.h" - -#ifdef NANOMSG -#include "FairMQTransportFactoryNN.h" -#else -#include "FairMQTransportFactoryZMQ.h" -#endif - -using namespace std; - -typedef struct DeviceOptions -{ - DeviceOptions() : - id(), ioThreads(0), - inputSocketType(), inputBufSize(0), inputMethod(), inputAddress() {} - - string id; - int ioThreads; - string inputSocketType; - int inputBufSize; - string inputMethod; - string inputAddress; -} DeviceOptions_t; - -inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) -{ - if (_options == NULL) - throw runtime_error("Internal error: options' container is empty."); - - namespace bpo = boost::program_options; - bpo::options_description desc("Options"); - desc.add_options() - ("id", bpo::value()->required(), "Device ID") - ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") - ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") - ("input-buff-size", bpo::value()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") - ("input-method", bpo::value()->required(), "Input method: bind/connect") - ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://*:5555\"") - ("help", "Print help messages"); - - bpo::variables_map vm; - bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); - - if ( vm.count("help") ) - { - LOG(INFO) << "FairMQ Proto Sink" << endl << desc; - return false; - } - - bpo::notify(vm); - - if ( vm.count("id") ) - _options->id = vm["id"].as(); - - if ( vm.count("io-threads") ) - _options->ioThreads = vm["io-threads"].as(); - - if ( vm.count("input-socket-type") ) - _options->inputSocketType = vm["input-socket-type"].as(); - - if ( vm.count("input-buff-size") ) - _options->inputBufSize = vm["input-buff-size"].as(); - - if ( vm.count("input-method") ) - _options->inputMethod = vm["input-method"].as(); - - if ( vm.count("input-address") ) - _options->inputAddress = vm["input-address"].as(); - - return true; -} - -int main(int argc, char** argv) -{ - FairMQProtoSink sink; - sink.CatchSignals(); - - DeviceOptions_t options; - try - { - if (!parse_cmd_line(argc, argv, &options)) - return 0; - } - catch (exception& e) - { - LOG(ERROR) << e.what(); - return 1; - } - - LOG(INFO) << "PID: " << getpid(); - -#ifdef NANOMSG - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); -#else - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); -#endif - - sink.SetTransport(transportFactory); - - FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress); - inputChannel.UpdateSndBufSize(options.inputBufSize); - inputChannel.UpdateRcvBufSize(options.inputBufSize); - inputChannel.UpdateRateLogging(1); - - sink.fChannels["data-in"].push_back(inputChannel); - - sink.SetProperty(FairMQProtoSink::Id, options.id); - sink.SetProperty(FairMQProtoSink::NumIoThreads, options.ioThreads); - - sink.ChangeState("INIT_DEVICE"); - sink.WaitForEndOfState("INIT_DEVICE"); - - sink.ChangeState("INIT_TASK"); - sink.WaitForEndOfState("INIT_TASK"); - - sink.ChangeState("RUN"); - sink.InteractiveStateLoop(); - - return 0; -} diff --git a/fairmq/prototest/startBin.sh.in b/fairmq/prototest/startBin.sh.in deleted file mode 100755 index d8313f22..00000000 --- a/fairmq/prototest/startBin.sh.in +++ /dev/null @@ -1,18 +0,0 @@ -#!/bin/bash - -if(@NANOMSG_FOUND@); then - buffSize="50000000" # nanomsg buffer size is in bytes -else - buffSize="1000" # zeromq high-water mark is in messages -fi - -SAMPLER="binsampler" -SAMPLER+=" --id 101" -SAMPLER+=" --event-size 10000" -SAMPLER+=" --output-socket-type push --output-buff-size $buffSize --output-method bind --output-address tcp://*:5565" -xterm -e @CMAKE_BINARY_DIR@/bin/$SAMPLER & - -SINK="binsink" -SINK+=" --id 201" -SINK+=" --input-socket-type pull --input-buff-size $buffSize --input-method connect --input-address tcp://localhost:5565" -xterm -e @CMAKE_BINARY_DIR@/bin/$SINK & diff --git a/fairmq/prototest/startProto.sh.in b/fairmq/prototest/startProto.sh.in deleted file mode 100755 index 0e25ba18..00000000 --- a/fairmq/prototest/startProto.sh.in +++ /dev/null @@ -1,18 +0,0 @@ -#!/bin/bash - -if(@NANOMSG_FOUND@); then - buffSize="50000000" # nanomsg buffer size is in bytes -else - buffSize="1000" # zeromq high-water mark is in messages -fi - -SAMPLER="protosampler" -SAMPLER+=" --id 101" -SAMPLER+=" --event-size 10000" -SAMPLER+=" --output-socket-type push --output-buff-size $buffSize --output-method bind --output-address tcp://*:5565" -xterm -e @CMAKE_BINARY_DIR@/bin/$SAMPLER & - -SINK="protosink" -SINK+=" --id 201" -SINK+=" --input-socket-type pull --input-buff-size $buffSize --input-method connect --input-address tcp://localhost:5565" -xterm -e @CMAKE_BINARY_DIR@/bin/$SINK & diff --git a/fairmq/run/runBenchmarkSampler.cxx b/fairmq/run/runBenchmarkSampler.cxx index 61156cb7..819b521e 100644 --- a/fairmq/run/runBenchmarkSampler.cxx +++ b/fairmq/run/runBenchmarkSampler.cxx @@ -22,12 +22,6 @@ #include "FairMQProgOptions.h" #include "FairMQBenchmarkSampler.h" -#ifdef NANOMSG -#include "FairMQTransportFactoryNN.h" -#else -#include "FairMQTransportFactoryZMQ.h" -#endif - using namespace std; using namespace FairMQParser; using namespace boost::program_options; @@ -65,11 +59,7 @@ int main(int argc, char** argv) LOG(INFO) << "PID: " << getpid(); -#ifdef NANOMSG - sampler.SetTransport(new FairMQTransportFactoryNN()); -#else - sampler.SetTransport(new FairMQTransportFactoryZMQ()); -#endif + sampler.SetTransport(config.GetValue("transport")); sampler.SetProperty(FairMQBenchmarkSampler::Id, id); sampler.SetProperty(FairMQBenchmarkSampler::MsgSize, msgSize); diff --git a/fairmq/run/runBuffer.cxx b/fairmq/run/runBuffer.cxx index 3dd7693e..499a0f78 100644 --- a/fairmq/run/runBuffer.cxx +++ b/fairmq/run/runBuffer.cxx @@ -19,23 +19,18 @@ #include "FairMQLogger.h" #include "FairMQBuffer.h" -#ifdef NANOMSG -#include "FairMQTransportFactoryNN.h" -#else -#include "FairMQTransportFactoryZMQ.h" -#endif - using namespace std; typedef struct DeviceOptions { DeviceOptions() : - id(), ioThreads(0), + id(), ioThreads(0), transport(), inputSocketType(), inputBufSize(0), inputMethod(), inputAddress(), outputSocketType(), outputBufSize(0), outputMethod(), outputAddress() {} string id; int ioThreads; + string transport; string inputSocketType; int inputBufSize; string inputMethod; @@ -56,6 +51,7 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) desc.add_options() ("id", bpo::value(), "Device ID") ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("transport", bpo::value()->default_value("zeromq"), "Transport (zeromq/nanomsg)") ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") ("input-buff-size", bpo::value()->default_value(1000), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") ("input-method", bpo::value()->required(), "Input method: bind/connect") @@ -69,7 +65,7 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) bpo::variables_map vm; bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); - if ( vm.count("help") ) + if (vm.count("help")) { LOG(INFO) << "FairMQ Buffer" << endl << desc; return false; @@ -77,35 +73,17 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) bpo::notify(vm); - if ( vm.count("id") ) - _options->id = vm["id"].as(); - - if ( vm.count("io-threads") ) - _options->ioThreads = vm["io-threads"].as(); - - if ( vm.count("input-socket-type") ) - _options->inputSocketType = vm["input-socket-type"].as(); - - if ( vm.count("input-buff-size") ) - _options->inputBufSize = vm["input-buff-size"].as(); - - if ( vm.count("input-method") ) - _options->inputMethod = vm["input-method"].as(); - - if ( vm.count("input-address") ) - _options->inputAddress = vm["input-address"].as(); - - if ( vm.count("output-socket-type") ) - _options->outputSocketType = vm["output-socket-type"].as(); - - if ( vm.count("output-buff-size") ) - _options->outputBufSize = vm["output-buff-size"].as(); - - if ( vm.count("output-method") ) - _options->outputMethod = vm["output-method"].as(); - - if ( vm.count("output-address") ) - _options->outputAddress = vm["output-address"].as(); + if (vm.count("id")) { _options->id = vm["id"].as(); } + if (vm.count("io-threads")) { _options->ioThreads = vm["io-threads"].as(); } + if (vm.count("transport")) { _options->transport = vm["transport"].as(); } + if (vm.count("input-socket-type")) { _options->inputSocketType = vm["input-socket-type"].as(); } + if (vm.count("input-buff-size")) { _options->inputBufSize = vm["input-buff-size"].as(); } + if (vm.count("input-method")) { _options->inputMethod = vm["input-method"].as(); } + if (vm.count("input-address")) { _options->inputAddress = vm["input-address"].as(); } + if (vm.count("output-socket-type")) { _options->outputSocketType = vm["output-socket-type"].as(); } + if (vm.count("output-buff-size")) { _options->outputBufSize = vm["output-buff-size"].as(); } + if (vm.count("output-method")) { _options->outputMethod = vm["output-method"].as(); } + if (vm.count("output-address")) { _options->outputAddress = vm["output-address"].as(); } return true; } @@ -129,13 +107,7 @@ int main(int argc, char** argv) LOG(INFO) << "PID: " << getpid(); -#ifdef NANOMSG - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); -#else - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); -#endif - - buffer.SetTransport(transportFactory); + buffer.SetTransport(options.transport); FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress); inputChannel.UpdateSndBufSize(options.inputBufSize); diff --git a/fairmq/run/runMerger.cxx b/fairmq/run/runMerger.cxx index 17311c57..0e97c008 100644 --- a/fairmq/run/runMerger.cxx +++ b/fairmq/run/runMerger.cxx @@ -19,23 +19,18 @@ #include "FairMQLogger.h" #include "FairMQMerger.h" -#ifdef NANOMSG -#include "FairMQTransportFactoryNN.h" -#else -#include "FairMQTransportFactoryZMQ.h" -#endif - using namespace std; typedef struct DeviceOptions { DeviceOptions() : - id(), ioThreads(0), numInputs(0), + id(), ioThreads(0), transport(), numInputs(0), inputSocketType(), inputBufSize(), inputMethod(), inputAddress(), outputSocketType(), outputBufSize(0), outputMethod(), outputAddress() {} string id; int ioThreads; + string transport; int numInputs; vector inputSocketType; vector inputBufSize; @@ -57,6 +52,7 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) desc.add_options() ("id", bpo::value()->required(), "Device ID") ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("transport", bpo::value()->default_value("zeromq"), "Transport (zeromq/nanomsg)") ("num-inputs", bpo::value()->required(), "Number of Merger input sockets") ("input-socket-type", bpo::value>()->required(), "Input socket type: sub/pull") ("input-buff-size", bpo::value>()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") @@ -79,38 +75,18 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) bpo::notify(vm); - if (vm.count("id")) - _options->id = vm["id"].as(); - - if (vm.count("io-threads")) - _options->ioThreads = vm["io-threads"].as(); - - if (vm.count("num-inputs")) - _options->numInputs = vm["num-inputs"].as(); - - if (vm.count("input-socket-type")) - _options->inputSocketType = vm["input-socket-type"].as>(); - - if (vm.count("input-buff-size")) - _options->inputBufSize = vm["input-buff-size"].as>(); - - if (vm.count("input-method")) - _options->inputMethod = vm["input-method"].as>(); - - if (vm.count("input-address")) - _options->inputAddress = vm["input-address"].as>(); - - if (vm.count("output-socket-type")) - _options->outputSocketType = vm["output-socket-type"].as(); - - if (vm.count("output-buff-size")) - _options->outputBufSize = vm["output-buff-size"].as(); - - if (vm.count("output-method")) - _options->outputMethod = vm["output-method"].as(); - - if (vm.count("output-address")) - _options->outputAddress = vm["output-address"].as(); + if (vm.count("id")) { _options->id = vm["id"].as(); } + if (vm.count("io-threads")) { _options->ioThreads = vm["io-threads"].as(); } + if (vm.count("transport")) { _options->transport = vm["transport"].as(); } + if (vm.count("num-inputs")) { _options->numInputs = vm["num-inputs"].as(); } + if (vm.count("input-socket-type")) { _options->inputSocketType = vm["input-socket-type"].as>(); } + if (vm.count("input-buff-size")) { _options->inputBufSize = vm["input-buff-size"].as>(); } + if (vm.count("input-method")) { _options->inputMethod = vm["input-method"].as>(); } + if (vm.count("input-address")) { _options->inputAddress = vm["input-address"].as>(); } + if (vm.count("output-socket-type")) { _options->outputSocketType = vm["output-socket-type"].as(); } + if (vm.count("output-buff-size")) { _options->outputBufSize = vm["output-buff-size"].as(); } + if (vm.count("output-method")) { _options->outputMethod = vm["output-method"].as(); } + if (vm.count("output-address")) { _options->outputAddress = vm["output-address"].as(); } return true; } @@ -134,13 +110,7 @@ int main(int argc, char** argv) LOG(INFO) << "PID: " << getpid(); -#ifdef NANOMSG - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); -#else - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); -#endif - - merger.SetTransport(transportFactory); + merger.SetTransport(options.transport); for (unsigned int i = 0; i < options.inputAddress.size(); ++i) { diff --git a/fairmq/run/runProxy.cxx b/fairmq/run/runProxy.cxx index f5e289cc..52086690 100644 --- a/fairmq/run/runProxy.cxx +++ b/fairmq/run/runProxy.cxx @@ -19,23 +19,18 @@ #include "FairMQLogger.h" #include "FairMQProxy.h" -#ifdef NANOMSG -#include "FairMQTransportFactoryNN.h" -#else -#include "FairMQTransportFactoryZMQ.h" -#endif - using namespace std; typedef struct DeviceOptions { DeviceOptions() : - id(), ioThreads(0), + id(), ioThreads(0), transport(), inputSocketType(), inputBufSize(0), inputMethod(), inputAddress(), outputSocketType(), outputBufSize(0), outputMethod(), outputAddress() {} string id; int ioThreads; + string transport; string inputSocketType; int inputBufSize; string inputMethod; @@ -56,6 +51,7 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) desc.add_options() ("id", bpo::value()->required(), "Device ID") ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("transport", bpo::value()->default_value("zeromq"), "Transport (zeromq/nanomsg)") ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") ("input-buff-size", bpo::value()->default_value(1000), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") ("input-method", bpo::value()->required(), "Input method: bind/connect") @@ -69,7 +65,7 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) bpo::variables_map vm; bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); - if ( vm.count("help") ) + if (vm.count("help")) { LOG(INFO) << "FairMQ Proxy" << endl << desc; return false; @@ -77,35 +73,17 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) bpo::notify(vm); - if ( vm.count("id") ) - _options->id = vm["id"].as(); - - if ( vm.count("io-threads") ) - _options->ioThreads = vm["io-threads"].as(); - - if ( vm.count("input-socket-type") ) - _options->inputSocketType = vm["input-socket-type"].as(); - - if ( vm.count("input-buff-size") ) - _options->inputBufSize = vm["input-buff-size"].as(); - - if ( vm.count("input-method") ) - _options->inputMethod = vm["input-method"].as(); - - if ( vm.count("input-address") ) - _options->inputAddress = vm["input-address"].as(); - - if ( vm.count("output-socket-type") ) - _options->outputSocketType = vm["output-socket-type"].as(); - - if ( vm.count("output-buff-size") ) - _options->outputBufSize = vm["output-buff-size"].as(); - - if ( vm.count("output-method") ) - _options->outputMethod = vm["output-method"].as(); - - if ( vm.count("output-address") ) - _options->outputAddress = vm["output-address"].as(); + if (vm.count("id")) { _options->id = vm["id"].as(); } + if (vm.count("io-threads")) { _options->ioThreads = vm["io-threads"].as(); } + if (vm.count("transport")) { _options->transport = vm["transport"].as(); } + if (vm.count("input-socket-type")) { _options->inputSocketType = vm["input-socket-type"].as(); } + if (vm.count("input-buff-size")) { _options->inputBufSize = vm["input-buff-size"].as(); } + if (vm.count("input-method")) { _options->inputMethod = vm["input-method"].as(); } + if (vm.count("input-address")) { _options->inputAddress = vm["input-address"].as(); } + if (vm.count("output-socket-type")) { _options->outputSocketType = vm["output-socket-type"].as(); } + if (vm.count("output-buff-size")) { _options->outputBufSize = vm["output-buff-size"].as(); } + if (vm.count("output-method")) { _options->outputMethod = vm["output-method"].as(); } + if (vm.count("output-address")) { _options->outputAddress = vm["output-address"].as(); } return true; } @@ -129,13 +107,7 @@ int main(int argc, char** argv) LOG(INFO) << "PID: " << getpid(); -#ifdef NANOMSG - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); -#else - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); -#endif - - proxy.SetTransport(transportFactory); + proxy.SetTransport(options.transport); FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress); inputChannel.UpdateSndBufSize(options.inputBufSize); diff --git a/fairmq/run/runSink.cxx b/fairmq/run/runSink.cxx index 612babf9..b4f502ad 100644 --- a/fairmq/run/runSink.cxx +++ b/fairmq/run/runSink.cxx @@ -21,12 +21,6 @@ #include "FairMQProgOptions.h" #include "FairMQSink.h" -#ifdef NANOMSG -#include "FairMQTransportFactoryNN.h" -#else -#include "FairMQTransportFactoryZMQ.h" -#endif - using namespace std; using namespace FairMQParser; using namespace boost::program_options; @@ -63,13 +57,7 @@ int main(int argc, char** argv) LOG(INFO) << "PID: " << getpid(); -#ifdef NANOMSG - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); -#else - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); -#endif - - sink.SetTransport(transportFactory); + sink.SetTransport(config.GetValue("transport")); sink.SetProperty(FairMQSink::Id, id); sink.SetProperty(FairMQSink::NumMsgs, numMsgs); diff --git a/fairmq/run/runSplitter.cxx b/fairmq/run/runSplitter.cxx index d553556c..ecc50bc3 100644 --- a/fairmq/run/runSplitter.cxx +++ b/fairmq/run/runSplitter.cxx @@ -19,24 +19,19 @@ #include "FairMQLogger.h" #include "FairMQSplitter.h" -#ifdef NANOMSG -#include "FairMQTransportFactoryNN.h" -#else -#include "FairMQTransportFactoryZMQ.h" -#endif - using namespace std; typedef struct DeviceOptions { DeviceOptions() : - id(), ioThreads(0), numOutputs(0), + id(), ioThreads(0), transport(), numOutputs(0), inputSocketType(), inputBufSize(0), inputMethod(), inputAddress(), outputSocketType(), outputBufSize(), outputMethod(), outputAddress() {} string id; int ioThreads; + string transport; int numOutputs; string inputSocketType; int inputBufSize; @@ -58,6 +53,7 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) desc.add_options() ("id", bpo::value()->required(), "Device ID") ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("transport", bpo::value()->default_value("zeromq"), "Transport (zeromq/nanomsg)") ("num-outputs", bpo::value()->required(), "Number of Splitter output sockets") ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") ("input-buff-size", bpo::value(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") @@ -72,7 +68,7 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) bpo::variables_map vm; bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); - if ( vm.count("help") ) + if (vm.count("help")) { LOG(INFO) << "FairMQ Splitter" << endl << desc; return false; @@ -80,38 +76,18 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) bpo::notify(vm); - if ( vm.count("id") ) - _options->id = vm["id"].as(); - - if ( vm.count("io-threads") ) - _options->ioThreads = vm["io-threads"].as(); - - if ( vm.count("num-outputs") ) - _options->numOutputs = vm["num-outputs"].as(); - - if ( vm.count("input-socket-type") ) - _options->inputSocketType = vm["input-socket-type"].as(); - - if ( vm.count("input-buff-size") ) - _options->inputBufSize = vm["input-buff-size"].as(); - - if ( vm.count("input-method") ) - _options->inputMethod = vm["input-method"].as(); - - if ( vm.count("input-address") ) - _options->inputAddress = vm["input-address"].as(); - - if ( vm.count("output-socket-type") ) - _options->outputSocketType = vm["output-socket-type"].as>(); - - if ( vm.count("output-buff-size") ) - _options->outputBufSize = vm["output-buff-size"].as>(); - - if ( vm.count("output-method") ) - _options->outputMethod = vm["output-method"].as>(); - - if ( vm.count("output-address") ) - _options->outputAddress = vm["output-address"].as>(); + if (vm.count("id")) { _options->id = vm["id"].as(); } + if (vm.count("io-threads")) { _options->ioThreads = vm["io-threads"].as(); } + if (vm.count("transport")) { _options->transport = vm["transport"].as(); } + if (vm.count("num-outputs")) { _options->numOutputs = vm["num-outputs"].as(); } + if (vm.count("input-socket-type")) { _options->inputSocketType = vm["input-socket-type"].as(); } + if (vm.count("input-buff-size")) { _options->inputBufSize = vm["input-buff-size"].as(); } + if (vm.count("input-method")) { _options->inputMethod = vm["input-method"].as(); } + if (vm.count("input-address")) { _options->inputAddress = vm["input-address"].as(); } + if (vm.count("output-socket-type")) { _options->outputSocketType = vm["output-socket-type"].as>(); } + if (vm.count("output-buff-size")) { _options->outputBufSize = vm["output-buff-size"].as>(); } + if (vm.count("output-method")) { _options->outputMethod = vm["output-method"].as>(); } + if (vm.count("output-address")) { _options->outputAddress = vm["output-address"].as>(); } return true; } @@ -135,13 +111,7 @@ int main(int argc, char** argv) LOG(INFO) << "PID: " << getpid(); -#ifdef NANOMSG - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); -#else - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); -#endif - - splitter.SetTransport(transportFactory); + splitter.SetTransport(options.transport); FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress); inputChannel.UpdateSndBufSize(options.inputBufSize); diff --git a/fairmq/test/CMakeLists.txt b/fairmq/test/CMakeLists.txt index 3620bda1..d65cfd3f 100644 --- a/fairmq/test/CMakeLists.txt +++ b/fairmq/test/CMakeLists.txt @@ -12,6 +12,8 @@ configure_file(${CMAKE_SOURCE_DIR}/fairmq/test/test-fairmq-req-rep.sh.in ${CMAKE Set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/fairmq + ${CMAKE_SOURCE_DIR}/fairmq/zeromq + ${CMAKE_SOURCE_DIR}/fairmq/nanomsg ${CMAKE_SOURCE_DIR}/fairmq/devices ${CMAKE_SOURCE_DIR}/fairmq/tools ${CMAKE_SOURCE_DIR}/fairmq/options @@ -24,28 +26,10 @@ Set(INCLUDE_DIRECTORIES Set(SYSTEM_INCLUDE_DIRECTORIES ${Boost_INCLUDE_DIR} + ${ZMQ_INCLUDE_DIR} + ${NANOMSG_INCLUDE_DIR} ) -If(NANOMSG_FOUND) - Set(INCLUDE_DIRECTORIES - ${INCLUDE_DIRECTORIES} - ${CMAKE_SOURCE_DIR}/fairmq/nanomsg - ) - Set(SYSTEM_INCLUDE_DIRECTORIES - ${SYSTEM_INCLUDE_DIRECTORIES} - ${ZMQ_INCLUDE_DIR} - ) -Else(NANOMSG_FOUND) - Set(INCLUDE_DIRECTORIES - ${INCLUDE_DIRECTORIES} - ${CMAKE_SOURCE_DIR}/fairmq/zeromq - ) - Set(SYSTEM_INCLUDE_DIRECTORIES - ${SYSTEM_INCLUDE_DIRECTORIES} - ${ZMQ_INCLUDE_DIR} - ) -EndIf(NANOMSG_FOUND) - Include_Directories(${INCLUDE_DIRECTORIES}) Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES}) diff --git a/fairmq/test/pub-sub/runTestPub.cxx b/fairmq/test/pub-sub/runTestPub.cxx index 21ed2a0d..0af27dda 100644 --- a/fairmq/test/pub-sub/runTestPub.cxx +++ b/fairmq/test/pub-sub/runTestPub.cxx @@ -12,27 +12,14 @@ * @author A. Rybalchenko */ - - #include "FairMQLogger.h" #include "FairMQTestPub.h" -#ifdef NANOMSG -#include "FairMQTransportFactoryNN.h" -#else -#include "FairMQTransportFactoryZMQ.h" -#endif - int main(int argc, char** argv) { FairMQTestPub testPub; testPub.CatchSignals(); - -#ifdef NANOMSG - testPub.SetTransport(new FairMQTransportFactoryNN()); -#else - testPub.SetTransport(new FairMQTransportFactoryZMQ()); -#endif + testPub.SetTransport("zeromq"); testPub.SetProperty(FairMQTestPub::Id, "testPub"); diff --git a/fairmq/test/pub-sub/runTestSub.cxx b/fairmq/test/pub-sub/runTestSub.cxx index babaf28c..e66f1154 100644 --- a/fairmq/test/pub-sub/runTestSub.cxx +++ b/fairmq/test/pub-sub/runTestSub.cxx @@ -17,22 +17,11 @@ #include "FairMQLogger.h" #include "FairMQTestSub.h" -#ifdef NANOMSG -#include "FairMQTransportFactoryNN.h" -#else -#include "FairMQTransportFactoryZMQ.h" -#endif - int main(int argc, char** argv) { FairMQTestSub testSub; testSub.CatchSignals(); - -#ifdef NANOMSG - testSub.SetTransport(new FairMQTransportFactoryNN()); -#else - testSub.SetTransport(new FairMQTransportFactoryZMQ()); -#endif + testSub.SetTransport("zeromq"); testSub.SetProperty(FairMQTestSub::Id, "testSub_" + std::to_string(getpid())); diff --git a/fairmq/test/push-pull/runTestPull.cxx b/fairmq/test/push-pull/runTestPull.cxx index 70e3f4e8..12de2a18 100644 --- a/fairmq/test/push-pull/runTestPull.cxx +++ b/fairmq/test/push-pull/runTestPull.cxx @@ -15,22 +15,11 @@ #include "FairMQLogger.h" #include "FairMQTestPull.h" -#ifdef NANOMSG -#include "FairMQTransportFactoryNN.h" -#else -#include "FairMQTransportFactoryZMQ.h" -#endif - int main(int argc, char** argv) { FairMQTestPull testPull; testPull.CatchSignals(); - -#ifdef NANOMSG - testPull.SetTransport(new FairMQTransportFactoryNN()); -#else - testPull.SetTransport(new FairMQTransportFactoryZMQ()); -#endif + testPull.SetTransport("zeromq"); testPull.SetProperty(FairMQTestPull::Id, "testPull"); diff --git a/fairmq/test/push-pull/runTestPush.cxx b/fairmq/test/push-pull/runTestPush.cxx index 4cfe2ee6..62dcc19c 100644 --- a/fairmq/test/push-pull/runTestPush.cxx +++ b/fairmq/test/push-pull/runTestPush.cxx @@ -15,22 +15,11 @@ #include "FairMQLogger.h" #include "FairMQTestPush.h" -#ifdef NANOMSG -#include "FairMQTransportFactoryNN.h" -#else -#include "FairMQTransportFactoryZMQ.h" -#endif - int main(int argc, char** argv) { FairMQTestPush testPush; testPush.CatchSignals(); - -#ifdef NANOMSG - testPush.SetTransport(new FairMQTransportFactoryNN()); -#else - testPush.SetTransport(new FairMQTransportFactoryZMQ()); -#endif + testPush.SetTransport("zeromq"); testPush.SetProperty(FairMQTestPush::Id, "testPush"); diff --git a/fairmq/test/req-rep/runTestRep.cxx b/fairmq/test/req-rep/runTestRep.cxx index dd7b7ff5..3684859e 100644 --- a/fairmq/test/req-rep/runTestRep.cxx +++ b/fairmq/test/req-rep/runTestRep.cxx @@ -17,22 +17,11 @@ #include "FairMQLogger.h" #include "FairMQTestRep.h" -#ifdef NANOMSG -#include "FairMQTransportFactoryNN.h" -#else -#include "FairMQTransportFactoryZMQ.h" -#endif - int main(int argc, char** argv) { FairMQTestRep testRep; testRep.CatchSignals(); - -#ifdef NANOMSG - testRep.SetTransport(new FairMQTransportFactoryNN()); -#else - testRep.SetTransport(new FairMQTransportFactoryZMQ()); -#endif + testRep.SetTransport("zeromq"); testRep.SetProperty(FairMQTestRep::Id, "testRep"); diff --git a/fairmq/test/req-rep/runTestReq.cxx b/fairmq/test/req-rep/runTestReq.cxx index 029ac389..8277b630 100644 --- a/fairmq/test/req-rep/runTestReq.cxx +++ b/fairmq/test/req-rep/runTestReq.cxx @@ -17,22 +17,11 @@ #include "FairMQLogger.h" #include "FairMQTestReq.h" -#ifdef NANOMSG -#include "FairMQTransportFactoryNN.h" -#else -#include "FairMQTransportFactoryZMQ.h" -#endif - int main(int argc, char** argv) { FairMQTestReq testReq; testReq.CatchSignals(); - -#ifdef NANOMSG - testReq.SetTransport(new FairMQTransportFactoryNN()); -#else - testReq.SetTransport(new FairMQTransportFactoryZMQ()); -#endif + testReq.SetTransport("zeromq"); testReq.SetProperty(FairMQTestReq::Id, "testReq"); diff --git a/fairmq/test/runTransferTimeoutTest.cxx b/fairmq/test/runTransferTimeoutTest.cxx index 9b6a865e..ea545090 100644 --- a/fairmq/test/runTransferTimeoutTest.cxx +++ b/fairmq/test/runTransferTimeoutTest.cxx @@ -15,12 +15,6 @@ #include "FairMQLogger.h" #include "FairMQDevice.h" -#ifdef NANOMSG -#include "FairMQTransportFactoryNN.h" -#else -#include "FairMQTransportFactoryZMQ.h" -#endif - class TransferTimeoutTester : public FairMQDevice { public: @@ -98,12 +92,7 @@ int main(int argc, char** argv) { TransferTimeoutTester timeoutTester; timeoutTester.CatchSignals(); - -#ifdef NANOMSG - timeoutTester.SetTransport(new FairMQTransportFactoryNN()); -#else - timeoutTester.SetTransport(new FairMQTransportFactoryZMQ()); -#endif + timeoutTester.SetTransport("zeromq"); timeoutTester.SetProperty(TransferTimeoutTester::Id, "timeoutTester"); diff --git a/fairmq/tools/runSimpleMQStateMachine.h b/fairmq/tools/runSimpleMQStateMachine.h index 140290e5..ef9d9ca8 100644 --- a/fairmq/tools/runSimpleMQStateMachine.h +++ b/fairmq/tools/runSimpleMQStateMachine.h @@ -16,13 +16,6 @@ /// boost #include "boost/program_options.hpp" -/// ZMQ/nmsg (in FairSoft) -#ifdef NANOMSG -#include "FairMQTransportFactoryNN.h" -#else -#include "FairMQTransportFactoryZMQ.h" -#endif - /// FairRoot - FairMQ #include "FairMQLogger.h" #include "FairMQParser.h" @@ -47,13 +40,7 @@ inline int runStateMachine(TMQDevice& device, FairMQProgOptions& config) LOG(INFO) << "PID: " << getpid(); -#ifdef NANOMSG - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); -#else - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); -#endif - - device.SetTransport(transportFactory); + device.SetTransport(config.GetValue("transport")); device.ChangeState(TMQDevice::INIT_DEVICE); device.WaitForEndOfState(TMQDevice::INIT_DEVICE); @@ -84,13 +71,7 @@ inline int runNonInteractiveStateMachine(TMQDevice& device, FairMQProgOptions& c LOG(INFO) << "PID: " << getpid(); -#ifdef NANOMSG - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); -#else - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); -#endif - - device.SetTransport(transportFactory); + device.SetTransport(config.GetValue("transport")); device.ChangeState(TMQDevice::INIT_DEVICE); device.WaitForEndOfState(TMQDevice::INIT_DEVICE); diff --git a/fairmq/zeromq/FairMQPollerZMQ.h b/fairmq/zeromq/FairMQPollerZMQ.h index 7164fc19..f3a7d6ff 100644 --- a/fairmq/zeromq/FairMQPollerZMQ.h +++ b/fairmq/zeromq/FairMQPollerZMQ.h @@ -50,7 +50,7 @@ class FairMQPollerZMQ : public FairMQPoller zmq_pollitem_t* items; int fNumItems; - std::unordered_map fOffsetMap; + std::unordered_map fOffsetMap; }; #endif /* FAIRMQPOLLERZMQ_H_ */ \ No newline at end of file