Add FairMQ tests (PUB-SUB, PUSH-PULL, REQ-REP).

This commit is contained in:
Alexey Rybalchenko 2015-09-04 17:04:52 +02:00 committed by Mohammad Al-Turany
parent fbf7dbf2ba
commit f13bb5995d
40 changed files with 921 additions and 47 deletions

View File

@ -14,6 +14,10 @@ configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds/ex3-dds-topology.xml ${
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds/ex3-dds-hosts.cfg ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-hosts.cfg COPYONLY) configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds/ex3-dds-hosts.cfg ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-hosts.cfg COPYONLY)
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/4-copypush/ex4-copypush.json ${CMAKE_BINARY_DIR}/bin/config/ex4-copypush.json) configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/4-copypush/ex4-copypush.json ${CMAKE_BINARY_DIR}/bin/config/ex4-copypush.json)
configure_file(${CMAKE_SOURCE_DIR}/fairmq/test/test-fairmq-push-pull.sh.in ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-push-pull.sh)
configure_file(${CMAKE_SOURCE_DIR}/fairmq/test/test-fairmq-pub-sub.sh.in ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-pub-sub.sh)
configure_file(${CMAKE_SOURCE_DIR}/fairmq/test/test-fairmq-req-rep.sh.in ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-req-rep.sh)
add_subdirectory(logger) add_subdirectory(logger)
Set(INCLUDE_DIRECTORIES Set(INCLUDE_DIRECTORIES
@ -26,6 +30,9 @@ Set(INCLUDE_DIRECTORIES
${CMAKE_SOURCE_DIR}/fairmq/examples/2-sampler-processor-sink ${CMAKE_SOURCE_DIR}/fairmq/examples/2-sampler-processor-sink
${CMAKE_SOURCE_DIR}/fairmq/examples/4-copypush ${CMAKE_SOURCE_DIR}/fairmq/examples/4-copypush
${CMAKE_SOURCE_DIR}/fairmq/examples/5-req-rep ${CMAKE_SOURCE_DIR}/fairmq/examples/5-req-rep
${CMAKE_SOURCE_DIR}/fairmq/test/push-pull
${CMAKE_SOURCE_DIR}/fairmq/test/pub-sub
${CMAKE_SOURCE_DIR}/fairmq/test/req-rep
${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_BINARY_DIR}
) )
@ -120,16 +127,20 @@ set(SRCS
"examples/1-sampler-sink/FairMQExample1Sampler.cxx" "examples/1-sampler-sink/FairMQExample1Sampler.cxx"
"examples/1-sampler-sink/FairMQExample1Sink.cxx" "examples/1-sampler-sink/FairMQExample1Sink.cxx"
"examples/2-sampler-processor-sink/FairMQExample2Sampler.cxx" "examples/2-sampler-processor-sink/FairMQExample2Sampler.cxx"
"examples/2-sampler-processor-sink/FairMQExample2Processor.cxx" "examples/2-sampler-processor-sink/FairMQExample2Processor.cxx"
"examples/2-sampler-processor-sink/FairMQExample2Sink.cxx" "examples/2-sampler-processor-sink/FairMQExample2Sink.cxx"
"examples/4-copypush/FairMQExample4Sampler.cxx" "examples/4-copypush/FairMQExample4Sampler.cxx"
"examples/4-copypush/FairMQExample4Sink.cxx" "examples/4-copypush/FairMQExample4Sink.cxx"
"examples/5-req-rep/FairMQExample5Client.cxx" "examples/5-req-rep/FairMQExample5Client.cxx"
"examples/5-req-rep/FairMQExample5Server.cxx" "examples/5-req-rep/FairMQExample5Server.cxx"
"test/push-pull/FairMQTestPush.cxx"
"test/push-pull/FairMQTestPull.cxx"
"test/pub-sub/FairMQTestPub.cxx"
"test/pub-sub/FairMQTestSub.cxx"
"test/req-rep/FairMQTestReq.cxx"
"test/req-rep/FairMQTestRep.cxx"
) )
if(DDS_FOUND) if(DDS_FOUND)
@ -241,6 +252,12 @@ set(Exe_Names
ex4-sink ex4-sink
ex5-client ex5-client
ex5-server ex5-server
test-fairmq-push
test-fairmq-pull
test-fairmq-pub
test-fairmq-sub
test-fairmq-req
test-fairmq-rep
) )
if(DDS_FOUND) if(DDS_FOUND)
@ -279,6 +296,12 @@ set(Exe_Source
examples/4-copypush/runExample4Sink.cxx examples/4-copypush/runExample4Sink.cxx
examples/5-req-rep/runExample5Client.cxx examples/5-req-rep/runExample5Client.cxx
examples/5-req-rep/runExample5Server.cxx examples/5-req-rep/runExample5Server.cxx
test/push-pull/runTestPush.cxx
test/push-pull/runTestPull.cxx
test/pub-sub/runTestPub.cxx
test/pub-sub/runTestSub.cxx
test/req-rep/runTestReq.cxx
test/req-rep/runTestRep.cxx
) )
if(DDS_FOUND) if(DDS_FOUND)
@ -312,3 +335,15 @@ ForEach(_file RANGE 0 ${_length})
set(DEPENDENCIES FairMQ) set(DEPENDENCIES FairMQ)
GENERATE_EXECUTABLE() GENERATE_EXECUTABLE()
EndForEach(_file RANGE 0 ${_length}) EndForEach(_file RANGE 0 ${_length})
add_test(NAME run_fairmq_push_pull COMMAND ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-push-pull.sh)
set_tests_properties(run_fairmq_push_pull PROPERTIES TIMEOUT "30")
set_tests_properties(run_fairmq_push_pull PROPERTIES PASS_REGULAR_EXPRESSION "PUSH-PULL test successfull")
add_test(NAME run_fairmq_pub_sub COMMAND ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-pub-sub.sh)
set_tests_properties(run_fairmq_pub_sub PROPERTIES TIMEOUT "30")
set_tests_properties(run_fairmq_pub_sub PROPERTIES PASS_REGULAR_EXPRESSION "PUB-SUB test successfull")
add_test(NAME run_fairmq_req_rep COMMAND ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-req-rep.sh)
set_tests_properties(run_fairmq_req_rep PROPERTIES TIMEOUT "30")
set_tests_properties(run_fairmq_req_rep PROPERTIES PASS_REGULAR_EXPRESSION "REQ-REP test successfull")

