From 88fee245b83c99b583581e1308cf9cef42ceaa2c Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 21 Jan 2014 15:57:59 +0100 Subject: [PATCH] use factory for sockets --- fairmq/CMakeLists.txt | 18 +-- fairmq/FairMQBenchmarkSampler.cxx | 36 +++--- fairmq/FairMQBenchmarkSampler.h | 17 ++- fairmq/FairMQBuffer.cxx | 8 +- fairmq/FairMQConfigurable.cxx | 8 +- fairmq/FairMQConfigurable.h | 11 +- fairmq/FairMQDevice.cxx | 93 ++++++++------- fairmq/FairMQDevice.h | 48 ++++---- fairmq/FairMQLogger.cxx | 41 +++---- fairmq/FairMQLogger.h | 11 +- fairmq/FairMQMerger.cxx | 10 +- fairmq/FairMQMessage.cxx | 93 --------------- fairmq/FairMQMessage.h | 29 ++--- fairmq/FairMQMessageZMQ.cxx | 111 ++++++++++++++++++ fairmq/FairMQMessageZMQ.h | 43 +++++++ fairmq/FairMQProcessor.cxx | 12 +- fairmq/FairMQProcessor.h | 1 - fairmq/FairMQProcessorTask.h | 1 + fairmq/FairMQProxy.cxx | 8 +- fairmq/FairMQProxy.h | 2 - fairmq/FairMQSampler.cxx | 16 +-- fairmq/FairMQSampler.h | 21 ++-- fairmq/FairMQSamplerTask.cxx | 6 +- fairmq/FairMQSamplerTask.h | 3 +- fairmq/FairMQSink.cxx | 7 +- fairmq/FairMQSocket.cxx | 175 ---------------------------- fairmq/FairMQSocket.h | 42 +++---- fairmq/FairMQSocketZMQ.cxx | 182 ++++++++++++++++++++++++++++++ fairmq/FairMQSocketZMQ.h | 53 +++++++++ fairmq/FairMQSplitter.cxx | 8 +- fairmq/runBenchmarkSampler.cxx | 4 + fairmq/runBuffer.cxx | 4 + fairmq/runMerger.cxx | 4 + fairmq/runNToOneMerger.cxx | 4 + fairmq/runOneToNSplitter.cxx | 4 + fairmq/runProxy.cxx | 4 + fairmq/runSink.cxx | 4 + fairmq/runSplitter.cxx | 4 + 38 files changed, 653 insertions(+), 493 deletions(-) diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 9a096806..ef19b1e4 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -7,6 +7,13 @@ include_directories( ${ROOT_INCLUDE_DIR} ) +Set(LINK_DIRECTORIES + ${ROOT_LIBRARY_DIR} + ${Boost_LIBRARY_DIRS} +) + +link_directories(${LINK_DIRECTORIES}) + Set(SRCS "FairMQSampler.cxx" "FairMQBenchmarkSampler.cxx" @@ -17,6 +24,8 @@ Set(SRCS "FairMQLogger.cxx" "FairMQContext.cxx" "FairMQMessage.cxx" + "FairMQTransportFactory.cxx" + "FairMQTransportFactoryZMQ.cxx" "FairMQMessageZMQ.cxx" "FairMQMessageNN.cxx" "FairMQSocket.cxx" @@ -31,15 +40,8 @@ Set(SRCS "FairMQProxy.cxx" ) -Set(LINK_DIRECTORIES - ${ROOT_LIBRARY_DIR} - ${Boost_LIBRARY_DIRS} -) - -link_directories(${LINK_DIRECTORIES}) - Set(LIBRARY_NAME FairMQ) -Set(LINKDEF) + Set(DEPENDENCIES ${CMAKE_THREAD_LIBS_INIT} ${ZMQ_LIBRARY_SHARED} diff --git a/fairmq/FairMQBenchmarkSampler.cxx b/fairmq/FairMQBenchmarkSampler.cxx index 2fc7064d..847a1329 100644 --- a/fairmq/FairMQBenchmarkSampler.cxx +++ b/fairmq/FairMQBenchmarkSampler.cxx @@ -39,19 +39,21 @@ void FairMQBenchmarkSampler::Run() boost::thread resetEventCounter(boost::bind(&FairMQBenchmarkSampler::ResetEventCounter, this)); void* buffer = operator new[](fEventSize); - FairMQMessage* base_event = new FairMQMessage(buffer, fEventSize); + FairMQMessage* base_event = new FairMQMessageZMQ(buffer, fEventSize); while ( fState == RUNNING ) { - FairMQMessage event; - event.Copy(base_event); + FairMQMessage* event = new FairMQMessageZMQ(); + event->Copy(base_event); - fPayloadOutputs->at(0)->Send(&event); + fPayloadOutputs->at(0)->Send(event); --fEventCounter; while (fEventCounter == 0) { boost::this_thread::sleep(boost::posix_time::milliseconds(1)); } + + delete event; } delete base_event; @@ -75,16 +77,16 @@ void FairMQBenchmarkSampler::ResetEventCounter() } } -void FairMQBenchmarkSampler::Log(Int_t intervalInMs) +void FairMQBenchmarkSampler::Log(int intervalInMs) { timestamp_t t0; timestamp_t t1; - ULong_t bytes = fPayloadOutputs->at(0)->GetBytesTx(); - ULong_t messages = fPayloadOutputs->at(0)->GetMessagesTx(); - ULong_t bytesNew; - ULong_t messagesNew; - Double_t megabytesPerSecond = (bytesNew - bytes) / (1024 * 1024); - Double_t messagesPerSecond = (messagesNew - messages); + unsigned long bytes = fPayloadOutputs->at(0)->GetBytesTx(); + unsigned long messages = fPayloadOutputs->at(0)->GetMessagesTx(); + unsigned long bytesNew = 0; + unsigned long messagesNew = 0; + double megabytesPerSecond = 0; + double messagesPerSecond = 0; t0 = get_timestamp(); @@ -98,8 +100,8 @@ void FairMQBenchmarkSampler::Log(Int_t intervalInMs) timestamp_t timeSinceLastLog_ms = (t1 - t0) / 1000.0L; - megabytesPerSecond = ((Double_t) (bytesNew - bytes) / (1024. * 1024.)) / (Double_t) timeSinceLastLog_ms * 1000.; - messagesPerSecond = (Double_t) (messagesNew - messages) / (Double_t) timeSinceLastLog_ms * 1000.; + megabytesPerSecond = ((double) (bytesNew - bytes) / (1024. * 1024.)) / (double) timeSinceLastLog_ms * 1000.; + messagesPerSecond = (double) (messagesNew - messages) / (double) timeSinceLastLog_ms * 1000.; std::stringstream logmsg; logmsg << "send " << messagesPerSecond << " msg/s, " << megabytesPerSecond << " MB/s"; @@ -111,7 +113,7 @@ void FairMQBenchmarkSampler::Log(Int_t intervalInMs) } } -void FairMQBenchmarkSampler::SetProperty(const Int_t& key, const TString& value, const Int_t& slot/*= 0*/) +void FairMQBenchmarkSampler::SetProperty(const int& key, const std::string& value, const int& slot/*= 0*/) { switch (key) { default: @@ -120,7 +122,7 @@ void FairMQBenchmarkSampler::SetProperty(const Int_t& key, const TString& value, } } -TString FairMQBenchmarkSampler::GetProperty(const Int_t& key, const TString& default_/*= ""*/, const Int_t& slot/*= 0*/) +std::string FairMQBenchmarkSampler::GetProperty(const int& key, const std::string& default_/*= ""*/, const int& slot/*= 0*/) { switch (key) { default: @@ -128,7 +130,7 @@ TString FairMQBenchmarkSampler::GetProperty(const Int_t& key, const TString& def } } -void FairMQBenchmarkSampler::SetProperty(const Int_t& key, const Int_t& value, const Int_t& slot/*= 0*/) +void FairMQBenchmarkSampler::SetProperty(const int& key, const int& value, const int& slot/*= 0*/) { switch (key) { case EventSize: @@ -143,7 +145,7 @@ void FairMQBenchmarkSampler::SetProperty(const Int_t& key, const Int_t& value, c } } -Int_t FairMQBenchmarkSampler::GetProperty(const Int_t& key, const Int_t& default_/*= 0*/, const Int_t& slot/*= 0*/) +int FairMQBenchmarkSampler::GetProperty(const int& key, const int& default_/*= 0*/, const int& slot/*= 0*/) { switch (key) { case EventSize: diff --git a/fairmq/FairMQBenchmarkSampler.h b/fairmq/FairMQBenchmarkSampler.h index f1d6fef0..aaa93a32 100644 --- a/fairmq/FairMQBenchmarkSampler.h +++ b/fairmq/FairMQBenchmarkSampler.h @@ -11,7 +11,6 @@ #include #include "FairMQDevice.h" -#include "TString.h" /** * Sampler to generate traffic for benchmarking. @@ -28,16 +27,16 @@ class FairMQBenchmarkSampler: public FairMQDevice }; FairMQBenchmarkSampler(); virtual ~FairMQBenchmarkSampler(); - void Log(Int_t intervalInMs); + void Log(int intervalInMs); void ResetEventCounter(); - virtual void SetProperty(const Int_t& key, const TString& value, const Int_t& slot = 0); - virtual TString GetProperty(const Int_t& key, const TString& default_ = "", const Int_t& slot = 0); - virtual void SetProperty(const Int_t& key, const Int_t& value, const Int_t& slot = 0); - virtual Int_t GetProperty(const Int_t& key, const Int_t& default_ = 0, const Int_t& slot = 0); + virtual void SetProperty(const int& key, const std::string& value, const int& slot = 0); + virtual std::string GetProperty(const int& key, const std::string& default_ = "", const int& slot = 0); + virtual void SetProperty(const int& key, const int& value, const int& slot = 0); + virtual int GetProperty(const int& key, const int& default_ = 0, const int& slot = 0); protected: - Int_t fEventSize; - Int_t fEventRate; - Int_t fEventCounter; + int fEventSize; + int fEventRate; + int fEventCounter; virtual void Init(); virtual void Run(); }; diff --git a/fairmq/FairMQBuffer.cxx b/fairmq/FairMQBuffer.cxx index 42cdaf04..be5fdeef 100644 --- a/fairmq/FairMQBuffer.cxx +++ b/fairmq/FairMQBuffer.cxx @@ -26,14 +26,16 @@ void FairMQBuffer::Run() bool received = false; while ( fState == RUNNING ) { - FairMQMessage msg; + FairMQMessage* msg = new FairMQMessageZMQ(); - received = fPayloadInputs->at(0)->Receive(&msg); + received = fPayloadInputs->at(0)->Receive(msg); if (received) { - fPayloadOutputs->at(0)->Send(&msg); + fPayloadOutputs->at(0)->Send(msg); received = false; } + + delete msg; } rateLogger.interrupt(); diff --git a/fairmq/FairMQConfigurable.cxx b/fairmq/FairMQConfigurable.cxx index dd589eff..71186517 100644 --- a/fairmq/FairMQConfigurable.cxx +++ b/fairmq/FairMQConfigurable.cxx @@ -12,20 +12,20 @@ FairMQConfigurable::FairMQConfigurable() { } -void FairMQConfigurable::SetProperty(const Int_t& key, const TString& value, const Int_t& slot/*= 0*/) +void FairMQConfigurable::SetProperty(const int& key, const std::string& value, const int& slot/*= 0*/) { } -TString FairMQConfigurable::GetProperty(const Int_t& key, const TString& default_/*= ""*/, const Int_t& slot/*= 0*/) +std::string FairMQConfigurable::GetProperty(const int& key, const std::string& default_/*= ""*/, const int& slot/*= 0*/) { return default_; } -void FairMQConfigurable::SetProperty(const Int_t& key, const Int_t& value, const Int_t& slot/*= 0*/) +void FairMQConfigurable::SetProperty(const int& key, const int& value, const int& slot/*= 0*/) { } -Int_t FairMQConfigurable::GetProperty(const Int_t& key, const Int_t& default_/*= 0*/, const Int_t& slot/*= 0*/) +int FairMQConfigurable::GetProperty(const int& key, const int& default_/*= 0*/, const int& slot/*= 0*/) { return default_; } diff --git a/fairmq/FairMQConfigurable.h b/fairmq/FairMQConfigurable.h index 641086f9..924cf87f 100644 --- a/fairmq/FairMQConfigurable.h +++ b/fairmq/FairMQConfigurable.h @@ -8,8 +8,7 @@ #ifndef FAIRMQCONFIGURABLE_H_ #define FAIRMQCONFIGURABLE_H_ -#include "Rtypes.h" -#include "TString.h" +#include class FairMQConfigurable @@ -19,10 +18,10 @@ class FairMQConfigurable Last = 1 }; FairMQConfigurable(); - virtual void SetProperty(const Int_t& key, const TString& value, const Int_t& slot = 0); - virtual TString GetProperty(const Int_t& key, const TString& default_ = "", const Int_t& slot = 0); - virtual void SetProperty(const Int_t& key, const Int_t& value, const Int_t& slot = 0); - virtual Int_t GetProperty(const Int_t& key, const Int_t& default_ = 0, const Int_t& slot = 0); + virtual void SetProperty(const int& key, const std::string& value, const int& slot = 0); + virtual std::string GetProperty(const int& key, const std::string& default_ = "", const int& slot = 0); + virtual void SetProperty(const int& key, const int& value, const int& slot = 0); + virtual int GetProperty(const int& key, const int& default_ = 0, const int& slot = 0); virtual ~FairMQConfigurable(); }; diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 7a9d5719..e2a7f93c 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -9,7 +9,7 @@ #include -#include "FairMQSocket.h" +#include "FairMQSocketZMQ.h" #include "FairMQDevice.h" #include "FairMQLogger.h" @@ -32,26 +32,26 @@ void FairMQDevice::Init() fPayloadContext = new FairMQContext(fNumIoThreads); - fInputAddress = new std::vector(fNumInputs); - fInputMethod = new std::vector(); - fInputSocketType = new std::vector(); - fInputSndBufSize = new std::vector(); - fInputRcvBufSize = new std::vector(); + fInputAddress = new std::vector(fNumInputs); + fInputMethod = new std::vector(); + fInputSocketType = new std::vector(); + fInputSndBufSize = new std::vector(); + fInputRcvBufSize = new std::vector(); - for (Int_t i = 0; i < fNumInputs; ++i) { + for (int i = 0; i < fNumInputs; ++i) { fInputMethod->push_back("connect"); // default value, can be overwritten in configuration fInputSocketType->push_back(ZMQ_SUB); // default value, can be overwritten in configuration fInputSndBufSize->push_back(10000); // default value, can be overwritten in configuration fInputRcvBufSize->push_back(10000); // default value, can be overwritten in configuration } - fOutputAddress = new std::vector(fNumOutputs); - fOutputMethod = new std::vector(); - fOutputSocketType = new std::vector(); - fOutputSndBufSize = new std::vector(); - fOutputRcvBufSize = new std::vector(); + fOutputAddress = new std::vector(fNumOutputs); + fOutputMethod = new std::vector(); + fOutputSocketType = new std::vector(); + fOutputSndBufSize = new std::vector(); + fOutputRcvBufSize = new std::vector(); - for (Int_t i = 0; i < fNumOutputs; ++i) { + for (int i = 0; i < fNumOutputs; ++i) { fOutputMethod->push_back("bind"); // default value, can be overwritten in configuration fOutputSocketType->push_back(ZMQ_PUB); // default value, can be overwritten in configuration fOutputSndBufSize->push_back(10000); // default value, can be overwritten in configuration @@ -63,8 +63,9 @@ void FairMQDevice::InitInput() { FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> InitInput <<<<<<<"); - for (Int_t i = 0; i < fNumInputs; ++i) { - FairMQSocket* socket = new FairMQSocket(fPayloadContext, fInputSocketType->at(i), i); + for (int i = 0; i < fNumInputs; ++i) { + //FairMQSocket* socket = new FairMQSocketZMQ(fPayloadContext, fInputSocketType->at(i), i); + FairMQSocket* socket = fTransportFactory->CreateSocket(fPayloadContext, fInputSocketType->at(i), i); socket->SetOption(ZMQ_SNDHWM, &fInputSndBufSize->at(i), sizeof(fInputSndBufSize->at(i))); socket->SetOption(ZMQ_RCVHWM, &fInputRcvBufSize->at(i), sizeof(fInputRcvBufSize->at(i))); @@ -86,11 +87,14 @@ void FairMQDevice::InitOutput() { FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> InitOutput <<<<<<<"); - for (Int_t i = 0; i < fNumOutputs; ++i) { - FairMQSocket* socket = new FairMQSocket(fPayloadContext, fOutputSocketType->at(i), i); + for (int i = 0; i < fNumOutputs; ++i) { + FairMQSocket* socket = fTransportFactory->CreateSocket(fPayloadContext, fOutputSocketType->at(i), i); + socket->SetOption(ZMQ_SNDHWM, &fOutputSndBufSize->at(i), sizeof(fOutputSndBufSize->at(i))); socket->SetOption(ZMQ_RCVHWM, &fOutputRcvBufSize->at(i), sizeof(fOutputRcvBufSize->at(i))); + fPayloadOutputs->push_back(socket); + try { if (fOutputMethod->at(i) == "bind") { fPayloadOutputs->at(i)->Bind(fOutputAddress->at(i)); @@ -111,7 +115,7 @@ void FairMQDevice::Pause() } // Method for setting properties represented as a string. -void FairMQDevice::SetProperty(const Int_t& key, const TString& value, const Int_t& slot/*= 0*/) +void FairMQDevice::SetProperty(const int& key, const std::string& value, const int& slot/*= 0*/) { switch (key) { case Id: @@ -140,7 +144,7 @@ void FairMQDevice::SetProperty(const Int_t& key, const TString& value, const Int } // Method for setting properties represented as an integer. -void FairMQDevice::SetProperty(const Int_t& key, const Int_t& value, const Int_t& slot/*= 0*/) +void FairMQDevice::SetProperty(const int& key, const int& value, const int& slot/*= 0*/) { switch (key) { case NumIoThreads: @@ -186,7 +190,7 @@ void FairMQDevice::SetProperty(const Int_t& key, const Int_t& value, const Int_t } // Method for getting properties represented as an string. -TString FairMQDevice::GetProperty(const Int_t& key, const TString& default_/*= ""*/, const Int_t& slot/*= 0*/) +std::string FairMQDevice::GetProperty(const int& key, const std::string& default_/*= ""*/, const int& slot/*= 0*/) { switch (key) { case Id: @@ -205,7 +209,7 @@ TString FairMQDevice::GetProperty(const Int_t& key, const TString& default_/*= " } // Method for getting properties represented as an integer. -Int_t FairMQDevice::GetProperty(const Int_t& key, const Int_t& default_/*= 0*/, const Int_t& slot/*= 0*/) +int FairMQDevice::GetProperty(const int& key, const int& default_/*= 0*/, const int& slot/*= 0*/) { switch (key) { case NumIoThreads: @@ -229,6 +233,11 @@ Int_t FairMQDevice::GetProperty(const Int_t& key, const Int_t& default_/*= 0*/, } } +void FairMQDevice::SetTransport(FairMQTransportFactory* factory) +{ + fTransportFactory = factory; +} + void FairMQDevice::LogSocketRates() { timestamp_t t0; @@ -236,20 +245,20 @@ void FairMQDevice::LogSocketRates() timestamp_t timeSinceLastLog_ms; - ULong_t* bytesInput = new ULong_t[fNumInputs]; - ULong_t* messagesInput = new ULong_t[fNumInputs]; - ULong_t* bytesOutput = new ULong_t[fNumOutputs]; - ULong_t* messagesOutput = new ULong_t[fNumOutputs]; + unsigned long* bytesInput = new unsigned long[fNumInputs]; + unsigned long* messagesInput = new unsigned long[fNumInputs]; + unsigned long* bytesOutput = new unsigned long[fNumOutputs]; + unsigned long* messagesOutput = new unsigned long[fNumOutputs]; - ULong_t* bytesInputNew = new ULong_t[fNumInputs]; - ULong_t* messagesInputNew = new ULong_t[fNumInputs]; - ULong_t* bytesOutputNew = new ULong_t[fNumOutputs]; - ULong_t* messagesOutputNew = new ULong_t[fNumOutputs]; + unsigned long* bytesInputNew = new unsigned long[fNumInputs]; + unsigned long* messagesInputNew = new unsigned long[fNumInputs]; + unsigned long* bytesOutputNew = new unsigned long[fNumOutputs]; + unsigned long* messagesOutputNew = new unsigned long[fNumOutputs]; - Double_t* megabytesPerSecondInput = new Double_t[fNumInputs]; - Double_t* messagesPerSecondInput = new Double_t[fNumInputs]; - Double_t* megabytesPerSecondOutput = new Double_t[fNumOutputs]; - Double_t* messagesPerSecondOutput = new Double_t[fNumOutputs]; + double* megabytesPerSecondInput = new double[fNumInputs]; + double* messagesPerSecondInput = new double[fNumInputs]; + double* megabytesPerSecondOutput = new double[fNumOutputs]; + double* messagesPerSecondOutput = new double[fNumOutputs]; // Temp stuff for process termination bool receivedSomething = false; @@ -258,15 +267,15 @@ void FairMQDevice::LogSocketRates() int didNotSendFor = 0; // End of temp stuff - Int_t i = 0; - for( std::vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { + int i = 0; + for ( std::vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { bytesInput[i] = (*itr)->GetBytesRx(); messagesInput[i] = (*itr)->GetMessagesRx(); ++i; } i = 0; - for( std::vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { + for ( std::vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { bytesOutput[i] = (*itr)->GetBytesTx(); messagesOutput[i] = (*itr)->GetMessagesTx(); ++i; @@ -284,12 +293,12 @@ void FairMQDevice::LogSocketRates() i = 0; - for( std::vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { + for ( std::vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { bytesInputNew[i] = (*itr)->GetBytesRx(); - megabytesPerSecondInput[i] = ((Double_t) (bytesInputNew[i] - bytesInput[i]) / (1024. * 1024.)) / (Double_t) timeSinceLastLog_ms * 1000.; + megabytesPerSecondInput[i] = ((double) (bytesInputNew[i] - bytesInput[i]) / (1024. * 1024.)) / (double) timeSinceLastLog_ms * 1000.; bytesInput[i] = bytesInputNew[i]; messagesInputNew[i] = (*itr)->GetMessagesRx(); - messagesPerSecondInput[i] = (Double_t) (messagesInputNew[i] - messagesInput[i]) / (Double_t) timeSinceLastLog_ms * 1000.; + messagesPerSecondInput[i] = (double) (messagesInputNew[i] - messagesInput[i]) / (double) timeSinceLastLog_ms * 1000.; messagesInput[i] = messagesInputNew[i]; std::stringstream logmsg; @@ -312,12 +321,12 @@ void FairMQDevice::LogSocketRates() i = 0; - for( std::vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { + for ( std::vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { bytesOutputNew[i] = (*itr)->GetBytesTx(); - megabytesPerSecondOutput[i] = ((Double_t) (bytesOutputNew[i] - bytesOutput[i]) / (1024. * 1024.)) / (Double_t) timeSinceLastLog_ms * 1000.; + megabytesPerSecondOutput[i] = ((double) (bytesOutputNew[i] - bytesOutput[i]) / (1024. * 1024.)) / (double) timeSinceLastLog_ms * 1000.; bytesOutput[i] = bytesOutputNew[i]; messagesOutputNew[i] = (*itr)->GetMessagesTx(); - messagesPerSecondOutput[i] = (Double_t) (messagesOutputNew[i] - messagesOutput[i]) / (Double_t) timeSinceLastLog_ms * 1000.; + messagesPerSecondOutput[i] = (double) (messagesOutputNew[i] - messagesOutput[i]) / (double) timeSinceLastLog_ms * 1000.; messagesOutput[i] = messagesOutputNew[i]; std::stringstream logmsg; diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index a38c651c..bcd6602e 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -8,13 +8,14 @@ #ifndef FAIRMQDEVICE_H_ #define FAIRMQDEVICE_H_ +#include +#include + #include "FairMQConfigurable.h" #include "FairMQStateMachine.h" -#include +#include "FairMQTransportFactory.h" #include "FairMQContext.h" #include "FairMQSocket.h" -#include "Rtypes.h" -#include "TString.h" class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable @@ -44,37 +45,40 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable virtual void LogSocketRates(); virtual void ListenToCommands(); - virtual void SetProperty(const Int_t& key, const TString& value, const Int_t& slot = 0); - virtual TString GetProperty(const Int_t& key, const TString& default_ = "", const Int_t& slot = 0); - virtual void SetProperty(const Int_t& key, const Int_t& value, const Int_t& slot = 0); - virtual Int_t GetProperty(const Int_t& key, const Int_t& default_ = 0, const Int_t& slot = 0); + virtual void SetProperty(const int& key, const std::string& value, const int& slot = 0); + virtual std::string GetProperty(const int& key, const std::string& default_ = "", const int& slot = 0); + virtual void SetProperty(const int& key, const int& value, const int& slot = 0); + virtual int GetProperty(const int& key, const int& default_ = 0, const int& slot = 0); + + virtual void SetTransport(FairMQTransportFactory* factory); virtual ~FairMQDevice(); protected: - TString fId; - Int_t fNumIoThreads; + std::string fId; + int fNumIoThreads; FairMQContext* fPayloadContext; + FairMQTransportFactory* fTransportFactory; - Int_t fNumInputs; - Int_t fNumOutputs; + int fNumInputs; + int fNumOutputs; - std::vector *fInputAddress; - std::vector *fInputMethod; - std::vector *fInputSocketType; - std::vector *fInputSndBufSize; - std::vector *fInputRcvBufSize; + std::vector *fInputAddress; + std::vector *fInputMethod; + std::vector *fInputSocketType; + std::vector *fInputSndBufSize; + std::vector *fInputRcvBufSize; - std::vector *fOutputAddress; - std::vector *fOutputMethod; - std::vector *fOutputSocketType; - std::vector *fOutputSndBufSize; - std::vector *fOutputRcvBufSize; + std::vector *fOutputAddress; + std::vector *fOutputMethod; + std::vector *fOutputSocketType; + std::vector *fOutputSndBufSize; + std::vector *fOutputRcvBufSize; std::vector *fPayloadInputs; std::vector *fPayloadOutputs; - Int_t fLogIntervalInMs; + int fLogIntervalInMs; virtual void Init(); virtual void Run(); diff --git a/fairmq/FairMQLogger.cxx b/fairmq/FairMQLogger.cxx index f08f62db..c21414f8 100644 --- a/fairmq/FairMQLogger.cxx +++ b/fairmq/FairMQLogger.cxx @@ -5,10 +5,11 @@ * @author D. Klein, A. Rybalchenko */ -#include "FairMQLogger.h" #include -#include #include +#include + +#include "FairMQLogger.h" FairMQLogger* FairMQLogger::instance = NULL; @@ -21,7 +22,7 @@ FairMQLogger* FairMQLogger::GetInstance() return instance; } -FairMQLogger* FairMQLogger::InitInstance(TString bindAddress) +FairMQLogger* FairMQLogger::InitInstance(std::string bindAddress) { instance = new FairMQLogger(bindAddress); return instance; @@ -32,7 +33,7 @@ FairMQLogger::FairMQLogger() : { } -FairMQLogger::FairMQLogger(TString bindAddress) : +FairMQLogger::FairMQLogger(std::string bindAddress) : fBindAddress(bindAddress) { } @@ -41,31 +42,31 @@ FairMQLogger::~FairMQLogger() { } -void FairMQLogger::Log(Int_t type, TString logmsg) +void FairMQLogger::Log(int type, std::string logmsg) { timestamp_t tm = get_timestamp(); timestamp_t ms = tm / 1000.0L; timestamp_t s = ms / 1000.0L; std::time_t t = s; std::size_t fractional_seconds = ms % 1000; - Text_t mbstr[100]; + char mbstr[100]; std::strftime(mbstr, 100, "%H:%M:%S:", std::localtime(&t)); - TString type_str; + std::string type_str; switch (type) { - case DEBUG: - type_str = "\033[01;34mDEBUG\033[0m"; - break; - case INFO: - type_str = "\033[01;32mINFO\033[0m"; - break; - case ERROR: - type_str = "\033[01;31mERROR\033[0m"; - break; - case STATE: - type_str = "\033[01;33mSTATE\033[0m"; - default: - break; + case DEBUG: + type_str = "\033[01;34mDEBUG\033[0m"; + break; + case INFO: + type_str = "\033[01;32mINFO\033[0m"; + break; + case ERROR: + type_str = "\033[01;31mERROR\033[0m"; + break; + case STATE: + type_str = "\033[01;33mSTATE\033[0m"; + default: + break; } std::cout << "[\033[01;36m" << mbstr << fractional_seconds << "\033[0m]" << "[" << type_str << "]" << " " << logmsg << std::endl; diff --git a/fairmq/FairMQLogger.h b/fairmq/FairMQLogger.h index 8adbbcaa..c76d125e 100644 --- a/fairmq/FairMQLogger.h +++ b/fairmq/FairMQLogger.h @@ -7,28 +7,27 @@ #ifndef FAIRMQLOGGER_H_ #define FAIRMQLOGGER_H_ + #include #include #include -#include "Rtypes.h" -#include "TString.h" class FairMQLogger { private: static FairMQLogger* instance; - TString fBindAddress; + std::string fBindAddress; public: enum { DEBUG, INFO, ERROR, STATE }; FairMQLogger(); - FairMQLogger(TString bindAdress); + FairMQLogger(std::string bindAdress); virtual ~FairMQLogger(); - void Log(Int_t type, TString logmsg); + void Log(int type, std::string logmsg); static FairMQLogger* GetInstance(); - static FairMQLogger* InitInstance(TString bindAddress); + static FairMQLogger* InitInstance(std::string bindAddress); }; typedef unsigned long long timestamp_t; diff --git a/fairmq/FairMQMerger.cxx b/fairmq/FairMQMerger.cxx index acc1c7b3..81902e3d 100644 --- a/fairmq/FairMQMerger.cxx +++ b/fairmq/FairMQMerger.cxx @@ -35,22 +35,24 @@ void FairMQMerger::Run() items[i].revents = 0; } - Bool_t received = false; + bool received = false; while ( fState == RUNNING ) { - FairMQMessage msg; + FairMQMessage* msg = new FairMQMessageZMQ(); zmq_poll(items, fNumInputs, 100); for(int i = 0; i < fNumInputs; i++) { if (items[i].revents & ZMQ_POLLIN) { - received = fPayloadInputs->at(i)->Receive(&msg); + received = fPayloadInputs->at(i)->Receive(msg); } if (received) { - fPayloadOutputs->at(0)->Send(&msg); + fPayloadOutputs->at(0)->Send(msg); received = false; } } + + delete msg; } rateLogger.interrupt(); diff --git a/fairmq/FairMQMessage.cxx b/fairmq/FairMQMessage.cxx index 5e76063f..f9b74f71 100644 --- a/fairmq/FairMQMessage.cxx +++ b/fairmq/FairMQMessage.cxx @@ -4,96 +4,3 @@ * @since 2012-12-05 * @author: D. Klein, A. Rybalchenko */ - -#include - -#include "FairMQMessage.h" -#include "FairMQLogger.h" - - -FairMQMessage::FairMQMessage() -{ - int rc = zmq_msg_init (&fMessage); - if (rc != 0){ - std::stringstream logmsg; - logmsg << "failed initializing message, reason: " << zmq_strerror(errno); - FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); - } -} - -FairMQMessage::FairMQMessage(size_t size) -{ - int rc = zmq_msg_init_size (&fMessage, size); - if (rc != 0){ - std::stringstream logmsg; - logmsg << "failed initializing message with size, reason: " << zmq_strerror(errno); - FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); - } -} - -FairMQMessage::FairMQMessage(void* data, size_t size) -{ - int rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL); - if (rc != 0){ - std::stringstream logmsg; - logmsg << "failed initializing message with data, reason: " << zmq_strerror(errno); - FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); - } -} - -FairMQMessage::~FairMQMessage() -{ - int rc = zmq_msg_close (&fMessage); - if (rc != 0){ - std::stringstream logmsg; - logmsg << "failed closing message with data, reason: " << zmq_strerror(errno); - FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); - } -} - -void FairMQMessage::Rebuild(void* data, size_t size) -{ - int rc = zmq_msg_close (&fMessage); - if (rc != 0) { - std::stringstream logmsg; - logmsg << "failed closing message, reason: " << zmq_strerror(errno); - FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); - } - - rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL); - if (rc != 0) { - std::stringstream logmsg2; - logmsg2 << "failed initializing message with data, reason: " << zmq_strerror(errno); - FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); - } -} - -zmq_msg_t* FairMQMessage::GetMessage() -{ - return &fMessage; -} - -void* FairMQMessage::GetData() -{ - return zmq_msg_data (&fMessage); -} - -size_t FairMQMessage::GetSize() -{ - return zmq_msg_size (&fMessage); -} - -void FairMQMessage::Copy(FairMQMessage* msg) -{ - int rc = zmq_msg_copy (&fMessage, &(msg->fMessage)); - if (rc != 0) { - std::stringstream logmsg; - logmsg << "failed copying message, reason: " << zmq_strerror(errno); - FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); - } -} - -void FairMQMessage::CleanUp(void* data, void* hint) -{ - free (data); -} diff --git a/fairmq/FairMQMessage.h b/fairmq/FairMQMessage.h index a071a86d..0ce27b4f 100644 --- a/fairmq/FairMQMessage.h +++ b/fairmq/FairMQMessage.h @@ -8,34 +8,23 @@ #ifndef FAIRMQMESSAGE_H_ #define FAIRMQMESSAGE_H_ -#include - -#include +#include // for size_t class FairMQMessage { public: - FairMQMessage(); - FairMQMessage(size_t size); - FairMQMessage(void* data, size_t size); + virtual void Rebuild() = 0; + virtual void Rebuild(size_t size) = 0; + virtual void Rebuild(void* data, size_t site) = 0; - void Rebuild(); - void Rebuild(size_t size); - void Rebuild(void* data, size_t site); + virtual void* GetMessage() = 0; + virtual void* GetData() = 0; + virtual size_t GetSize() = 0; - zmq_msg_t* GetMessage(); - void* GetData(); - size_t GetSize(); + virtual void Copy(FairMQMessage* msg) = 0; - void Copy(FairMQMessage* msg); - - static void CleanUp(void* data, void* hint); - - virtual ~FairMQMessage(); - - private: - zmq_msg_t fMessage; + virtual ~FairMQMessage() {}; }; #endif /* FAIRMQMESSAGE_H_ */ diff --git a/fairmq/FairMQMessageZMQ.cxx b/fairmq/FairMQMessageZMQ.cxx index e69de29b..a6fdcdf6 100644 --- a/fairmq/FairMQMessageZMQ.cxx +++ b/fairmq/FairMQMessageZMQ.cxx @@ -0,0 +1,111 @@ +/** + * FairMQMessageZMQ.cxx + * + * @since 2012-12-05 + * @author: D. Klein, A. Rybalchenko + */ + +#include + +#include "FairMQMessageZMQ.h" +#include "FairMQLogger.h" + + +FairMQMessageZMQ::FairMQMessageZMQ() +{ + int rc = zmq_msg_init (&fMessage); + if (rc != 0){ + std::stringstream logmsg; + logmsg << "failed initializing message, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } +} + +FairMQMessageZMQ::FairMQMessageZMQ(size_t size) +{ + int rc = zmq_msg_init_size (&fMessage, size); + if (rc != 0){ + std::stringstream logmsg; + logmsg << "failed initializing message with size, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } +} + +FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size) +{ + int rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL); + if (rc != 0){ + std::stringstream logmsg; + logmsg << "failed initializing message with data, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } +} + +void FairMQMessageZMQ::Rebuild() +{ + //TODO +} + +void FairMQMessageZMQ::Rebuild(size_t size) +{ + //TODO +} + + + +void FairMQMessageZMQ::Rebuild(void* data, size_t size) +{ + int rc = zmq_msg_close (&fMessage); + if (rc != 0) { + std::stringstream logmsg; + logmsg << "failed closing message, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } + + rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL); + if (rc != 0) { + std::stringstream logmsg2; + logmsg2 << "failed initializing message with data, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); + } +} + +void* FairMQMessageZMQ::GetMessage() +{ + return &fMessage; +} + +void* FairMQMessageZMQ::GetData() +{ + return zmq_msg_data (&fMessage); +} + +size_t FairMQMessageZMQ::GetSize() +{ + return zmq_msg_size (&fMessage); +} + +void FairMQMessageZMQ::Copy(FairMQMessage* msg) +{ + int rc = zmq_msg_copy (&fMessage, &(static_cast(msg)->fMessage)); + if (rc != 0) { + std::stringstream logmsg; + logmsg << "failed copying message, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } +} + +void FairMQMessageZMQ::CleanUp(void* data, void* hint) +{ + free (data); +} + +FairMQMessageZMQ::~FairMQMessageZMQ() +{ + int rc = zmq_msg_close (&fMessage); + if (rc != 0){ + std::stringstream logmsg; + logmsg << "failed closing message with data, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } +} diff --git a/fairmq/FairMQMessageZMQ.h b/fairmq/FairMQMessageZMQ.h index e69de29b..84fec86d 100644 --- a/fairmq/FairMQMessageZMQ.h +++ b/fairmq/FairMQMessageZMQ.h @@ -0,0 +1,43 @@ +/** + * FairMQMessageZMQ.h + * + * @since 2014-01-17 + * @author: A. Rybalchenko + */ + +#ifndef FAIRMQMESSAGEZMQ_H_ +#define FAIRMQMESSAGEZMQ_H_ + +#include + +#include + +#include "FairMQMessage.h" + + +class FairMQMessageZMQ: public FairMQMessage +{ + public: + FairMQMessageZMQ(); + FairMQMessageZMQ(size_t size); + FairMQMessageZMQ(void* data, size_t size); + + virtual void Rebuild(); + virtual void Rebuild(size_t size); + virtual void Rebuild(void* data, size_t site); + + virtual void* GetMessage(); + virtual void* GetData(); + virtual size_t GetSize(); + + virtual void Copy(FairMQMessage* msg); + + static void CleanUp(void* data, void* hint); + + virtual ~FairMQMessageZMQ(); + + private: + zmq_msg_t fMessage; +}; + +#endif /* FAIRMQMESSAGEZMQ_H_ */ diff --git a/fairmq/FairMQProcessor.cxx b/fairmq/FairMQProcessor.cxx index a783ad5a..7b091cdc 100644 --- a/fairmq/FairMQProcessor.cxx +++ b/fairmq/FairMQProcessor.cxx @@ -42,21 +42,23 @@ void FairMQProcessor::Run() int receivedMsgs = 0; int sentMsgs = 0; - Bool_t received = false; + bool received = false; while ( fState == RUNNING ) { - FairMQMessage msg; + FairMQMessage* msg = new FairMQMessageZMQ(); - received = fPayloadInputs->at(0)->Receive(&msg); + received = fPayloadInputs->at(0)->Receive(msg); receivedMsgs++; if (received) { - fTask->Exec(&msg, NULL); + fTask->Exec(msg, NULL); - fPayloadOutputs->at(0)->Send(&msg); + fPayloadOutputs->at(0)->Send(msg); sentMsgs++; received = false; } + + delete msg; } std::cout << "I've received " << receivedMsgs << " and sent " << sentMsgs << " messages!" << std::endl; diff --git a/fairmq/FairMQProcessor.h b/fairmq/FairMQProcessor.h index ae793449..c396ab20 100644 --- a/fairmq/FairMQProcessor.h +++ b/fairmq/FairMQProcessor.h @@ -10,7 +10,6 @@ #include "FairMQDevice.h" #include "FairMQProcessorTask.h" -#include "Rtypes.h" class FairMQProcessor: public FairMQDevice diff --git a/fairmq/FairMQProcessorTask.h b/fairmq/FairMQProcessorTask.h index 97151f75..3d563d9e 100644 --- a/fairmq/FairMQProcessorTask.h +++ b/fairmq/FairMQProcessorTask.h @@ -9,6 +9,7 @@ #define FAIRMQPROCESSORTASK_H_ #include + #include "FairMQMessage.h" #include "FairTask.h" diff --git a/fairmq/FairMQProxy.cxx b/fairmq/FairMQProxy.cxx index a3e930d1..776469b2 100644 --- a/fairmq/FairMQProxy.cxx +++ b/fairmq/FairMQProxy.cxx @@ -25,13 +25,15 @@ void FairMQProxy::Run() boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); - FairMQMessage msg; + FairMQMessage* msg = new FairMQMessageZMQ(); while ( fState == RUNNING ) { - fPayloadInputs->at(0)->Receive(&msg); - fPayloadOutputs->at(0)->Send(&msg); + fPayloadInputs->at(0)->Receive(msg); + fPayloadOutputs->at(0)->Send(msg); } + delete msg; + rateLogger.interrupt(); rateLogger.join(); } diff --git a/fairmq/FairMQProxy.h b/fairmq/FairMQProxy.h index 6f684992..1660faf1 100644 --- a/fairmq/FairMQProxy.h +++ b/fairmq/FairMQProxy.h @@ -9,8 +9,6 @@ #define FAIRMQPROXY_H_ #include "FairMQDevice.h" -#include "Rtypes.h" -#include "TString.h" class FairMQProxy: public FairMQDevice diff --git a/fairmq/FairMQSampler.cxx b/fairmq/FairMQSampler.cxx index b78342c5..95ef2bf3 100644 --- a/fairmq/FairMQSampler.cxx +++ b/fairmq/FairMQSampler.cxx @@ -62,7 +62,7 @@ void FairMQSampler::Init() fFairRunAna->Init(); //fFairRunAna->Run(0, 0); FairRootManager* ioman = FairRootManager::Instance(); - fNumEvents = Int_t((ioman->GetInChain())->GetEntries()); + fNumEvents = int((ioman->GetInChain())->GetEntries()); } void FairMQSampler::Run() @@ -139,9 +139,9 @@ void FairMQSampler::ListenToCommands() while ( true ) { try { - FairMQMessage msg; + FairMQMessage* msg = new FairMQMessageZMQ(); - received = fPayloadInputs->at(0)->Receive(&msg); + received = fPayloadInputs->at(0)->Receive(msg); if (received) { //command handling goes here. @@ -149,6 +149,8 @@ void FairMQSampler::ListenToCommands() received = false; } + delete msg; + boost::this_thread::interruption_point(); } catch (boost::thread_interrupted&) { std::cout << "commandListener interrupted" << std::endl; @@ -158,7 +160,7 @@ void FairMQSampler::ListenToCommands() FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, ">>>>>>> stopping commandListener <<<<<<<"); } -void FairMQSampler::SetProperty(const Int_t& key, const TString& value, const Int_t& slot/*= 0*/) +void FairMQSampler::SetProperty(const int& key, const std::string& value, const int& slot/*= 0*/) { switch (key) { case InputFile: @@ -176,7 +178,7 @@ void FairMQSampler::SetProperty(const Int_t& key, const TString& value, const In } } -TString FairMQSampler::GetProperty(const Int_t& key, const TString& default_/*= ""*/, const Int_t& slot/*= 0*/) +std::string FairMQSampler::GetProperty(const int& key, const std::string& default_/*= ""*/, const int& slot/*= 0*/) { switch (key) { case InputFile: @@ -190,7 +192,7 @@ TString FairMQSampler::GetProperty(const Int_t& key, const TString& default_/*= } } -void FairMQSampler::SetProperty(const Int_t& key, const Int_t& value, const Int_t& slot/*= 0*/) +void FairMQSampler::SetProperty(const int& key, const int& value, const int& slot/*= 0*/) { switch (key) { case EventRate: @@ -202,7 +204,7 @@ void FairMQSampler::SetProperty(const Int_t& key, const Int_t& value, const Int_ } } -Int_t FairMQSampler::GetProperty(const Int_t& key, const Int_t& default_/*= 0*/, const Int_t& slot/*= 0*/) +int FairMQSampler::GetProperty(const int& key, const int& default_/*= 0*/, const int& slot/*= 0*/) { switch (key) { case EventRate: diff --git a/fairmq/FairMQSampler.h b/fairmq/FairMQSampler.h index 53b5084e..695bb362 100644 --- a/fairmq/FairMQSampler.h +++ b/fairmq/FairMQSampler.h @@ -13,7 +13,6 @@ #include "FairTask.h" #include "FairMQDevice.h" #include "FairMQSamplerTask.h" -#include "TString.h" /** @@ -41,19 +40,19 @@ class FairMQSampler: public FairMQDevice void ResetEventCounter(); virtual void ListenToCommands(); - virtual void SetProperty(const Int_t& key, const TString& value, const Int_t& slot = 0); - virtual TString GetProperty(const Int_t& key, const TString& default_ = "", const Int_t& slot = 0); - virtual void SetProperty(const Int_t& key, const Int_t& value, const Int_t& slot = 0); - virtual Int_t GetProperty(const Int_t& key, const Int_t& default_ = 0, const Int_t& slot = 0); + virtual void SetProperty(const int& key, const std::string& value, const int& slot = 0); + virtual std::string GetProperty(const int& key, const std::string& default_ = "", const int& slot = 0); + virtual void SetProperty(const int& key, const int& value, const int& slot = 0); + virtual int GetProperty(const int& key, const int& default_ = 0, const int& slot = 0); protected: FairRunAna* fFairRunAna; - Int_t fNumEvents; + int fNumEvents; FairMQSamplerTask* fSamplerTask; - TString fInputFile; // Filename of a root file containing the simulated digis. - TString fParFile; - TString fBranch; // The name of the sub-detector branch to stream the digis from. - Int_t fEventRate; - Int_t fEventCounter; + std::string fInputFile; // Filename of a root file containing the simulated digis. + std::string fParFile; + std::string fBranch; // The name of the sub-detector branch to stream the digis from. + int fEventRate; + int fEventCounter; virtual void Init(); virtual void Run(); diff --git a/fairmq/FairMQSamplerTask.cxx b/fairmq/FairMQSamplerTask.cxx index a38ecbaa..aa860640 100644 --- a/fairmq/FairMQSamplerTask.cxx +++ b/fairmq/FairMQSamplerTask.cxx @@ -8,11 +8,11 @@ #include "FairMQSamplerTask.h" -FairMQSamplerTask::FairMQSamplerTask(const Text_t* name, Int_t iVerbose) : +FairMQSamplerTask::FairMQSamplerTask(const Text_t* name, int iVerbose) : FairTask(name, iVerbose), fInput(NULL), fBranch(""), - fOutput(new FairMQMessage) + fOutput(new FairMQMessageZMQ) { } @@ -20,7 +20,7 @@ FairMQSamplerTask::FairMQSamplerTask() : FairTask( "Abstract base task used for loading a branch from a root file into memory"), fInput(NULL), fBranch(""), - fOutput(new FairMQMessage) + fOutput(new FairMQMessageZMQ) { } diff --git a/fairmq/FairMQSamplerTask.h b/fairmq/FairMQSamplerTask.h index 286c9ef7..05fcc92f 100644 --- a/fairmq/FairMQSamplerTask.h +++ b/fairmq/FairMQSamplerTask.h @@ -13,6 +13,7 @@ #include "TClonesArray.h" #include #include "FairMQMessage.h" +#include "FairMQMessageZMQ.h" #include "TString.h" @@ -20,7 +21,7 @@ class FairMQSamplerTask: public FairTask { public: FairMQSamplerTask(); - FairMQSamplerTask(const Text_t* name, Int_t iVerbose=1); + FairMQSamplerTask(const Text_t* name, int iVerbose=1); virtual ~FairMQSamplerTask(); virtual InitStatus Init(); virtual void Exec(Option_t* opt) = 0; diff --git a/fairmq/FairMQSink.cxx b/fairmq/FairMQSink.cxx index 3b09b743..43865e55 100644 --- a/fairmq/FairMQSink.cxx +++ b/fairmq/FairMQSink.cxx @@ -17,15 +17,16 @@ FairMQSink::FairMQSink() void FairMQSink::Run() { - void* status; //necessary for pthread_join FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<"); boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); while ( fState == RUNNING ) { - FairMQMessage msg; + FairMQMessage* msg = new FairMQMessageZMQ(); - fPayloadInputs->at(0)->Receive(&msg); + fPayloadInputs->at(0)->Receive(msg); + + delete msg; } rateLogger.interrupt(); diff --git a/fairmq/FairMQSocket.cxx b/fairmq/FairMQSocket.cxx index f15e5bc6..87206b8e 100644 --- a/fairmq/FairMQSocket.cxx +++ b/fairmq/FairMQSocket.cxx @@ -4,178 +4,3 @@ * @since 2012-12-05 * @author D. Klein, A. Rybalchenko */ - -#include "FairMQSocket.h" -#include -#include "FairMQLogger.h" - - -FairMQSocket::FairMQSocket(FairMQContext* context, int type, int num) : - fBytesTx(0), - fBytesRx(0), - fMessagesTx(0), - fMessagesRx(0) -{ - std::stringstream id; - id << GetTypeString(type) << "." << num; - fId = id.str(); - - fSocket = zmq_socket(context->GetContext(), type); - int rc = zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.Length()); - if (rc != 0) { - std::stringstream logmsg; - logmsg << "failed setting socket option, reason: " << zmq_strerror(errno); - FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); - } - - if (type == ZMQ_SUB) { - rc = zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, NULL, 0); - if (rc != 0) { - std::stringstream logmsg2; - logmsg2 << "failed setting socket option, reason: " << zmq_strerror(errno); - FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); - } - } - - std::stringstream logmsg3; - logmsg3 << "created socket #" << fId; - FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg3.str()); -} - -FairMQSocket::~FairMQSocket() -{ -} - -TString FairMQSocket::GetId() -{ - return fId; -} - -TString FairMQSocket::GetTypeString(int type) -{ - switch (type) { - case ZMQ_SUB: - return "sub"; - case ZMQ_PUB: - return "pub"; - case ZMQ_PUSH: - return "push"; - case ZMQ_PULL: - return "pull"; - default: - return ""; - } -} - -void FairMQSocket::Bind(TString address) -{ - std::stringstream logmsg; - logmsg << "bind socket #" << fId << " on " << address; - FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); - - int rc = zmq_bind (fSocket, address); - if (rc != 0) { - std::stringstream logmsg2; - logmsg2 << "failed binding socket #" << fId << ", reason: " << zmq_strerror(errno); - FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); - } -} - -void FairMQSocket::Connect(TString address) -{ - std::stringstream logmsg; - logmsg << "connect socket #" << fId << " on " << address; - FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); - - int rc = zmq_connect (fSocket, address); - if (rc != 0) { - std::stringstream logmsg2; - logmsg2 << "failed connecting socket #" << fId << ", reason: " << zmq_strerror(errno); - FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); - } -} - -size_t FairMQSocket::Send(FairMQMessage* msg) -{ - int nbytes = zmq_msg_send (msg->GetMessage(), fSocket, 0); - if (nbytes >= 0){ - fBytesTx += nbytes; - ++fMessagesTx; - return nbytes; - } - if (zmq_errno() == EAGAIN){ - return false; - } - std::stringstream logmsg; - logmsg << "failed sending on socket #" << fId << ", reason: " << zmq_strerror(errno); - FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); - return nbytes; -} - -size_t FairMQSocket::Receive(FairMQMessage* msg) -{ - int nbytes = zmq_msg_recv (msg->GetMessage(), fSocket, 0); - if (nbytes >= 0){ - fBytesRx += nbytes; - ++fMessagesRx; - return nbytes; - } - if (zmq_errno() == EAGAIN){ - return false; - } - std::stringstream logmsg; - logmsg << "failed receiving on socket #" << fId << ", reason: " << zmq_strerror(errno); - FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); - return nbytes; -} - -void FairMQSocket::SetOption(int option, const void* value, size_t valueSize) -{ - int rc = zmq_setsockopt(fSocket, option, value, valueSize); - if (rc < 0) { - std::stringstream logmsg; - logmsg << "failed setting socket option, reason: " << zmq_strerror(errno); - FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); - } -} - -void FairMQSocket::Close() -{ - if (fSocket == NULL){ - return; - } - - int rc = zmq_close (fSocket); - if (rc != 0) { - std::stringstream logmsg; - logmsg << "failed closing socket, reason: " << zmq_strerror(errno); - FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); - } - - fSocket = NULL; -} - -void* FairMQSocket::GetSocket() -{ - return fSocket; -} - -ULong_t FairMQSocket::GetBytesTx() -{ - return fBytesTx; -} - -ULong_t FairMQSocket::GetBytesRx() -{ - return fBytesRx; -} - -ULong_t FairMQSocket::GetMessagesTx() -{ - return fMessagesTx; -} - -ULong_t FairMQSocket::GetMessagesRx() -{ - return fMessagesRx; -} diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h index 71d8da38..7cbaa30b 100644 --- a/fairmq/FairMQSocket.h +++ b/fairmq/FairMQSocket.h @@ -8,42 +8,34 @@ #ifndef FAIRMQSOCKET_H_ #define FAIRMQSOCKET_H_ -#include #include #include "FairMQContext.h" #include "FairMQMessage.h" -#include "Rtypes.h" -#include "TString.h" +#include "FairMQMessageZMQ.h" class FairMQSocket { public: - FairMQSocket(FairMQContext* context, int type, int num); - virtual ~FairMQSocket(); - TString GetId(); - static TString GetTypeString(int type); - size_t Send(FairMQMessage* msg); - size_t Receive(FairMQMessage* msg); - void Close(); - void Bind(TString address); - void Connect(TString address); - void* GetSocket(); + virtual std::string GetId() = 0; - void SetOption(int option, const void* value, size_t valueSize); + virtual void Bind(std::string address) = 0; + virtual void Connect(std::string address) = 0; - ULong_t GetBytesTx(); - ULong_t GetBytesRx(); - ULong_t GetMessagesTx(); - ULong_t GetMessagesRx(); + virtual size_t Send(FairMQMessage* msg) = 0; + virtual size_t Receive(FairMQMessage* msg) = 0; - private: - void* fSocket; - TString fId; - ULong_t fBytesTx; - ULong_t fBytesRx; - ULong_t fMessagesTx; - ULong_t fMessagesRx; + virtual void Close() = 0; + virtual void* GetSocket() = 0; + + virtual void SetOption(int option, const void* value, size_t valueSize) = 0; + + virtual unsigned long GetBytesTx() = 0; + virtual unsigned long GetBytesRx() = 0; + virtual unsigned long GetMessagesTx() = 0; + virtual unsigned long GetMessagesRx() = 0; + + virtual ~FairMQSocket() {}; }; #endif /* FAIRMQSOCKET_H_ */ diff --git a/fairmq/FairMQSocketZMQ.cxx b/fairmq/FairMQSocketZMQ.cxx index e69de29b..afbea8c9 100644 --- a/fairmq/FairMQSocketZMQ.cxx +++ b/fairmq/FairMQSocketZMQ.cxx @@ -0,0 +1,182 @@ +/** + * FairMQSocketZMQ.cxx + * + * @since 2012-12-05 + * @author D. Klein, A. Rybalchenko + */ + +#include + +#include "FairMQSocketZMQ.h" +#include "FairMQLogger.h" + + +FairMQSocketZMQ::FairMQSocketZMQ(FairMQContext* context, int type, int num) : + fBytesTx(0), + fBytesRx(0), + fMessagesTx(0), + fMessagesRx(0) +{ + std::stringstream id; + id << GetTypeString(type) << "." << num; + fId = id.str(); + + fSocket = zmq_socket(context->GetContext(), type); + int rc = zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.length()); + if (rc != 0) { + std::stringstream logmsg; + logmsg << "failed setting socket option, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } + + if (type == ZMQ_SUB) { + rc = zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, NULL, 0); + if (rc != 0) { + std::stringstream logmsg2; + logmsg2 << "failed setting socket option, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); + } + } + + std::stringstream logmsg3; + logmsg3 << "created socket #" << fId; + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg3.str()); +} + +std::string FairMQSocketZMQ::GetId() +{ + return fId; +} + +std::string FairMQSocketZMQ::GetTypeString(int type) +{ + switch (type) { + case ZMQ_SUB: + return "sub"; + case ZMQ_PUB: + return "pub"; + case ZMQ_PUSH: + return "push"; + case ZMQ_PULL: + return "pull"; + default: + return ""; + } +} + +void FairMQSocketZMQ::Bind(std::string address) +{ + std::stringstream logmsg; + logmsg << "bind socket #" << fId << " on " << address; + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + + int rc = zmq_bind (fSocket, address.c_str()); + if (rc != 0) { + std::stringstream logmsg2; + logmsg2 << "failed binding socket #" << fId << ", reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); + } +} + +void FairMQSocketZMQ::Connect(std::string address) +{ + std::stringstream logmsg; + logmsg << "connect socket #" << fId << " on " << address; + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + + int rc = zmq_connect (fSocket, address.c_str()); + if (rc != 0) { + std::stringstream logmsg2; + logmsg2 << "failed connecting socket #" << fId << ", reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); + } +} + +size_t FairMQSocketZMQ::Send(FairMQMessage* msg) +{ + int nbytes = zmq_msg_send (static_cast(msg->GetMessage()), fSocket, 0); + if (nbytes >= 0){ + fBytesTx += nbytes; + ++fMessagesTx; + return nbytes; + } + if (zmq_errno() == EAGAIN){ + return false; + } + std::stringstream logmsg; + logmsg << "failed sending on socket #" << fId << ", reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + return nbytes; +} + +size_t FairMQSocketZMQ::Receive(FairMQMessage* msg) +{ + int nbytes = zmq_msg_recv (static_cast(msg->GetMessage()), fSocket, 0); + if (nbytes >= 0){ + fBytesRx += nbytes; + ++fMessagesRx; + return nbytes; + } + if (zmq_errno() == EAGAIN){ + return false; + } + std::stringstream logmsg; + logmsg << "failed receiving on socket #" << fId << ", reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + return nbytes; +} + +void FairMQSocketZMQ::SetOption(int option, const void* value, size_t valueSize) +{ + int rc = zmq_setsockopt(fSocket, option, value, valueSize); + if (rc < 0) { + std::stringstream logmsg; + logmsg << "failed setting socket option, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } +} + +void FairMQSocketZMQ::Close() +{ + if (fSocket == NULL){ + return; + } + + int rc = zmq_close (fSocket); + if (rc != 0) { + std::stringstream logmsg; + logmsg << "failed closing socket, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } + + fSocket = NULL; +} + +void* FairMQSocketZMQ::GetSocket() +{ + return fSocket; +} + +unsigned long FairMQSocketZMQ::GetBytesTx() +{ + return fBytesTx; +} + +unsigned long FairMQSocketZMQ::GetBytesRx() +{ + return fBytesRx; +} + +unsigned long FairMQSocketZMQ::GetMessagesTx() +{ + return fMessagesTx; +} + +unsigned long FairMQSocketZMQ::GetMessagesRx() +{ + return fMessagesRx; +} + +FairMQSocketZMQ::~FairMQSocketZMQ() +{ +} \ No newline at end of file diff --git a/fairmq/FairMQSocketZMQ.h b/fairmq/FairMQSocketZMQ.h index e69de29b..76ba4012 100644 --- a/fairmq/FairMQSocketZMQ.h +++ b/fairmq/FairMQSocketZMQ.h @@ -0,0 +1,53 @@ +/** + * FairMQSocketZMQ.h + * + * @since 2012-12-05 + * @author D. Klein, A. Rybalchenko + */ + +#ifndef FAIRMQSOCKETZMQ_H_ +#define FAIRMQSOCKETZMQ_H_ + +#include + +#include "FairMQSocket.h" +#include "FairMQContext.h" +#include "FairMQMessageZMQ.h" + + +class FairMQSocketZMQ : public FairMQSocket +{ + public: + FairMQSocketZMQ(FairMQContext* context, int type, int num); + + virtual std::string GetId(); + + virtual void Bind(std::string address); + virtual void Connect(std::string address); + + virtual size_t Send(FairMQMessage* msg); + virtual size_t Receive(FairMQMessage* msg); + virtual void Close(); + virtual void* GetSocket(); + + virtual void SetOption(int option, const void* value, size_t valueSize); + + virtual unsigned long GetBytesTx(); + virtual unsigned long GetBytesRx(); + virtual unsigned long GetMessagesTx(); + virtual unsigned long GetMessagesRx(); + + static std::string GetTypeString(int type); + + virtual ~FairMQSocketZMQ(); + + private: + void* fSocket; + std::string fId; + unsigned long fBytesTx; + unsigned long fBytesRx; + unsigned long fMessagesTx; + unsigned long fMessagesRx; +}; + +#endif /* FAIRMQSOCKETZMQ_H_ */ diff --git a/fairmq/FairMQSplitter.cxx b/fairmq/FairMQSplitter.cxx index e6cd32fb..3ee2703c 100644 --- a/fairmq/FairMQSplitter.cxx +++ b/fairmq/FairMQSplitter.cxx @@ -30,18 +30,20 @@ void FairMQSplitter::Run() int direction = 0; while ( fState == RUNNING ) { - FairMQMessage msg; + FairMQMessage* msg = new FairMQMessageZMQ(); - received = fPayloadInputs->at(0)->Receive(&msg); + received = fPayloadInputs->at(0)->Receive(msg); if (received) { - fPayloadOutputs->at(direction)->Send(&msg); + fPayloadOutputs->at(direction)->Send(msg); direction++; if (direction >= fNumOutputs) { direction = 0; } received = false; } + + delete msg; } rateLogger.interrupt(); diff --git a/fairmq/runBenchmarkSampler.cxx b/fairmq/runBenchmarkSampler.cxx index bbcce1a1..3c860424 100644 --- a/fairmq/runBenchmarkSampler.cxx +++ b/fairmq/runBenchmarkSampler.cxx @@ -10,6 +10,7 @@ #include "FairMQLogger.h" #include "FairMQBenchmarkSampler.h" +#include "FairMQTransportFactoryZMQ.h" FairMQBenchmarkSampler sampler; @@ -50,6 +51,9 @@ int main(int argc, char** argv) logmsg << "PID: " << getpid(); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); + sampler.SetTransport(transportFactory); + int i = 1; sampler.SetProperty(FairMQBenchmarkSampler::Id, argv[i]); diff --git a/fairmq/runBuffer.cxx b/fairmq/runBuffer.cxx index e75a0b08..8f57afdb 100644 --- a/fairmq/runBuffer.cxx +++ b/fairmq/runBuffer.cxx @@ -10,6 +10,7 @@ #include "FairMQLogger.h" #include "FairMQBuffer.h" +#include "FairMQTransportFactoryZMQ.h" FairMQBuffer buffer; @@ -50,6 +51,9 @@ int main(int argc, char** argv) logmsg << "PID: " << getpid(); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); + buffer.SetTransport(transportFactory); + int i = 1; buffer.SetProperty(FairMQBuffer::Id, argv[i]); diff --git a/fairmq/runMerger.cxx b/fairmq/runMerger.cxx index 9f44ae6a..a7b1f98b 100644 --- a/fairmq/runMerger.cxx +++ b/fairmq/runMerger.cxx @@ -10,6 +10,7 @@ #include "FairMQLogger.h" #include "FairMQMerger.h" +#include "FairMQTransportFactoryZMQ.h" FairMQMerger merger; @@ -51,6 +52,9 @@ int main(int argc, char** argv) logmsg << "PID: " << getpid(); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); + merger.SetTransport(transportFactory); + int i = 1; merger.SetProperty(FairMQMerger::Id, argv[i]); diff --git a/fairmq/runNToOneMerger.cxx b/fairmq/runNToOneMerger.cxx index 6e89b39d..cd7c38f6 100644 --- a/fairmq/runNToOneMerger.cxx +++ b/fairmq/runNToOneMerger.cxx @@ -10,6 +10,7 @@ #include "FairMQLogger.h" #include "FairMQMerger.h" +#include "FairMQTransportFactoryZMQ.h" FairMQMerger merger; @@ -53,6 +54,9 @@ int main(int argc, char** argv) logmsg << "PID: " << getpid(); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); + merger.SetTransport(transportFactory); + int i = 1; merger.SetProperty(FairMQMerger::Id, argv[i]); diff --git a/fairmq/runOneToNSplitter.cxx b/fairmq/runOneToNSplitter.cxx index ba54f533..cba07895 100644 --- a/fairmq/runOneToNSplitter.cxx +++ b/fairmq/runOneToNSplitter.cxx @@ -10,6 +10,7 @@ #include "FairMQLogger.h" #include "FairMQSplitter.h" +#include "FairMQTransportFactoryZMQ.h" FairMQSplitter splitter; @@ -52,6 +53,9 @@ int main(int argc, char** argv) logmsg << "PID: " << getpid(); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); + splitter.SetTransport(transportFactory); + int i = 1; splitter.SetProperty(FairMQSplitter::Id, argv[i]); diff --git a/fairmq/runProxy.cxx b/fairmq/runProxy.cxx index ef814c66..e66918da 100644 --- a/fairmq/runProxy.cxx +++ b/fairmq/runProxy.cxx @@ -10,6 +10,7 @@ #include "FairMQLogger.h" #include "FairMQProxy.h" +#include "FairMQTransportFactoryZMQ.h" FairMQProxy proxy; @@ -50,6 +51,9 @@ int main(int argc, char** argv) logmsg << "PID: " << getpid(); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); + proxy.SetTransport(transportFactory); + int i = 1; proxy.SetProperty(FairMQProxy::Id, argv[i]); diff --git a/fairmq/runSink.cxx b/fairmq/runSink.cxx index 55d2b444..c055eb34 100644 --- a/fairmq/runSink.cxx +++ b/fairmq/runSink.cxx @@ -10,6 +10,7 @@ #include "FairMQLogger.h" #include "FairMQSink.h" +#include "FairMQTransportFactoryZMQ.h" FairMQSink sink; @@ -50,6 +51,9 @@ int main(int argc, char** argv) logmsg << "PID: " << getpid(); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); + sink.SetTransport(transportFactory); + int i = 1; sink.SetProperty(FairMQSink::Id, argv[i]); diff --git a/fairmq/runSplitter.cxx b/fairmq/runSplitter.cxx index 858c553c..da33cafb 100644 --- a/fairmq/runSplitter.cxx +++ b/fairmq/runSplitter.cxx @@ -10,6 +10,7 @@ #include "FairMQLogger.h" #include "FairMQSplitter.h" +#include "FairMQTransportFactoryZMQ.h" FairMQSplitter splitter; @@ -51,6 +52,9 @@ int main(int argc, char** argv) logmsg << "PID: " << getpid(); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); + splitter.SetTransport(transportFactory); + int i = 1; splitter.SetProperty(FairMQSplitter::Id, argv[i]);