diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 8b8da60d..6b55ac6b 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -14,6 +14,10 @@ configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds/ex3-dds-topology.xml ${ configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds/ex3-dds-hosts.cfg ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-hosts.cfg COPYONLY) configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/4-copypush/ex4-copypush.json ${CMAKE_BINARY_DIR}/bin/config/ex4-copypush.json) +configure_file(${CMAKE_SOURCE_DIR}/fairmq/test/test-fairmq-push-pull.sh.in ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-push-pull.sh) +configure_file(${CMAKE_SOURCE_DIR}/fairmq/test/test-fairmq-pub-sub.sh.in ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-pub-sub.sh) +configure_file(${CMAKE_SOURCE_DIR}/fairmq/test/test-fairmq-req-rep.sh.in ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-req-rep.sh) + add_subdirectory(logger) Set(INCLUDE_DIRECTORIES @@ -26,6 +30,9 @@ Set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/fairmq/examples/2-sampler-processor-sink ${CMAKE_SOURCE_DIR}/fairmq/examples/4-copypush ${CMAKE_SOURCE_DIR}/fairmq/examples/5-req-rep + ${CMAKE_SOURCE_DIR}/fairmq/test/push-pull + ${CMAKE_SOURCE_DIR}/fairmq/test/pub-sub + ${CMAKE_SOURCE_DIR}/fairmq/test/req-rep ${CMAKE_CURRENT_BINARY_DIR} ) @@ -120,16 +127,20 @@ set(SRCS "examples/1-sampler-sink/FairMQExample1Sampler.cxx" "examples/1-sampler-sink/FairMQExample1Sink.cxx" - "examples/2-sampler-processor-sink/FairMQExample2Sampler.cxx" "examples/2-sampler-processor-sink/FairMQExample2Processor.cxx" "examples/2-sampler-processor-sink/FairMQExample2Sink.cxx" - "examples/4-copypush/FairMQExample4Sampler.cxx" "examples/4-copypush/FairMQExample4Sink.cxx" - "examples/5-req-rep/FairMQExample5Client.cxx" "examples/5-req-rep/FairMQExample5Server.cxx" + + "test/push-pull/FairMQTestPush.cxx" + "test/push-pull/FairMQTestPull.cxx" + "test/pub-sub/FairMQTestPub.cxx" + "test/pub-sub/FairMQTestSub.cxx" + "test/req-rep/FairMQTestReq.cxx" + "test/req-rep/FairMQTestRep.cxx" ) if(DDS_FOUND) @@ -241,6 +252,12 @@ set(Exe_Names ex4-sink ex5-client ex5-server + test-fairmq-push + test-fairmq-pull + test-fairmq-pub + test-fairmq-sub + test-fairmq-req + test-fairmq-rep ) if(DDS_FOUND) @@ -263,7 +280,7 @@ endif(DDS_FOUND) # ) # endif(PROTOBUF_FOUND) -set(Exe_Source +set(Exe_Source run/runBenchmarkSampler.cxx run/runSink.cxx run/runBuffer.cxx @@ -279,6 +296,12 @@ set(Exe_Source examples/4-copypush/runExample4Sink.cxx examples/5-req-rep/runExample5Client.cxx examples/5-req-rep/runExample5Server.cxx + test/push-pull/runTestPush.cxx + test/push-pull/runTestPull.cxx + test/pub-sub/runTestPub.cxx + test/pub-sub/runTestSub.cxx + test/req-rep/runTestReq.cxx + test/req-rep/runTestRep.cxx ) if(DDS_FOUND) @@ -312,3 +335,15 @@ ForEach(_file RANGE 0 ${_length}) set(DEPENDENCIES FairMQ) GENERATE_EXECUTABLE() EndForEach(_file RANGE 0 ${_length}) + +add_test(NAME run_fairmq_push_pull COMMAND ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-push-pull.sh) +set_tests_properties(run_fairmq_push_pull PROPERTIES TIMEOUT "30") +set_tests_properties(run_fairmq_push_pull PROPERTIES PASS_REGULAR_EXPRESSION "PUSH-PULL test successfull") + +add_test(NAME run_fairmq_pub_sub COMMAND ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-pub-sub.sh) +set_tests_properties(run_fairmq_pub_sub PROPERTIES TIMEOUT "30") +set_tests_properties(run_fairmq_pub_sub PROPERTIES PASS_REGULAR_EXPRESSION "PUB-SUB test successfull") + +add_test(NAME run_fairmq_req_rep COMMAND ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-req-rep.sh) +set_tests_properties(run_fairmq_req_rep PROPERTIES TIMEOUT "30") +set_tests_properties(run_fairmq_req_rep PROPERTIES PASS_REGULAR_EXPRESSION "REQ-REP test successfull") diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 897887d1..292d5df1 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -15,15 +15,15 @@ #ifndef FAIRMQTRANSPORTFACTORY_H_ #define FAIRMQTRANSPORTFACTORY_H_ +#include +#include + #include "FairMQMessage.h" #include "FairMQChannel.h" #include "FairMQSocket.h" #include "FairMQPoller.h" #include "FairMQLogger.h" -#include -#include - class FairMQChannel; class FairMQTransportFactory @@ -32,10 +32,12 @@ class FairMQTransportFactory virtual FairMQMessage* CreateMessage() = 0; virtual FairMQMessage* CreateMessage(size_t size) = 0; virtual FairMQMessage* CreateMessage(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL) = 0; + virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, int numIoThreads) = 0; + virtual FairMQPoller* CreatePoller(const std::vector& channels) = 0; - virtual FairMQPoller* CreatePoller(std::map< std::string,std::vector >& channelsMap, std::initializer_list channelList) = 0; - virtual FairMQPoller* CreatePoller(FairMQSocket& dataSocket, FairMQSocket& cmdSocket) = 0; + virtual FairMQPoller* CreatePoller(std::map>& channelsMap, std::initializer_list channelList) = 0; + virtual FairMQPoller* CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket) = 0; virtual ~FairMQTransportFactory() {}; }; diff --git a/fairmq/devices/GenericSampler.h b/fairmq/devices/GenericSampler.h index d6b5afbd..102ad6b1 100644 --- a/fairmq/devices/GenericSampler.h +++ b/fairmq/devices/GenericSampler.h @@ -119,7 +119,7 @@ class base_GenericSampler : public FairMQDevice, public T, public U we can do in the main function as follow : sampler.RegisterTask( - [&](TSampler* s, std::map >& task_list) + [&](TSampler* s, std::map>& task_list) { task_list[0]=std::bind(myfunction); task_list[1]=std::bind(&U::template MultiPartTask<5>, s); diff --git a/fairmq/devices/GenericSampler.tpl b/fairmq/devices/GenericSampler.tpl index 16c70894..dbc8ccda 100644 --- a/fairmq/devices/GenericSampler.tpl +++ b/fairmq/devices/GenericSampler.tpl @@ -197,5 +197,5 @@ std::string base_GenericSampler::GetProperty(const int key, const std:: } template -using GenericSampler = base_GenericSampler >; -typedef std::map > SamplerTasksMap; +using GenericSampler = base_GenericSampler>; +typedef std::map> SamplerTasksMap; diff --git a/fairmq/examples/1-sampler-sink/FairMQExample1Sampler.h b/fairmq/examples/1-sampler-sink/FairMQExample1Sampler.h index 1569cce5..fd6665c3 100644 --- a/fairmq/examples/1-sampler-sink/FairMQExample1Sampler.h +++ b/fairmq/examples/1-sampler-sink/FairMQExample1Sampler.h @@ -12,8 +12,8 @@ * @author A. Rybalchenko */ -#ifndef FAIRMQEXAMPLESAMPLER_H_ -#define FAIRMQEXAMPLESAMPLER_H_ +#ifndef FAIRMQEXAMPLE1SAMPLER_H_ +#define FAIRMQEXAMPLE1SAMPLER_H_ #include diff --git a/fairmq/examples/5-req-rep/runExample5Client.cxx b/fairmq/examples/5-req-rep/runExample5Client.cxx index a848ebe4..087861ac 100644 --- a/fairmq/examples/5-req-rep/runExample5Client.cxx +++ b/fairmq/examples/5-req-rep/runExample5Client.cxx @@ -91,6 +91,7 @@ int main(int argc, char** argv) client.SetTransport(transportFactory); client.SetProperty(FairMQExample5Client::Id, "client"); + client.SetProperty(FairMQExample5Client::Text, options.text); client.SetProperty(FairMQExample5Client::NumIoThreads, 1); FairMQChannel requestChannel("req", "connect", "tcp://localhost:5005"); diff --git a/fairmq/nanomsg/FairMQMessageNN.cxx b/fairmq/nanomsg/FairMQMessageNN.cxx index 7f865ff9..fe02b92c 100644 --- a/fairmq/nanomsg/FairMQMessageNN.cxx +++ b/fairmq/nanomsg/FairMQMessageNN.cxx @@ -137,6 +137,29 @@ void FairMQMessageNN::SetMessage(void* data, size_t size) } void FairMQMessageNN::Copy(FairMQMessage* msg) +{ + // DEPRECATED: Use Copy(const unique_ptr&) + + if (fMessage) + { + if (nn_freemsg(fMessage) < 0) + { + LOG(ERROR) << "failed freeing message, reason: " << nn_strerror(errno); + } + } + + size_t size = msg->GetSize(); + + fMessage = nn_allocmsg(size, 0); + if (!fMessage) + { + LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno); + } + memcpy(fMessage, msg->GetMessage(), size); + fSize = size; +} + +void FairMQMessageNN::Copy(const unique_ptr& msg) { if (fMessage) { diff --git a/fairmq/nanomsg/FairMQMessageNN.h b/fairmq/nanomsg/FairMQMessageNN.h index 45ec4123..6816f0cb 100644 --- a/fairmq/nanomsg/FairMQMessageNN.h +++ b/fairmq/nanomsg/FairMQMessageNN.h @@ -38,6 +38,7 @@ class FairMQMessageNN : public FairMQMessage virtual void CloseMessage() {}; virtual void Copy(FairMQMessage* msg); + virtual void Copy(const std::unique_ptr& msg); virtual ~FairMQMessageNN(); diff --git a/fairmq/nanomsg/FairMQPollerNN.cxx b/fairmq/nanomsg/FairMQPollerNN.cxx index b5ac824c..f86870df 100644 --- a/fairmq/nanomsg/FairMQPollerNN.cxx +++ b/fairmq/nanomsg/FairMQPollerNN.cxx @@ -40,7 +40,7 @@ FairMQPollerNN::FairMQPollerNN(const vector& channels) } } -FairMQPollerNN::FairMQPollerNN(map< string,vector >& channelsMap, initializer_list channelList) +FairMQPollerNN::FairMQPollerNN(map>& channelsMap, initializer_list channelList) : items() , fNumItems(0) , fOffsetMap() @@ -78,7 +78,7 @@ FairMQPollerNN::FairMQPollerNN(map< string,vector >& channelsMap, } } -FairMQPollerNN::FairMQPollerNN(FairMQSocket& dataSocket, FairMQSocket& cmdSocket) +FairMQPollerNN::FairMQPollerNN(FairMQSocket& cmdSocket, FairMQSocket& dataSocket) : items() , fNumItems(2) , fOffsetMap() diff --git a/fairmq/nanomsg/FairMQPollerNN.h b/fairmq/nanomsg/FairMQPollerNN.h index c4df2793..cf44762d 100644 --- a/fairmq/nanomsg/FairMQPollerNN.h +++ b/fairmq/nanomsg/FairMQPollerNN.h @@ -32,7 +32,7 @@ class FairMQPollerNN : public FairMQPoller public: FairMQPollerNN(const std::vector& channels); - FairMQPollerNN(std::map< std::string,std::vector >& channelsMap, std::initializer_list channelList); + FairMQPollerNN(std::map>& channelsMap, std::initializer_list channelList); virtual void Poll(const int timeout); virtual bool CheckInput(const int index); @@ -43,12 +43,12 @@ class FairMQPollerNN : public FairMQPoller virtual ~FairMQPollerNN(); private: - FairMQPollerNN(FairMQSocket& dataSocket, FairMQSocket& cmdSocket); + FairMQPollerNN(FairMQSocket& cmdSocket, FairMQSocket& dataSocket); nn_pollfd* items; int fNumItems; - std::unordered_map fOffsetMap; + std::unordered_map fOffsetMap; /// Copy Constructor FairMQPollerNN(const FairMQPollerNN&); diff --git a/fairmq/nanomsg/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h index 4317fe78..d9e4c4b1 100644 --- a/fairmq/nanomsg/FairMQSocketNN.h +++ b/fairmq/nanomsg/FairMQSocketNN.h @@ -62,6 +62,10 @@ class FairMQSocketNN : public FairMQSocket unsigned long fBytesRx; unsigned long fMessagesTx; unsigned long fMessagesRx; + + /// Copy Constructor + FairMQSocketNN(const FairMQSocketNN&); + FairMQSocketNN operator=(const FairMQSocketNN&); }; #endif /* FAIRMQSOCKETNN_H_ */ diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx index 6080a505..1baa978d 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx @@ -46,12 +46,12 @@ FairMQPoller* FairMQTransportFactoryNN::CreatePoller(const vector return new FairMQPollerNN(channels); } -FairMQPoller* FairMQTransportFactoryNN::CreatePoller(std::map< std::string,std::vector >& channelsMap, std::initializer_list channelList) +FairMQPoller* FairMQTransportFactoryNN::CreatePoller(std::map>& channelsMap, std::initializer_list channelList) { return new FairMQPollerNN(channelsMap, channelList); } -FairMQPoller* FairMQTransportFactoryNN::CreatePoller(FairMQSocket& dataSocket, FairMQSocket& cmdSocket) +FairMQPoller* FairMQTransportFactoryNN::CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket) { - return new FairMQPollerNN(dataSocket, cmdSocket); + return new FairMQPollerNN(cmdSocket, dataSocket); } diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.h b/fairmq/nanomsg/FairMQTransportFactoryNN.h index a41acb01..04cb69c4 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.h +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.h @@ -34,8 +34,8 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, int numIoThreads); virtual FairMQPoller* CreatePoller(const std::vector& channels); - virtual FairMQPoller* CreatePoller(std::map< std::string,std::vector >& channelsMap, std::initializer_list channelList); - virtual FairMQPoller* CreatePoller(FairMQSocket& dataSocket, FairMQSocket& cmdSocket); + virtual FairMQPoller* CreatePoller(std::map>& channelsMap, std::initializer_list channelList); + virtual FairMQPoller* CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket); virtual ~FairMQTransportFactoryNN() {}; }; diff --git a/fairmq/run/runMerger.cxx b/fairmq/run/runMerger.cxx index e470c97e..ae053bb8 100644 --- a/fairmq/run/runMerger.cxx +++ b/fairmq/run/runMerger.cxx @@ -58,10 +58,10 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) ("id", bpo::value()->required(), "Device ID") ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") ("num-inputs", bpo::value()->required(), "Number of Merger input sockets") - ("input-socket-type", bpo::value< vector >()->required(), "Input socket type: sub/pull") - ("input-buff-size", bpo::value< vector >()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") - ("input-method", bpo::value< vector >()->required(), "Input method: bind/connect") - ("input-address", bpo::value< vector >()->required(), "Input address, e.g.: \"tcp://localhost:5555\"") + ("input-socket-type", bpo::value>()->required(), "Input socket type: sub/pull") + ("input-buff-size", bpo::value>()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("input-method", bpo::value>()->required(), "Input method: bind/connect") + ("input-address", bpo::value>()->required(), "Input address, e.g.: \"tcp://localhost:5555\"") ("output-socket-type", bpo::value()->required(), "Output socket type: pub/push") ("output-buff-size", bpo::value()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") ("output-method", bpo::value()->required(), "Output method: bind/connect") @@ -89,16 +89,16 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) _options->numInputs = vm["num-inputs"].as(); if (vm.count("input-socket-type")) - _options->inputSocketType = vm["input-socket-type"].as< vector >(); + _options->inputSocketType = vm["input-socket-type"].as>(); if (vm.count("input-buff-size")) - _options->inputBufSize = vm["input-buff-size"].as< vector >(); + _options->inputBufSize = vm["input-buff-size"].as>(); if (vm.count("input-method")) - _options->inputMethod = vm["input-method"].as< vector >(); + _options->inputMethod = vm["input-method"].as>(); if (vm.count("input-address")) - _options->inputAddress = vm["input-address"].as< vector >(); + _options->inputAddress = vm["input-address"].as>(); if (vm.count("output-socket-type")) _options->outputSocketType = vm["output-socket-type"].as(); diff --git a/fairmq/run/runSplitter.cxx b/fairmq/run/runSplitter.cxx index ecd316bb..d553556c 100644 --- a/fairmq/run/runSplitter.cxx +++ b/fairmq/run/runSplitter.cxx @@ -63,10 +63,10 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) ("input-buff-size", bpo::value(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") ("input-method", bpo::value()->required(), "Input method: bind/connect") ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://localhost:5555\"") - ("output-socket-type", bpo::value< vector >()->required(), "Output socket type: pub/push") - ("output-buff-size", bpo::value< vector >(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") - ("output-method", bpo::value< vector >()->required(), "Output method: bind/connect") - ("output-address", bpo::value< vector >()->required(), "Output address, e.g.: \"tcp://localhost:5555\"") + ("output-socket-type", bpo::value>()->required(), "Output socket type: pub/push") + ("output-buff-size", bpo::value>(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("output-method", bpo::value>()->required(), "Output method: bind/connect") + ("output-address", bpo::value>()->required(), "Output address, e.g.: \"tcp://localhost:5555\"") ("help", "Print help messages"); bpo::variables_map vm; @@ -102,16 +102,16 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) _options->inputAddress = vm["input-address"].as(); if ( vm.count("output-socket-type") ) - _options->outputSocketType = vm["output-socket-type"].as< vector >(); + _options->outputSocketType = vm["output-socket-type"].as>(); if ( vm.count("output-buff-size") ) - _options->outputBufSize = vm["output-buff-size"].as< vector >(); + _options->outputBufSize = vm["output-buff-size"].as>(); if ( vm.count("output-method") ) - _options->outputMethod = vm["output-method"].as< vector >(); + _options->outputMethod = vm["output-method"].as>(); if ( vm.count("output-address") ) - _options->outputAddress = vm["output-address"].as< vector >(); + _options->outputAddress = vm["output-address"].as>(); return true; } diff --git a/fairmq/test/pub-sub/FairMQTestPub.cxx b/fairmq/test/pub-sub/FairMQTestPub.cxx new file mode 100644 index 00000000..7695cd6b --- /dev/null +++ b/fairmq/test/pub-sub/FairMQTestPub.cxx @@ -0,0 +1,47 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQTestPub.cpp + * + * @since 2015-09-05 + * @author A. Rybalchenko + */ + +#include // unique_ptr + +#include "FairMQTestPub.h" +#include "FairMQLogger.h" + +FairMQTestPub::FairMQTestPub() +{ +} + +void FairMQTestPub::Run() +{ + std::unique_ptr ready1Msg(fTransportFactory->CreateMessage()); + fChannels.at("control").at(0).Receive(ready1Msg); + std::unique_ptr ready2Msg(fTransportFactory->CreateMessage()); + fChannels.at("control").at(0).Receive(ready2Msg); + + std::unique_ptr msg(fTransportFactory->CreateMessage()); + fChannels.at("data").at(0).Send(msg); + + std::unique_ptr ack1Msg(fTransportFactory->CreateMessage()); + std::unique_ptr ack2Msg(fTransportFactory->CreateMessage()); + if (fChannels.at("control").at(0).Receive(ack1Msg) >= 0) + { + if (fChannels.at("control").at(0).Receive(ack2Msg) >= 0) + { + LOG(INFO) << "PUB-SUB test successfull"; + } + } +} + +FairMQTestPub::~FairMQTestPub() +{ +} diff --git a/fairmq/test/pub-sub/FairMQTestPub.h b/fairmq/test/pub-sub/FairMQTestPub.h new file mode 100644 index 00000000..ef0f2b51 --- /dev/null +++ b/fairmq/test/pub-sub/FairMQTestPub.h @@ -0,0 +1,30 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQTestPub.h + * + * @since 2015-09-05 + * @author A. Rybalchenko + */ + +#ifndef FAIRMQTESTPUB_H_ +#define FAIRMQTESTPUB_H_ + +#include "FairMQDevice.h" + +class FairMQTestPub : public FairMQDevice +{ + public: + FairMQTestPub(); + virtual ~FairMQTestPub(); + + protected: + virtual void Run(); +}; + +#endif /* FAIRMQTESTPUB_H_ */ diff --git a/fairmq/test/pub-sub/FairMQTestSub.cxx b/fairmq/test/pub-sub/FairMQTestSub.cxx new file mode 100644 index 00000000..65ee3a96 --- /dev/null +++ b/fairmq/test/pub-sub/FairMQTestSub.cxx @@ -0,0 +1,43 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQTestSub.cxx + * + * @since 2015-09-05 + * @author A. Rybalchenko + */ + +#include // unique_ptr + +#include "FairMQTestSub.h" +#include "FairMQLogger.h" + +FairMQTestSub::FairMQTestSub() +{ +} + +void FairMQTestSub::Run() +{ + std::unique_ptr readyMsg(fTransportFactory->CreateMessage()); + fChannels.at("control").at(0).Send(readyMsg); + + std::unique_ptr msg(fTransportFactory->CreateMessage()); + if (fChannels.at("data").at(0).Receive(msg) >= 0) + { + std::unique_ptr ackMsg(fTransportFactory->CreateMessage()); + fChannels.at("control").at(0).Send(ackMsg); + } + else + { + LOG(ERROR) << "Test failed: size of the received message doesn't match. Expected: 0, Received: " << msg->GetSize(); + } +} + +FairMQTestSub::~FairMQTestSub() +{ +} diff --git a/fairmq/test/pub-sub/FairMQTestSub.h b/fairmq/test/pub-sub/FairMQTestSub.h new file mode 100644 index 00000000..b97ccf34 --- /dev/null +++ b/fairmq/test/pub-sub/FairMQTestSub.h @@ -0,0 +1,30 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQTestSub.h + * + * @since 2015-09-05 + * @author A. Rybalchenko + */ + +#ifndef FAIRMQTESTSUB_H_ +#define FAIRMQTESTSUB_H_ + +#include "FairMQDevice.h" + +class FairMQTestSub : public FairMQDevice +{ + public: + FairMQTestSub(); + virtual ~FairMQTestSub(); + + protected: + virtual void Run(); +}; + +#endif /* FAIRMQTESTSUB_H_ */ diff --git a/fairmq/test/pub-sub/runTestPub.cxx b/fairmq/test/pub-sub/runTestPub.cxx new file mode 100644 index 00000000..21ed2a0d --- /dev/null +++ b/fairmq/test/pub-sub/runTestPub.cxx @@ -0,0 +1,65 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * runTestPub.cxx + * + * @since 2015-09-05 + * @author A. Rybalchenko + */ + + + +#include "FairMQLogger.h" +#include "FairMQTestPub.h" + +#ifdef NANOMSG +#include "FairMQTransportFactoryNN.h" +#else +#include "FairMQTransportFactoryZMQ.h" +#endif + +int main(int argc, char** argv) +{ + FairMQTestPub testPub; + testPub.CatchSignals(); + +#ifdef NANOMSG + testPub.SetTransport(new FairMQTransportFactoryNN()); +#else + testPub.SetTransport(new FairMQTransportFactoryZMQ()); +#endif + + testPub.SetProperty(FairMQTestPub::Id, "testPub"); + + FairMQChannel controlChannel("pull", "bind", "tcp://127.0.0.1:5555"); + controlChannel.UpdateRateLogging(0); + testPub.fChannels["control"].push_back(controlChannel); + + FairMQChannel pubChannel("pub", "bind", "tcp://127.0.0.1:5556"); + pubChannel.UpdateRateLogging(0); + testPub.fChannels["data"].push_back(pubChannel); + + testPub.ChangeState("INIT_DEVICE"); + testPub.WaitForEndOfState("INIT_DEVICE"); + + testPub.ChangeState("INIT_TASK"); + testPub.WaitForEndOfState("INIT_TASK"); + + testPub.ChangeState("RUN"); + testPub.WaitForEndOfState("RUN"); + + testPub.ChangeState("RESET_TASK"); + testPub.WaitForEndOfState("RESET_TASK"); + + testPub.ChangeState("RESET_DEVICE"); + testPub.WaitForEndOfState("RESET_DEVICE"); + + testPub.ChangeState("END"); + + return 0; +} diff --git a/fairmq/test/pub-sub/runTestSub.cxx b/fairmq/test/pub-sub/runTestSub.cxx new file mode 100644 index 00000000..babaf28c --- /dev/null +++ b/fairmq/test/pub-sub/runTestSub.cxx @@ -0,0 +1,65 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * runTestSub.cxx + * + * @since 2015-09-05 + * @author A. Rybalchenko + */ + +#include + +#include "FairMQLogger.h" +#include "FairMQTestSub.h" + +#ifdef NANOMSG +#include "FairMQTransportFactoryNN.h" +#else +#include "FairMQTransportFactoryZMQ.h" +#endif + +int main(int argc, char** argv) +{ + FairMQTestSub testSub; + testSub.CatchSignals(); + +#ifdef NANOMSG + testSub.SetTransport(new FairMQTransportFactoryNN()); +#else + testSub.SetTransport(new FairMQTransportFactoryZMQ()); +#endif + + testSub.SetProperty(FairMQTestSub::Id, "testSub_" + std::to_string(getpid())); + + FairMQChannel controlChannel("push", "connect", "tcp://127.0.0.1:5555"); + controlChannel.UpdateRateLogging(0); + testSub.fChannels["control"].push_back(controlChannel); + + FairMQChannel subChannel("sub", "connect", "tcp://127.0.0.1:5556"); + subChannel.UpdateRateLogging(0); + testSub.fChannels["data"].push_back(subChannel); + + testSub.ChangeState("INIT_DEVICE"); + testSub.WaitForEndOfState("INIT_DEVICE"); + + testSub.ChangeState("INIT_TASK"); + testSub.WaitForEndOfState("INIT_TASK"); + + testSub.ChangeState("RUN"); + testSub.WaitForEndOfState("RUN"); + + testSub.ChangeState("RESET_TASK"); + testSub.WaitForEndOfState("RESET_TASK"); + + testSub.ChangeState("RESET_DEVICE"); + testSub.WaitForEndOfState("RESET_DEVICE"); + + testSub.ChangeState("END"); + + return 0; +} diff --git a/fairmq/test/push-pull/FairMQTestPull.cxx b/fairmq/test/push-pull/FairMQTestPull.cxx new file mode 100644 index 00000000..4d7cb3bd --- /dev/null +++ b/fairmq/test/push-pull/FairMQTestPull.cxx @@ -0,0 +1,36 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQTestPull.cxx + * + * @since 2015-09-05 + * @author A. Rybalchenko + */ + +#include // unique_ptr + +#include "FairMQTestPull.h" +#include "FairMQLogger.h" + +FairMQTestPull::FairMQTestPull() +{ +} + +void FairMQTestPull::Run() +{ + std::unique_ptr msg(fTransportFactory->CreateMessage()); + + if (fChannels.at("data").at(0).Receive(msg) >= 0) + { + LOG(INFO) << "PUSH-PULL test successfull"; + } +} + +FairMQTestPull::~FairMQTestPull() +{ +} diff --git a/fairmq/test/push-pull/FairMQTestPull.h b/fairmq/test/push-pull/FairMQTestPull.h new file mode 100644 index 00000000..915252b7 --- /dev/null +++ b/fairmq/test/push-pull/FairMQTestPull.h @@ -0,0 +1,30 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQTestPull.h + * + * @since 2015-09-05 + * @author A. Rybalchenko + */ + +#ifndef FAIRMQTESTPULL_H_ +#define FAIRMQTESTPULL_H_ + +#include "FairMQDevice.h" + +class FairMQTestPull : public FairMQDevice +{ + public: + FairMQTestPull(); + virtual ~FairMQTestPull(); + + protected: + virtual void Run(); +}; + +#endif /* FAIRMQTESTPULL_H_ */ diff --git a/fairmq/test/push-pull/FairMQTestPush.cxx b/fairmq/test/push-pull/FairMQTestPush.cxx new file mode 100644 index 00000000..79174e14 --- /dev/null +++ b/fairmq/test/push-pull/FairMQTestPush.cxx @@ -0,0 +1,32 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQTestPush.cpp + * + * @since 2015-09-05 + * @author A. Rybalchenko + */ + +#include // unique_ptr + +#include "FairMQTestPush.h" +#include "FairMQLogger.h" + +FairMQTestPush::FairMQTestPush() +{ +} + +void FairMQTestPush::Run() +{ + std::unique_ptr msg(fTransportFactory->CreateMessage()); + fChannels.at("data").at(0).Send(msg); +} + +FairMQTestPush::~FairMQTestPush() +{ +} diff --git a/fairmq/test/push-pull/FairMQTestPush.h b/fairmq/test/push-pull/FairMQTestPush.h new file mode 100644 index 00000000..f9e7887f --- /dev/null +++ b/fairmq/test/push-pull/FairMQTestPush.h @@ -0,0 +1,30 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQTestPush.h + * + * @since 2015-09-05 + * @author A. Rybalchenko + */ + +#ifndef FAIRMQTESTPUSH_H_ +#define FAIRMQTESTPUSH_H_ + +#include "FairMQDevice.h" + +class FairMQTestPush : public FairMQDevice +{ + public: + FairMQTestPush(); + virtual ~FairMQTestPush(); + + protected: + virtual void Run(); +}; + +#endif /* FAIRMQTESTPUSH_H_ */ diff --git a/fairmq/test/push-pull/runTestPull.cxx b/fairmq/test/push-pull/runTestPull.cxx new file mode 100644 index 00000000..70e3f4e8 --- /dev/null +++ b/fairmq/test/push-pull/runTestPull.cxx @@ -0,0 +1,58 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * runTestPull.cxx + * + * @since 2015-09-05 + * @author A. Rybalchenko + */ + +#include "FairMQLogger.h" +#include "FairMQTestPull.h" + +#ifdef NANOMSG +#include "FairMQTransportFactoryNN.h" +#else +#include "FairMQTransportFactoryZMQ.h" +#endif + +int main(int argc, char** argv) +{ + FairMQTestPull testPull; + testPull.CatchSignals(); + +#ifdef NANOMSG + testPull.SetTransport(new FairMQTransportFactoryNN()); +#else + testPull.SetTransport(new FairMQTransportFactoryZMQ()); +#endif + + testPull.SetProperty(FairMQTestPull::Id, "testPull"); + + FairMQChannel pullChannel("pull", "connect", "tcp://127.0.0.1:5557"); + testPull.fChannels["data"].push_back(pullChannel); + + testPull.ChangeState("INIT_DEVICE"); + testPull.WaitForEndOfState("INIT_DEVICE"); + + testPull.ChangeState("INIT_TASK"); + testPull.WaitForEndOfState("INIT_TASK"); + + testPull.ChangeState("RUN"); + testPull.WaitForEndOfState("RUN"); + + testPull.ChangeState("RESET_TASK"); + testPull.WaitForEndOfState("RESET_TASK"); + + testPull.ChangeState("RESET_DEVICE"); + testPull.WaitForEndOfState("RESET_DEVICE"); + + testPull.ChangeState("END"); + + return 0; +} diff --git a/fairmq/test/push-pull/runTestPush.cxx b/fairmq/test/push-pull/runTestPush.cxx new file mode 100644 index 00000000..4cfe2ee6 --- /dev/null +++ b/fairmq/test/push-pull/runTestPush.cxx @@ -0,0 +1,58 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * runTestPush.cxx + * + * @since 2015-09-05 + * @author A. Rybalchenko + */ + +#include "FairMQLogger.h" +#include "FairMQTestPush.h" + +#ifdef NANOMSG +#include "FairMQTransportFactoryNN.h" +#else +#include "FairMQTransportFactoryZMQ.h" +#endif + +int main(int argc, char** argv) +{ + FairMQTestPush testPush; + testPush.CatchSignals(); + +#ifdef NANOMSG + testPush.SetTransport(new FairMQTransportFactoryNN()); +#else + testPush.SetTransport(new FairMQTransportFactoryZMQ()); +#endif + + testPush.SetProperty(FairMQTestPush::Id, "testPush"); + + FairMQChannel pushChannel("push", "bind", "tcp://127.0.0.1:5557"); + testPush.fChannels["data"].push_back(pushChannel); + + testPush.ChangeState("INIT_DEVICE"); + testPush.WaitForEndOfState("INIT_DEVICE"); + + testPush.ChangeState("INIT_TASK"); + testPush.WaitForEndOfState("INIT_TASK"); + + testPush.ChangeState("RUN"); + testPush.WaitForEndOfState("RUN"); + + testPush.ChangeState("RESET_TASK"); + testPush.WaitForEndOfState("RESET_TASK"); + + testPush.ChangeState("RESET_DEVICE"); + testPush.WaitForEndOfState("RESET_DEVICE"); + + testPush.ChangeState("END"); + + return 0; +} diff --git a/fairmq/test/req-rep/FairMQTestRep.cxx b/fairmq/test/req-rep/FairMQTestRep.cxx new file mode 100644 index 00000000..a89d660b --- /dev/null +++ b/fairmq/test/req-rep/FairMQTestRep.cxx @@ -0,0 +1,36 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQTestRep.cpp + * + * @since 2015-09-05 + * @author A. Rybalchenko + */ + +#include // unique_ptr + +#include "FairMQTestRep.h" +#include "FairMQLogger.h" + +FairMQTestRep::FairMQTestRep() +{ +} + +void FairMQTestRep::Run() +{ + std::unique_ptr request(fTransportFactory->CreateMessage()); + if (fChannels.at("data").at(0).Receive(request) >= 0) + { + std::unique_ptr reply(fTransportFactory->CreateMessage()); + fChannels.at("data").at(0).Send(reply); + } +} + +FairMQTestRep::~FairMQTestRep() +{ +} diff --git a/fairmq/test/req-rep/FairMQTestRep.h b/fairmq/test/req-rep/FairMQTestRep.h new file mode 100644 index 00000000..2b2792d4 --- /dev/null +++ b/fairmq/test/req-rep/FairMQTestRep.h @@ -0,0 +1,30 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQTestRep.h + * + * @since 2015-09-05 + * @author A. Rybalchenko + */ + +#ifndef FAIRMQTESTREP_H_ +#define FAIRMQTESTREP_H_ + +#include "FairMQDevice.h" + +class FairMQTestRep : public FairMQDevice +{ + public: + FairMQTestRep(); + virtual ~FairMQTestRep(); + + protected: + virtual void Run(); +}; + +#endif /* FAIRMQTESTREP_H_ */ diff --git a/fairmq/test/req-rep/FairMQTestReq.cxx b/fairmq/test/req-rep/FairMQTestReq.cxx new file mode 100644 index 00000000..a9f01660 --- /dev/null +++ b/fairmq/test/req-rep/FairMQTestReq.cxx @@ -0,0 +1,38 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQTestReq.cxx + * + * @since 2015-09-05 + * @author A. Rybalchenko + */ + +#include // unique_ptr + +#include "FairMQTestReq.h" +#include "FairMQLogger.h" + +FairMQTestReq::FairMQTestReq() +{ +} + +void FairMQTestReq::Run() +{ + std::unique_ptr request(fTransportFactory->CreateMessage()); + fChannels.at("data").at(0).Send(request); + + std::unique_ptr reply(fTransportFactory->CreateMessage()); + if (fChannels.at("data").at(0).Receive(reply) >= 0) + { + LOG(INFO) << "REQ-REP test successfull"; + } +} + +FairMQTestReq::~FairMQTestReq() +{ +} diff --git a/fairmq/test/req-rep/FairMQTestReq.h b/fairmq/test/req-rep/FairMQTestReq.h new file mode 100644 index 00000000..dbd3eba9 --- /dev/null +++ b/fairmq/test/req-rep/FairMQTestReq.h @@ -0,0 +1,30 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQTestReq.h + * + * @since 2015-09-05 + * @author A. Rybalchenko + */ + +#ifndef FAIRMQTESTREQ_H_ +#define FAIRMQTESTREQ_H_ + +#include "FairMQDevice.h" + +class FairMQTestReq : public FairMQDevice +{ + public: + FairMQTestReq(); + virtual ~FairMQTestReq(); + + protected: + virtual void Run(); +}; + +#endif /* FAIRMQTESTREQ_H_ */ diff --git a/fairmq/test/req-rep/runTestRep.cxx b/fairmq/test/req-rep/runTestRep.cxx new file mode 100644 index 00000000..dd7b7ff5 --- /dev/null +++ b/fairmq/test/req-rep/runTestRep.cxx @@ -0,0 +1,60 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * runTestRep.cxx + * + * @since 2015-09-05 + * @author A. Rybalchenko + */ + +#include + +#include "FairMQLogger.h" +#include "FairMQTestRep.h" + +#ifdef NANOMSG +#include "FairMQTransportFactoryNN.h" +#else +#include "FairMQTransportFactoryZMQ.h" +#endif + +int main(int argc, char** argv) +{ + FairMQTestRep testRep; + testRep.CatchSignals(); + +#ifdef NANOMSG + testRep.SetTransport(new FairMQTransportFactoryNN()); +#else + testRep.SetTransport(new FairMQTransportFactoryZMQ()); +#endif + + testRep.SetProperty(FairMQTestRep::Id, "testRep"); + + FairMQChannel repChannel("rep", "connect", "tcp://127.0.0.1:5558"); + testRep.fChannels["data"].push_back(repChannel); + + testRep.ChangeState("INIT_DEVICE"); + testRep.WaitForEndOfState("INIT_DEVICE"); + + testRep.ChangeState("INIT_TASK"); + testRep.WaitForEndOfState("INIT_TASK"); + + testRep.ChangeState("RUN"); + testRep.WaitForEndOfState("RUN"); + + testRep.ChangeState("RESET_TASK"); + testRep.WaitForEndOfState("RESET_TASK"); + + testRep.ChangeState("RESET_DEVICE"); + testRep.WaitForEndOfState("RESET_DEVICE"); + + testRep.ChangeState("END"); + + return 0; +} diff --git a/fairmq/test/req-rep/runTestReq.cxx b/fairmq/test/req-rep/runTestReq.cxx new file mode 100644 index 00000000..029ac389 --- /dev/null +++ b/fairmq/test/req-rep/runTestReq.cxx @@ -0,0 +1,60 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * runTestReq.cxx + * + * @since 2015-09-05 + * @author A. Rybalchenko + */ + +#include + +#include "FairMQLogger.h" +#include "FairMQTestReq.h" + +#ifdef NANOMSG +#include "FairMQTransportFactoryNN.h" +#else +#include "FairMQTransportFactoryZMQ.h" +#endif + +int main(int argc, char** argv) +{ + FairMQTestReq testReq; + testReq.CatchSignals(); + +#ifdef NANOMSG + testReq.SetTransport(new FairMQTransportFactoryNN()); +#else + testReq.SetTransport(new FairMQTransportFactoryZMQ()); +#endif + + testReq.SetProperty(FairMQTestReq::Id, "testReq"); + + FairMQChannel reqChannel("req", "bind", "tcp://127.0.0.1:5558"); + testReq.fChannels["data"].push_back(reqChannel); + + testReq.ChangeState("INIT_DEVICE"); + testReq.WaitForEndOfState("INIT_DEVICE"); + + testReq.ChangeState("INIT_TASK"); + testReq.WaitForEndOfState("INIT_TASK"); + + testReq.ChangeState("RUN"); + testReq.WaitForEndOfState("RUN"); + + testReq.ChangeState("RESET_TASK"); + testReq.WaitForEndOfState("RESET_TASK"); + + testReq.ChangeState("RESET_DEVICE"); + testReq.WaitForEndOfState("RESET_DEVICE"); + + testReq.ChangeState("END"); + + return 0; +} diff --git a/fairmq/test/test-fairmq-pub-sub.sh.in b/fairmq/test/test-fairmq-pub-sub.sh.in new file mode 100755 index 00000000..984d34b8 --- /dev/null +++ b/fairmq/test/test-fairmq-pub-sub.sh.in @@ -0,0 +1,12 @@ +#!/bin/bash + +trap 'kill -TERM $PUB_PID; kill -TERM $SUB1_PID; kill -TERM $SUB2_PID; wait $PUB_PID; wait $SUB1_PID; wait $SUB2_PID;' TERM +@CMAKE_BINARY_DIR@/bin/test-fairmq-pub & +PUB_PID=$! +@CMAKE_BINARY_DIR@/bin/test-fairmq-sub & +SUB1_PID=$! +@CMAKE_BINARY_DIR@/bin/test-fairmq-sub & +SUB2_PID=$! +wait $PUB_PID +wait $SUB1_PID +wait $SUB2_PID diff --git a/fairmq/test/test-fairmq-push-pull.sh.in b/fairmq/test/test-fairmq-push-pull.sh.in new file mode 100755 index 00000000..e3cbacb5 --- /dev/null +++ b/fairmq/test/test-fairmq-push-pull.sh.in @@ -0,0 +1,9 @@ +#!/bin/bash + +trap 'kill -TERM $PUSH_PID; kill -TERM $PULL_PID; wait $PUSH_PID; wait $PULL_PID;' TERM +@CMAKE_BINARY_DIR@/bin/test-fairmq-push & +PUSH_PID=$! +@CMAKE_BINARY_DIR@/bin/test-fairmq-pull & +PULL_PID=$! +wait $PUSH_PID +wait $PULL_PID diff --git a/fairmq/test/test-fairmq-req-rep.sh.in b/fairmq/test/test-fairmq-req-rep.sh.in new file mode 100755 index 00000000..b523a5f6 --- /dev/null +++ b/fairmq/test/test-fairmq-req-rep.sh.in @@ -0,0 +1,9 @@ +#!/bin/bash + +trap 'kill -TERM $REQ_PID; kill -TERM $REP_PID; wait $REQ_PID; wait $REP_PID;' TERM +@CMAKE_BINARY_DIR@/bin/test-fairmq-req & +REQ_PID=$! +@CMAKE_BINARY_DIR@/bin/test-fairmq-rep & +REP_PID=$! +wait $REQ_PID +wait $REP_PID diff --git a/fairmq/zeromq/FairMQPollerZMQ.cxx b/fairmq/zeromq/FairMQPollerZMQ.cxx index 5cbbe99a..b8404187 100644 --- a/fairmq/zeromq/FairMQPollerZMQ.cxx +++ b/fairmq/zeromq/FairMQPollerZMQ.cxx @@ -38,7 +38,7 @@ FairMQPollerZMQ::FairMQPollerZMQ(const vector& channels) } } -FairMQPollerZMQ::FairMQPollerZMQ(map< string,vector >& channelsMap, initializer_list channelList) +FairMQPollerZMQ::FairMQPollerZMQ(map>& channelsMap, initializer_list channelList) : items() , fNumItems(0) , fOffsetMap() diff --git a/fairmq/zeromq/FairMQPollerZMQ.h b/fairmq/zeromq/FairMQPollerZMQ.h index 58a68de2..7a37128c 100644 --- a/fairmq/zeromq/FairMQPollerZMQ.h +++ b/fairmq/zeromq/FairMQPollerZMQ.h @@ -32,7 +32,7 @@ class FairMQPollerZMQ : public FairMQPoller public: FairMQPollerZMQ(const std::vector& channels); - FairMQPollerZMQ(std::map< std::string,std::vector >& channelsMap, std::initializer_list channelList); + FairMQPollerZMQ(std::map>& channelsMap, std::initializer_list channelList); virtual void Poll(const int timeout); virtual bool CheckInput(const int index); diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx index 0fc08b3b..b774d23b 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx @@ -50,12 +50,12 @@ FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(const vector >& channelsMap, std::initializer_list channelList) +FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(std::map>& channelsMap, std::initializer_list channelList) { return new FairMQPollerZMQ(channelsMap, channelList); } -FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(FairMQSocket& dataSocket, FairMQSocket& cmdSocket) +FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket) { - return new FairMQPollerZMQ(dataSocket, cmdSocket); + return new FairMQPollerZMQ(cmdSocket, dataSocket); } diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index 9ee4e8db..a2442aa5 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -35,8 +35,8 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, int numIoThreads); virtual FairMQPoller* CreatePoller(const std::vector& channels); - virtual FairMQPoller* CreatePoller(std::map< std::string,std::vector >& channelsMap, std::initializer_list channelList); - virtual FairMQPoller* CreatePoller(FairMQSocket& dataSocket, FairMQSocket& cmdSocket); + virtual FairMQPoller* CreatePoller(std::map>& channelsMap, std::initializer_list channelList); + virtual FairMQPoller* CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket); virtual ~FairMQTransportFactoryZMQ() {}; };