View File

@ -15,15 +15,15 @@
#ifndef FAIRMQTRANSPORTFACTORY_H_ #ifndef FAIRMQTRANSPORTFACTORY_H_
#define FAIRMQTRANSPORTFACTORY_H_ #define FAIRMQTRANSPORTFACTORY_H_
#include <string>
#include <vector>
#include "FairMQMessage.h" #include "FairMQMessage.h"
#include "FairMQChannel.h" #include "FairMQChannel.h"
#include "FairMQSocket.h" #include "FairMQSocket.h"
#include "FairMQPoller.h" #include "FairMQPoller.h"
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include <string>
#include <vector>
class FairMQChannel; class FairMQChannel;
class FairMQTransportFactory class FairMQTransportFactory
@ -32,10 +32,12 @@ class FairMQTransportFactory
virtual FairMQMessage* CreateMessage() = 0; virtual FairMQMessage* CreateMessage() = 0;
virtual FairMQMessage* CreateMessage(size_t size) = 0; virtual FairMQMessage* CreateMessage(size_t size) = 0;
virtual FairMQMessage* CreateMessage(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL) = 0; virtual FairMQMessage* CreateMessage(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL) = 0;
virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, int numIoThreads) = 0; virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, int numIoThreads) = 0;
virtual FairMQPoller* CreatePoller(const std::vector<FairMQChannel>& channels) = 0; virtual FairMQPoller* CreatePoller(const std::vector<FairMQChannel>& channels) = 0;
virtual FairMQPoller* CreatePoller(std::map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList) = 0; virtual FairMQPoller* CreatePoller(std::map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList) = 0;
virtual FairMQPoller* CreatePoller(FairMQSocket& dataSocket, FairMQSocket& cmdSocket) = 0; virtual FairMQPoller* CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket) = 0;
virtual ~FairMQTransportFactory() {}; virtual ~FairMQTransportFactory() {};
}; };

View File

@ -12,8 +12,8 @@
* @author A. Rybalchenko * @author A. Rybalchenko
*/ */
#ifndef FAIRMQEXAMPLESAMPLER_H_ #ifndef FAIRMQEXAMPLE1SAMPLER_H_
#define FAIRMQEXAMPLESAMPLER_H_ #define FAIRMQEXAMPLE1SAMPLER_H_
#include <string> #include <string>

View File

@ -91,6 +91,7 @@ int main(int argc, char** argv)
client.SetTransport(transportFactory); client.SetTransport(transportFactory);
client.SetProperty(FairMQExample5Client::Id, "client"); client.SetProperty(FairMQExample5Client::Id, "client");
client.SetProperty(FairMQExample5Client::Text, options.text);
client.SetProperty(FairMQExample5Client::NumIoThreads, 1); client.SetProperty(FairMQExample5Client::NumIoThreads, 1);
FairMQChannel requestChannel("req", "connect", "tcp://localhost:5005"); FairMQChannel requestChannel("req", "connect", "tcp://localhost:5005");

View File

@ -137,6 +137,29 @@ void FairMQMessageNN::SetMessage(void* data, size_t size)
} }
void FairMQMessageNN::Copy(FairMQMessage* msg) void FairMQMessageNN::Copy(FairMQMessage* msg)
{
// DEPRECATED: Use Copy(const unique_ptr<FairMQMessage>&)
if (fMessage)
{
if (nn_freemsg(fMessage) < 0)
{
LOG(ERROR) << "failed freeing message, reason: " << nn_strerror(errno);
}
}
size_t size = msg->GetSize();
fMessage = nn_allocmsg(size, 0);
if (!fMessage)
{
LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno);
}
memcpy(fMessage, msg->GetMessage(), size);
fSize = size;
}
void FairMQMessageNN::Copy(const unique_ptr<FairMQMessage>& msg)
{ {
if (fMessage) if (fMessage)
{ {

View File

@ -38,6 +38,7 @@ class FairMQMessageNN : public FairMQMessage
virtual void CloseMessage() {}; virtual void CloseMessage() {};
virtual void Copy(FairMQMessage* msg); virtual void Copy(FairMQMessage* msg);
virtual void Copy(const std::unique_ptr<FairMQMessage>& msg);
virtual ~FairMQMessageNN(); virtual ~FairMQMessageNN();

View File

@ -78,7 +78,7 @@ FairMQPollerNN::FairMQPollerNN(map< string,vector<FairMQChannel> >& channelsMap,
} }
} }
FairMQPollerNN::FairMQPollerNN(FairMQSocket& dataSocket, FairMQSocket& cmdSocket) FairMQPollerNN::FairMQPollerNN(FairMQSocket& cmdSocket, FairMQSocket& dataSocket)
: items() : items()
, fNumItems(2) , fNumItems(2)
, fOffsetMap() , fOffsetMap()

View File

@ -43,7 +43,7 @@ class FairMQPollerNN : public FairMQPoller
virtual ~FairMQPollerNN(); virtual ~FairMQPollerNN();
private: private:
FairMQPollerNN(FairMQSocket& dataSocket, FairMQSocket& cmdSocket); FairMQPollerNN(FairMQSocket& cmdSocket, FairMQSocket& dataSocket);
nn_pollfd* items; nn_pollfd* items;
int fNumItems; int fNumItems;

View File

@ -62,6 +62,10 @@ class FairMQSocketNN : public FairMQSocket
unsigned long fBytesRx; unsigned long fBytesRx;
unsigned long fMessagesTx; unsigned long fMessagesTx;
unsigned long fMessagesRx; unsigned long fMessagesRx;
/// Copy Constructor
FairMQSocketNN(const FairMQSocketNN&);
FairMQSocketNN operator=(const FairMQSocketNN&);
}; };
#endif /* FAIRMQSOCKETNN_H_ */ #endif /* FAIRMQSOCKETNN_H_ */

View File

@ -51,7 +51,7 @@ FairMQPoller* FairMQTransportFactoryNN::CreatePoller(std::map< std::string,std::
return new FairMQPollerNN(channelsMap, channelList); return new FairMQPollerNN(channelsMap, channelList);
} }
FairMQPoller* FairMQTransportFactoryNN::CreatePoller(FairMQSocket& dataSocket, FairMQSocket& cmdSocket) FairMQPoller* FairMQTransportFactoryNN::CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket)
{ {
return new FairMQPollerNN(dataSocket, cmdSocket); return new FairMQPollerNN(cmdSocket, dataSocket);
} }

View File

@ -35,7 +35,7 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory
virtual FairMQPoller* CreatePoller(const std::vector<FairMQChannel>& channels); virtual FairMQPoller* CreatePoller(const std::vector<FairMQChannel>& channels);
virtual FairMQPoller* CreatePoller(std::map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList); virtual FairMQPoller* CreatePoller(std::map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList);
virtual FairMQPoller* CreatePoller(FairMQSocket& dataSocket, FairMQSocket& cmdSocket); virtual FairMQPoller* CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket);
virtual ~FairMQTransportFactoryNN() {}; virtual ~FairMQTransportFactoryNN() {};
}; };

View File

@ -0,0 +1,47 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQTestPub.cpp
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#include <memory> // unique_ptr
#include "FairMQTestPub.h"
#include "FairMQLogger.h"
FairMQTestPub::FairMQTestPub()
{
}
void FairMQTestPub::Run()
{
std::unique_ptr<FairMQMessage> ready1Msg(fTransportFactory->CreateMessage());
fChannels.at("control").at(0).Receive(ready1Msg);
std::unique_ptr<FairMQMessage> ready2Msg(fTransportFactory->CreateMessage());
fChannels.at("control").at(0).Receive(ready2Msg);
std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
fChannels.at("data").at(0).Send(msg);
std::unique_ptr<FairMQMessage> ack1Msg(fTransportFactory->CreateMessage());
std::unique_ptr<FairMQMessage> ack2Msg(fTransportFactory->CreateMessage());
if (fChannels.at("control").at(0).Receive(ack1Msg) >= 0)
{
if (fChannels.at("control").at(0).Receive(ack2Msg) >= 0)
{
LOG(INFO) << "PUB-SUB test successfull";
}
}
}
FairMQTestPub::~FairMQTestPub()
{
}

View File

@ -0,0 +1,30 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQTestPub.h
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#ifndef FAIRMQTESTPUB_H_
#define FAIRMQTESTPUB_H_
#include "FairMQDevice.h"
class FairMQTestPub : public FairMQDevice
{
public:
FairMQTestPub();
virtual ~FairMQTestPub();
protected:
virtual void Run();
};
#endif /* FAIRMQTESTPUB_H_ */

View File

@ -0,0 +1,43 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQTestSub.cxx
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#include <memory> // unique_ptr
#include "FairMQTestSub.h"
#include "FairMQLogger.h"
FairMQTestSub::FairMQTestSub()
{
}
void FairMQTestSub::Run()
{
std::unique_ptr<FairMQMessage> readyMsg(fTransportFactory->CreateMessage());
fChannels.at("control").at(0).Send(readyMsg);
std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
if (fChannels.at("data").at(0).Receive(msg) >= 0)
{
std::unique_ptr<FairMQMessage> ackMsg(fTransportFactory->CreateMessage());
fChannels.at("control").at(0).Send(ackMsg);
}
else
{
LOG(ERROR) << "Test failed: size of the received message doesn't match. Expected: 0, Received: " << msg->GetSize();
}
}
FairMQTestSub::~FairMQTestSub()
{
}

View File

@ -0,0 +1,30 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQTestSub.h
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#ifndef FAIRMQTESTSUB_H_
#define FAIRMQTESTSUB_H_
#include "FairMQDevice.h"
class FairMQTestSub : public FairMQDevice
{
public:
FairMQTestSub();
virtual ~FairMQTestSub();
protected:
virtual void Run();
};
#endif /* FAIRMQTESTSUB_H_ */

View File

@ -0,0 +1,65 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runTestPub.cxx
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#include "FairMQLogger.h"
#include "FairMQTestPub.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
int main(int argc, char** argv)
{
FairMQTestPub testPub;
testPub.CatchSignals();
#ifdef NANOMSG
testPub.SetTransport(new FairMQTransportFactoryNN());
#else
testPub.SetTransport(new FairMQTransportFactoryZMQ());
#endif
testPub.SetProperty(FairMQTestPub::Id, "testPub");
FairMQChannel controlChannel("pull", "bind", "tcp://127.0.0.1:5555");
controlChannel.UpdateRateLogging(0);
testPub.fChannels["control"].push_back(controlChannel);
FairMQChannel pubChannel("pub", "bind", "tcp://127.0.0.1:5556");
pubChannel.UpdateRateLogging(0);
testPub.fChannels["data"].push_back(pubChannel);
testPub.ChangeState("INIT_DEVICE");
testPub.WaitForEndOfState("INIT_DEVICE");
testPub.ChangeState("INIT_TASK");
testPub.WaitForEndOfState("INIT_TASK");
testPub.ChangeState("RUN");
testPub.WaitForEndOfState("RUN");
testPub.ChangeState("RESET_TASK");
testPub.WaitForEndOfState("RESET_TASK");
testPub.ChangeState("RESET_DEVICE");
testPub.WaitForEndOfState("RESET_DEVICE");
testPub.ChangeState("END");
return 0;
}

View File

@ -0,0 +1,65 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runTestSub.cxx
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#include <string>
#include "FairMQLogger.h"
#include "FairMQTestSub.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
int main(int argc, char** argv)
{
FairMQTestSub testSub;
testSub.CatchSignals();
#ifdef NANOMSG
testSub.SetTransport(new FairMQTransportFactoryNN());
#else
testSub.SetTransport(new FairMQTransportFactoryZMQ());
#endif
testSub.SetProperty(FairMQTestSub::Id, "testSub_" + std::to_string(getpid()));
FairMQChannel controlChannel("push", "connect", "tcp://127.0.0.1:5555");
controlChannel.UpdateRateLogging(0);
testSub.fChannels["control"].push_back(controlChannel);
FairMQChannel subChannel("sub", "connect", "tcp://127.0.0.1:5556");
subChannel.UpdateRateLogging(0);
testSub.fChannels["data"].push_back(subChannel);
testSub.ChangeState("INIT_DEVICE");
testSub.WaitForEndOfState("INIT_DEVICE");
testSub.ChangeState("INIT_TASK");
testSub.WaitForEndOfState("INIT_TASK");
testSub.ChangeState("RUN");
testSub.WaitForEndOfState("RUN");
testSub.ChangeState("RESET_TASK");
testSub.WaitForEndOfState("RESET_TASK");
testSub.ChangeState("RESET_DEVICE");
testSub.WaitForEndOfState("RESET_DEVICE");
testSub.ChangeState("END");
return 0;
}

View File

@ -0,0 +1,36 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQTestPull.cxx
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#include <memory> // unique_ptr
#include "FairMQTestPull.h"
#include "FairMQLogger.h"
FairMQTestPull::FairMQTestPull()
{
}
void FairMQTestPull::Run()
{
std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
if (fChannels.at("data").at(0).Receive(msg) >= 0)
{
LOG(INFO) << "PUSH-PULL test successfull";
}
}
FairMQTestPull::~FairMQTestPull()
{
}

View File

@ -0,0 +1,30 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQTestPull.h
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#ifndef FAIRMQTESTPULL_H_
#define FAIRMQTESTPULL_H_
#include "FairMQDevice.h"
class FairMQTestPull : public FairMQDevice
{
public:
FairMQTestPull();
virtual ~FairMQTestPull();
protected:
virtual void Run();
};
#endif /* FAIRMQTESTPULL_H_ */

View File

@ -0,0 +1,32 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQTestPush.cpp
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#include <memory> // unique_ptr
#include "FairMQTestPush.h"
#include "FairMQLogger.h"
FairMQTestPush::FairMQTestPush()
{
}
void FairMQTestPush::Run()
{
std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
fChannels.at("data").at(0).Send(msg);
}
FairMQTestPush::~FairMQTestPush()
{
}

View File

@ -0,0 +1,30 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQTestPush.h
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#ifndef FAIRMQTESTPUSH_H_
#define FAIRMQTESTPUSH_H_
#include "FairMQDevice.h"
class FairMQTestPush : public FairMQDevice
{
public:
FairMQTestPush();
virtual ~FairMQTestPush();
protected:
virtual void Run();
};
#endif /* FAIRMQTESTPUSH_H_ */

View File

@ -0,0 +1,58 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runTestPull.cxx
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#include "FairMQLogger.h"
#include "FairMQTestPull.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
int main(int argc, char** argv)
{
FairMQTestPull testPull;
testPull.CatchSignals();
#ifdef NANOMSG
testPull.SetTransport(new FairMQTransportFactoryNN());
#else
testPull.SetTransport(new FairMQTransportFactoryZMQ());
#endif
testPull.SetProperty(FairMQTestPull::Id, "testPull");
FairMQChannel pullChannel("pull", "connect", "tcp://127.0.0.1:5557");
testPull.fChannels["data"].push_back(pullChannel);
testPull.ChangeState("INIT_DEVICE");
testPull.WaitForEndOfState("INIT_DEVICE");
testPull.ChangeState("INIT_TASK");
testPull.WaitForEndOfState("INIT_TASK");
testPull.ChangeState("RUN");
testPull.WaitForEndOfState("RUN");
testPull.ChangeState("RESET_TASK");
testPull.WaitForEndOfState("RESET_TASK");
testPull.ChangeState("RESET_DEVICE");
testPull.WaitForEndOfState("RESET_DEVICE");
testPull.ChangeState("END");
return 0;
}

View File

@ -0,0 +1,58 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runTestPush.cxx
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#include "FairMQLogger.h"
#include "FairMQTestPush.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
int main(int argc, char** argv)
{
FairMQTestPush testPush;
testPush.CatchSignals();
#ifdef NANOMSG
testPush.SetTransport(new FairMQTransportFactoryNN());
#else
testPush.SetTransport(new FairMQTransportFactoryZMQ());
#endif
testPush.SetProperty(FairMQTestPush::Id, "testPush");
FairMQChannel pushChannel("push", "bind", "tcp://127.0.0.1:5557");
testPush.fChannels["data"].push_back(pushChannel);
testPush.ChangeState("INIT_DEVICE");
testPush.WaitForEndOfState("INIT_DEVICE");
testPush.ChangeState("INIT_TASK");
testPush.WaitForEndOfState("INIT_TASK");
testPush.ChangeState("RUN");
testPush.WaitForEndOfState("RUN");
testPush.ChangeState("RESET_TASK");
testPush.WaitForEndOfState("RESET_TASK");
testPush.ChangeState("RESET_DEVICE");
testPush.WaitForEndOfState("RESET_DEVICE");
testPush.ChangeState("END");
return 0;
}

View File

@ -0,0 +1,36 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQTestRep.cpp
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#include <memory> // unique_ptr
#include "FairMQTestRep.h"
#include "FairMQLogger.h"
FairMQTestRep::FairMQTestRep()
{
}
void FairMQTestRep::Run()
{
std::unique_ptr<FairMQMessage> request(fTransportFactory->CreateMessage());
if (fChannels.at("data").at(0).Receive(request) >= 0)
{
std::unique_ptr<FairMQMessage> reply(fTransportFactory->CreateMessage());
fChannels.at("data").at(0).Send(reply);
}
}
FairMQTestRep::~FairMQTestRep()
{
}

View File

@ -0,0 +1,30 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQTestRep.h
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#ifndef FAIRMQTESTREP_H_
#define FAIRMQTESTREP_H_
#include "FairMQDevice.h"
class FairMQTestRep : public FairMQDevice
{
public:
FairMQTestRep();
virtual ~FairMQTestRep();
protected:
virtual void Run();
};
#endif /* FAIRMQTESTREP_H_ */

View File

@ -0,0 +1,38 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQTestReq.cxx
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#include <memory> // unique_ptr
#include "FairMQTestReq.h"
#include "FairMQLogger.h"
FairMQTestReq::FairMQTestReq()
{
}
void FairMQTestReq::Run()
{
std::unique_ptr<FairMQMessage> request(fTransportFactory->CreateMessage());
fChannels.at("data").at(0).Send(request);
std::unique_ptr<FairMQMessage> reply(fTransportFactory->CreateMessage());
if (fChannels.at("data").at(0).Receive(reply) >= 0)
{
LOG(INFO) << "REQ-REP test successfull";
}
}
FairMQTestReq::~FairMQTestReq()
{
}

View File

@ -0,0 +1,30 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQTestReq.h
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#ifndef FAIRMQTESTREQ_H_
#define FAIRMQTESTREQ_H_
#include "FairMQDevice.h"
class FairMQTestReq : public FairMQDevice
{
public:
FairMQTestReq();
virtual ~FairMQTestReq();
protected:
virtual void Run();
};
#endif /* FAIRMQTESTREQ_H_ */

View File

@ -0,0 +1,60 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runTestRep.cxx
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#include <iostream>
#include "FairMQLogger.h"
#include "FairMQTestRep.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
int main(int argc, char** argv)
{
FairMQTestRep testRep;
testRep.CatchSignals();
#ifdef NANOMSG
testRep.SetTransport(new FairMQTransportFactoryNN());
#else
testRep.SetTransport(new FairMQTransportFactoryZMQ());
#endif
testRep.SetProperty(FairMQTestRep::Id, "testRep");
FairMQChannel repChannel("rep", "connect", "tcp://127.0.0.1:5558");
testRep.fChannels["data"].push_back(repChannel);
testRep.ChangeState("INIT_DEVICE");
testRep.WaitForEndOfState("INIT_DEVICE");
testRep.ChangeState("INIT_TASK");
testRep.WaitForEndOfState("INIT_TASK");
testRep.ChangeState("RUN");
testRep.WaitForEndOfState("RUN");
testRep.ChangeState("RESET_TASK");
testRep.WaitForEndOfState("RESET_TASK");
testRep.ChangeState("RESET_DEVICE");
testRep.WaitForEndOfState("RESET_DEVICE");
testRep.ChangeState("END");
return 0;
}

View File

@ -0,0 +1,60 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runTestReq.cxx
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#include <iostream>
#include "FairMQLogger.h"
#include "FairMQTestReq.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
int main(int argc, char** argv)
{
FairMQTestReq testReq;
testReq.CatchSignals();
#ifdef NANOMSG
testReq.SetTransport(new FairMQTransportFactoryNN());
#else
testReq.SetTransport(new FairMQTransportFactoryZMQ());
#endif
testReq.SetProperty(FairMQTestReq::Id, "testReq");
FairMQChannel reqChannel("req", "bind", "tcp://127.0.0.1:5558");
testReq.fChannels["data"].push_back(reqChannel);
testReq.ChangeState("INIT_DEVICE");
testReq.WaitForEndOfState("INIT_DEVICE");
testReq.ChangeState("INIT_TASK");
testReq.WaitForEndOfState("INIT_TASK");
testReq.ChangeState("RUN");
testReq.WaitForEndOfState("RUN");
testReq.ChangeState("RESET_TASK");
testReq.WaitForEndOfState("RESET_TASK");
testReq.ChangeState("RESET_DEVICE");
testReq.WaitForEndOfState("RESET_DEVICE");
testReq.ChangeState("END");
return 0;
}

View File

@ -0,0 +1,12 @@
#!/bin/bash
trap 'kill -TERM $PUB_PID; kill -TERM $SUB1_PID; kill -TERM $SUB2_PID; wait $PUB_PID; wait $SUB1_PID; wait $SUB2_PID;' TERM
@CMAKE_BINARY_DIR@/bin/test-fairmq-pub &
PUB_PID=$!
@CMAKE_BINARY_DIR@/bin/test-fairmq-sub &
SUB1_PID=$!
@CMAKE_BINARY_DIR@/bin/test-fairmq-sub &
SUB2_PID=$!
wait $PUB_PID
wait $SUB1_PID
wait $SUB2_PID

View File

@ -0,0 +1,9 @@
#!/bin/bash
trap 'kill -TERM $PUSH_PID; kill -TERM $PULL_PID; wait $PUSH_PID; wait $PULL_PID;' TERM
@CMAKE_BINARY_DIR@/bin/test-fairmq-push &
PUSH_PID=$!
@CMAKE_BINARY_DIR@/bin/test-fairmq-pull &
PULL_PID=$!
wait $PUSH_PID
wait $PULL_PID

View File

@ -0,0 +1,9 @@
#!/bin/bash
trap 'kill -TERM $REQ_PID; kill -TERM $REP_PID; wait $REQ_PID; wait $REP_PID;' TERM
@CMAKE_BINARY_DIR@/bin/test-fairmq-req &
REQ_PID=$!
@CMAKE_BINARY_DIR@/bin/test-fairmq-rep &
REP_PID=$!
wait $REQ_PID
wait $REP_PID

View File

@ -55,7 +55,7 @@ FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(std::map< std::string,std:
return new FairMQPollerZMQ(channelsMap, channelList); return new FairMQPollerZMQ(channelsMap, channelList);
} }
FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(FairMQSocket& dataSocket, FairMQSocket& cmdSocket) FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket)
{ {
return new FairMQPollerZMQ(dataSocket, cmdSocket); return new FairMQPollerZMQ(cmdSocket, dataSocket);
} }

View File

@ -36,7 +36,7 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory
virtual FairMQPoller* CreatePoller(const std::vector<FairMQChannel>& channels); virtual FairMQPoller* CreatePoller(const std::vector<FairMQChannel>& channels);
virtual FairMQPoller* CreatePoller(std::map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList); virtual FairMQPoller* CreatePoller(std::map<std::string, std::vector<FairMQChannel>>& channelsMap, std::initializer_list<std::string> channelList);
virtual FairMQPoller* CreatePoller(FairMQSocket& dataSocket, FairMQSocket& cmdSocket); virtual FairMQPoller* CreatePoller(FairMQSocket& cmdSocket, FairMQSocket& dataSocket);
virtual ~FairMQTransportFactoryZMQ() {}; virtual ~FairMQTransportFactoryZMQ() {};
}; };