diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 989d7b69..ba3dd4ae 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -1,35 +1,14 @@ -INCLUDE_DIRECTORIES( +include_directories( ${BASE_INCLUDE_DIRECTORIES} ${CMAKE_SOURCE_DIR}/fairmq ${ZMQ_INCLUDE_DIR} ${ROOT_INCLUDE_DIR} ) -include_directories(${INCLUDE_DIRECTORIES}) - -SET(HEADERS - "FairMQSampler.h" - "FairMQBenchmarkSampler.h" - #"FairMQStateMachine.h" - "FairMQConfigurable.h" - "FairMQDevice.h" - "FairMQBuffer.h" - "FairMQSamplerTask.h" - "FairMQLogger.h" - "FairMQContext.h" - "FairMQMessage.h" - "FairMQSocket.h" - "FairMQBalancedStandaloneSplitter.h" - "FairMQStandaloneMerger.h" - "FairMQProcessor.h" - "FairMQProcessorTask.h" - "FairMQSink.h" -) - -SET(SOURCES +Set(SRCS "FairMQSampler.cxx" "FairMQBenchmarkSampler.cxx" - #"FairMQStateMachine.cxx" + "FairMQStateMachine.cxx" "FairMQConfigurable.cxx" "FairMQBuffer.cxx" "FairMQSamplerTask.cxx" @@ -43,31 +22,28 @@ SET(SOURCES "FairMQProcessorTask.cxx" "FairMQSink.cxx" "FairMQDevice.cxx" + "FairMQProxy.cxx" ) -set(LINK_DIRECTORIES -${ROOT_LIBRARY_DIR} +Set(LINK_DIRECTORIES + ${ROOT_LIBRARY_DIR} + ${Boost_LIBRARY_DIRS} ) -link_directories( ${LINK_DIRECTORIES}) +link_directories(${LINK_DIRECTORIES}) -SET(LINK_LIBRARIES -) - - -Set(SRCS ${SOURCES}) Set(LIBRARY_NAME FairMQ) Set(LINKDEF) -Set(DEPENDENCIES +Set(DEPENDENCIES ${CMAKE_THREAD_LIBS_INIT} ${ZMQ_LIBRARY_SHARED} - Base ParBase FairTools GeoBase - ) + Base ParBase FairTools GeoBase boost_thread boost_timer boost_system +) GENERATE_LIBRARY() -Set(Exe_Names bsampler buffer splitter merger sink) -Set(Exe_Source runBenchmarkSampler.cxx runBuffer.cxx runSplitter.cxx runMerger.cxx runSink.cxx) +Set(Exe_Names bsampler buffer splitter merger sink proxy) +Set(Exe_Source runBenchmarkSampler.cxx runBuffer.cxx runSplitter.cxx runMerger.cxx runSink.cxx runProxy.cxx) List(LENGTH Exe_Names _length) Math(EXPR _length ${_length}-1) diff --git a/fairmq/FairMQBalancedStandaloneSplitter.cxx b/fairmq/FairMQBalancedStandaloneSplitter.cxx index f2c60fda..667e69cf 100644 --- a/fairmq/FairMQBalancedStandaloneSplitter.cxx +++ b/fairmq/FairMQBalancedStandaloneSplitter.cxx @@ -5,9 +5,11 @@ * Author: dklein */ -#include "FairMQBalancedStandaloneSplitter.h" +#include +#include #include "FairMQLogger.h" +#include "FairMQBalancedStandaloneSplitter.h" FairMQBalancedStandaloneSplitter::FairMQBalancedStandaloneSplitter() { @@ -19,11 +21,9 @@ FairMQBalancedStandaloneSplitter::~FairMQBalancedStandaloneSplitter() void FairMQBalancedStandaloneSplitter::Run() { - void* status; //necessary for pthread_join FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<"); - pthread_t logger; - pthread_create(&logger, NULL, &FairMQDevice::callLogSocketRates, this); + boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); // Initialize poll set zmq_pollitem_t items[] = { @@ -31,27 +31,27 @@ void FairMQBalancedStandaloneSplitter::Run() }; Bool_t received = false; - Bool_t direction = false; - while (true) { + Int_t direction = 0; + + while ( fState == RUNNING ) { FairMQMessage msg; - zmq_poll(items, 1, -1); + zmq_poll(items, 1, 100); if (items[0].revents & ZMQ_POLLIN) { received = fPayloadInputs->at(0)->Receive(&msg); } if (received) { - if (direction) { - fPayloadOutputs->at(0)->Send(&msg); - } else { - fPayloadOutputs->at(1)->Send(&msg); + fPayloadOutputs->at(direction)->Send(&msg); + direction++; + if (direction >= fNumOutputs) { + direction = 0; } - direction = !direction; - } + received = false; + }//if received } - pthread_join(logger, &status); + rateLogger.interrupt(); + rateLogger.join(); } - - diff --git a/fairmq/FairMQBalancedStandaloneSplitter.h b/fairmq/FairMQBalancedStandaloneSplitter.h index aeb294f9..7ec35530 100644 --- a/fairmq/FairMQBalancedStandaloneSplitter.h +++ b/fairmq/FairMQBalancedStandaloneSplitter.h @@ -17,6 +17,7 @@ class FairMQBalancedStandaloneSplitter: public FairMQDevice public: FairMQBalancedStandaloneSplitter(); virtual ~FairMQBalancedStandaloneSplitter(); + protected: virtual void Run(); }; diff --git a/fairmq/FairMQBenchmarkSampler.cxx b/fairmq/FairMQBenchmarkSampler.cxx index ce3f7f5c..97dca4df 100644 --- a/fairmq/FairMQBenchmarkSampler.cxx +++ b/fairmq/FairMQBenchmarkSampler.cxx @@ -5,7 +5,9 @@ * Author: dklein */ #include -#include + +#include +#include #include "FairMQBenchmarkSampler.h" #include "FairMQLogger.h" @@ -28,20 +30,16 @@ void FairMQBenchmarkSampler::Init() void FairMQBenchmarkSampler::Run() { - void* status; //necessary for pthread_join FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<"); - usleep(1000000); + //boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); - pthread_t logger; - pthread_create(&logger, NULL, &FairMQDevice::callLogSocketRates, this); - - pthread_t resetEventCounter; - pthread_create(&resetEventCounter, NULL, &FairMQBenchmarkSampler::callResetEventCounter, this); + boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); + boost::thread resetEventCounter(boost::bind(&FairMQBenchmarkSampler::ResetEventCounter, this)); void* buffer = operator new[](fEventSize); FairMQMessage* base_event = new FairMQMessage(buffer, fEventSize, NULL); - while (true) { + while ( fState == RUNNING ) { FairMQMessage event; event.Copy(base_event); @@ -50,22 +48,28 @@ void FairMQBenchmarkSampler::Run() --fEventCounter; while (fEventCounter == 0) { - usleep(1000); + boost::this_thread::sleep(boost::posix_time::milliseconds(1)); } } delete base_event; - pthread_join(logger, &status); - pthread_join(resetEventCounter, &status); + rateLogger.interrupt(); + resetEventCounter.interrupt(); + + rateLogger.join(); + resetEventCounter.join(); } -void* FairMQBenchmarkSampler::ResetEventCounter() +void FairMQBenchmarkSampler::ResetEventCounter() { - while (true) { - fEventCounter = fEventRate / 100; - - usleep(10000); + while ( true ) { + try { + fEventCounter = fEventRate / 100; + boost::this_thread::sleep(boost::posix_time::milliseconds(10)); + } catch (boost::thread_interrupted&) { + break; + } } } @@ -83,7 +87,7 @@ void FairMQBenchmarkSampler::Log(Int_t intervalInMs) t0 = get_timestamp(); while (true) { - usleep(intervalInMs * 1000); + boost::this_thread::sleep(boost::posix_time::milliseconds(intervalInMs)); t1 = get_timestamp(); @@ -105,7 +109,7 @@ void FairMQBenchmarkSampler::Log(Int_t intervalInMs) } } -void FairMQBenchmarkSampler::SetProperty(Int_t key, TString value, Int_t slot/*= 0*/) +void FairMQBenchmarkSampler::SetProperty(const Int_t& key, const TString& value, const Int_t& slot/*= 0*/) { switch (key) { default: @@ -114,7 +118,7 @@ void FairMQBenchmarkSampler::SetProperty(Int_t key, TString value, Int_t slot/*= } } -TString FairMQBenchmarkSampler::GetProperty(Int_t key, TString default_/*= ""*/, Int_t slot/*= 0*/) +TString FairMQBenchmarkSampler::GetProperty(const Int_t& key, const TString& default_/*= ""*/, const Int_t& slot/*= 0*/) { switch (key) { default: @@ -122,7 +126,7 @@ TString FairMQBenchmarkSampler::GetProperty(Int_t key, TString default_/*= ""*/, } } -void FairMQBenchmarkSampler::SetProperty(Int_t key, Int_t value, Int_t slot/*= 0*/) +void FairMQBenchmarkSampler::SetProperty(const Int_t& key, const Int_t& value, const Int_t& slot/*= 0*/) { switch (key) { case EventSize: @@ -137,7 +141,7 @@ void FairMQBenchmarkSampler::SetProperty(Int_t key, Int_t value, Int_t slot/*= 0 } } -Int_t FairMQBenchmarkSampler::GetProperty(Int_t key, Int_t default_/*= 0*/, Int_t slot/*= 0*/) +Int_t FairMQBenchmarkSampler::GetProperty(const Int_t& key, const Int_t& default_/*= 0*/, const Int_t& slot/*= 0*/) { switch (key) { case EventSize: diff --git a/fairmq/FairMQBenchmarkSampler.h b/fairmq/FairMQBenchmarkSampler.h index 9f5613f2..14a67e68 100644 --- a/fairmq/FairMQBenchmarkSampler.h +++ b/fairmq/FairMQBenchmarkSampler.h @@ -18,10 +18,6 @@ class FairMQBenchmarkSampler: public FairMQDevice { - protected: - Int_t fEventSize; - Int_t fEventRate; - Int_t fEventCounter; public: enum { InputFile = FairMQDevice::Last, @@ -31,15 +27,18 @@ class FairMQBenchmarkSampler: public FairMQDevice }; FairMQBenchmarkSampler(); virtual ~FairMQBenchmarkSampler(); + void Log(Int_t intervalInMs); + void ResetEventCounter(); + virtual void SetProperty(const Int_t& key, const TString& value, const Int_t& slot = 0); + virtual TString GetProperty(const Int_t& key, const TString& default_ = "", const Int_t& slot = 0); + virtual void SetProperty(const Int_t& key, const Int_t& value, const Int_t& slot = 0); + virtual Int_t GetProperty(const Int_t& key, const Int_t& default_ = 0, const Int_t& slot = 0); + protected: + Int_t fEventSize; + Int_t fEventRate; + Int_t fEventCounter; virtual void Init(); virtual void Run(); - void Log(Int_t intervalInMs); - void* ResetEventCounter(); - static void* callResetEventCounter(void* arg) { return ((FairMQBenchmarkSampler*)arg)->ResetEventCounter(); } - virtual void SetProperty(Int_t key, TString value, Int_t slot = 0); - virtual TString GetProperty(Int_t key, TString default_ = "", Int_t slot = 0); - virtual void SetProperty(Int_t key, Int_t value, Int_t slot = 0); - virtual Int_t GetProperty(Int_t key, Int_t default_ = 0, Int_t slot = 0); }; #endif /* FAIRMQBENCHMARKSAMPLER_H_ */ diff --git a/fairmq/FairMQBuffer.cxx b/fairmq/FairMQBuffer.cxx index 9a466f77..7b4f9a23 100644 --- a/fairmq/FairMQBuffer.cxx +++ b/fairmq/FairMQBuffer.cxx @@ -5,10 +5,13 @@ * Author: dklein */ -#include "FairMQBuffer.h" #include -#include "FairMQLogger.h" +#include +#include + +#include "FairMQBuffer.h" +#include "FairMQLogger.h" FairMQBuffer::FairMQBuffer() { @@ -16,11 +19,9 @@ FairMQBuffer::FairMQBuffer() void FairMQBuffer::Run() { - void* status; //necessary for pthread_join FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<"); - pthread_t logger; - pthread_create(&logger, NULL, &FairMQDevice::callLogSocketRates, this); + boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); // Initialize poll set zmq_pollitem_t items[] = { @@ -28,10 +29,10 @@ void FairMQBuffer::Run() }; Bool_t received = false; - while (true) { + while ( fState == RUNNING ) { FairMQMessage msg; - zmq_poll(items, 1, -1); + zmq_poll(items, 1, 100); if (items[0].revents & ZMQ_POLLIN) { received = fPayloadInputs->at(0)->Receive(&msg); @@ -39,10 +40,12 @@ void FairMQBuffer::Run() if (received) { fPayloadOutputs->at(0)->Send(&msg); + received = false; } } - pthread_join(logger, &status); + rateLogger.interrupt(); + rateLogger.join(); } FairMQBuffer::~FairMQBuffer() diff --git a/fairmq/FairMQBuffer.h b/fairmq/FairMQBuffer.h index 428ef3d3..76e37978 100644 --- a/fairmq/FairMQBuffer.h +++ b/fairmq/FairMQBuffer.h @@ -14,11 +14,11 @@ class FairMQBuffer: public FairMQDevice { - private: public: FairMQBuffer(); - virtual void Run(); virtual ~FairMQBuffer(); + protected: + virtual void Run(); }; #endif /* FAIRMQBUFFER_H_ */ diff --git a/fairmq/FairMQConfigurable.cxx b/fairmq/FairMQConfigurable.cxx index d6d6a9b8..1bc0114c 100644 --- a/fairmq/FairMQConfigurable.cxx +++ b/fairmq/FairMQConfigurable.cxx @@ -12,20 +12,20 @@ FairMQConfigurable::FairMQConfigurable() { } -void FairMQConfigurable::SetProperty(Int_t key, TString value, Int_t slot/*= 0*/) +void FairMQConfigurable::SetProperty(const Int_t& key, const TString& value, const Int_t& slot/*= 0*/) { } -TString FairMQConfigurable::GetProperty(Int_t key, TString default_/*= ""*/, Int_t slot/*= 0*/) +TString FairMQConfigurable::GetProperty(const Int_t& key, const TString& default_/*= ""*/, const Int_t& slot/*= 0*/) { return default_; } -void FairMQConfigurable::SetProperty(Int_t key, Int_t value, Int_t slot/*= 0*/) +void FairMQConfigurable::SetProperty(const Int_t& key, const Int_t& value, const Int_t& slot/*= 0*/) { } -Int_t FairMQConfigurable::GetProperty(Int_t key, Int_t default_/*= 0*/, Int_t slot/*= 0*/) +Int_t FairMQConfigurable::GetProperty(const Int_t& key, const Int_t& default_/*= 0*/, const Int_t& slot/*= 0*/) { return default_; } diff --git a/fairmq/FairMQConfigurable.h b/fairmq/FairMQConfigurable.h index 103325a9..1854f404 100644 --- a/fairmq/FairMQConfigurable.h +++ b/fairmq/FairMQConfigurable.h @@ -19,10 +19,10 @@ class FairMQConfigurable Last = 1 }; FairMQConfigurable(); - virtual void SetProperty(Int_t key, TString value, Int_t slot = 0); - virtual TString GetProperty(Int_t key, TString default_ = "", Int_t slot = 0); - virtual void SetProperty(Int_t key, Int_t value, Int_t slot = 0); - virtual Int_t GetProperty(Int_t key, Int_t default_ = 0, Int_t slot = 0); + virtual void SetProperty(const Int_t& key, const TString& value, const Int_t& slot = 0); + virtual TString GetProperty(const Int_t& key, const TString& default_ = "", const Int_t& slot = 0); + virtual void SetProperty(const Int_t& key, const Int_t& value, const Int_t& slot = 0); + virtual Int_t GetProperty(const Int_t& key, const Int_t& default_ = 0, const Int_t& slot = 0); virtual ~FairMQConfigurable(); }; diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index a4014285..5ca70e1e 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -2,12 +2,15 @@ * FairMQDevice.cxx * * @since Oct 25, 2012 - * @authors: D. Klein + * @authors: D. Klein, A. Rybalchenko */ +#include + +#include + #include "FairMQSocket.h" #include "FairMQDevice.h" - #include "FairMQLogger.h" FairMQDevice::FairMQDevice() : @@ -18,7 +21,6 @@ FairMQDevice::FairMQDevice() : fPayloadOutputs(new std::vector()), fLogIntervalInMs(1000) { - } void FairMQDevice::Init() @@ -30,75 +32,71 @@ void FairMQDevice::Init() fPayloadContext = new FairMQContext(fId, FairMQContext::PAYLOAD, fNumIoThreads); - fBindAddress = new std::vector(fNumOutputs); - fBindSocketType = new std::vector(); - fBindSndBufferSize = new std::vector(); - fBindRcvBufferSize = new std::vector(); - - for (Int_t i = 0; i < fNumOutputs; ++i) { - fBindSocketType->push_back(ZMQ_PUB); - fBindSndBufferSize->push_back(10000); - fBindRcvBufferSize->push_back(10000); - } - - fConnectAddress = new std::vector(fNumInputs); - fConnectSocketType = new std::vector(); - fConnectSndBufferSize = new std::vector(); - fConnectRcvBufferSize = new std::vector(); + fInputAddress = new std::vector(fNumInputs); + fInputMethod = new std::vector(); + fInputSocketType = new std::vector(); + fInputSndBufSize = new std::vector(); + fInputRcvBufSize = new std::vector(); for (Int_t i = 0; i < fNumInputs; ++i) { - fConnectSocketType->push_back(ZMQ_SUB); - fConnectSndBufferSize->push_back(10000); - fConnectRcvBufferSize->push_back(10000); + fInputMethod->push_back("connect"); // default value, can be overwritten in configuration + fInputSocketType->push_back(ZMQ_SUB); // default value, can be overwritten in configuration + fInputSndBufSize->push_back(10000); // default value, can be overwritten in configuration + fInputRcvBufSize->push_back(10000); // default value, can be overwritten in configuration + } + + fOutputAddress = new std::vector(fNumOutputs); + fOutputMethod = new std::vector(); + fOutputSocketType = new std::vector(); + fOutputSndBufSize = new std::vector(); + fOutputRcvBufSize = new std::vector(); + + for (Int_t i = 0; i < fNumOutputs; ++i) { + fOutputMethod->push_back("bind"); // default value, can be overwritten in configuration + fOutputSocketType->push_back(ZMQ_PUB); // default value, can be overwritten in configuration + fOutputSndBufSize->push_back(10000); // default value, can be overwritten in configuration + fOutputRcvBufSize->push_back(10000); // default value, can be overwritten in configuration } } -void FairMQDevice::Bind() +void FairMQDevice::InitInput() { - FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Bind <<<<<<<"); - - for (Int_t i = 0; i < fNumOutputs; ++i) { - FairMQSocket* socket = new FairMQSocket(fPayloadContext, fBindSocketType->at(i), i); - socket->GetSocket()->setsockopt(ZMQ_SNDHWM, &fBindSndBufferSize->at(i), sizeof(fBindSndBufferSize->at(i))); - socket->GetSocket()->setsockopt(ZMQ_RCVHWM, &fBindRcvBufferSize->at(i), sizeof(fBindRcvBufferSize->at(i))); - fPayloadOutputs->push_back(socket); - } - - Int_t i = 0; - - for( std::vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { - try { - (*itr)->Bind(fBindAddress->at(i)); - } catch (std::out_of_range& e) { - } - - ++i; - } - -} - -void FairMQDevice::Connect() -{ - FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Connect <<<<<<<"); + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> InitInput <<<<<<<"); for (Int_t i = 0; i < fNumInputs; ++i) { - FairMQSocket* socket = new FairMQSocket(fPayloadContext, fConnectSocketType->at(i), i); - socket->GetSocket()->setsockopt(ZMQ_SNDHWM, &fConnectSndBufferSize->at(i), sizeof(fConnectSndBufferSize->at(i))); - socket->GetSocket()->setsockopt(ZMQ_RCVHWM, &fConnectRcvBufferSize->at(i), sizeof(fConnectRcvBufferSize->at(i))); + FairMQSocket* socket = new FairMQSocket(fPayloadContext, fInputSocketType->at(i), i); + socket->GetSocket()->setsockopt(ZMQ_SNDHWM, &fInputSndBufSize->at(i), sizeof(fInputSndBufSize->at(i))); + socket->GetSocket()->setsockopt(ZMQ_RCVHWM, &fInputRcvBufSize->at(i), sizeof(fInputRcvBufSize->at(i))); fPayloadInputs->push_back(socket); - } - - Int_t i = 0; - - for( std::vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { try { - (*itr)->Connect(fConnectAddress->at(i)); + if (fInputMethod->at(i) == "bind") { + fPayloadInputs->at(i)->Bind(fInputAddress->at(i)); + } else { + fPayloadInputs->at(i)->Connect(fInputAddress->at(i)); + } } catch (std::out_of_range& e) { } - - ++i; } +} +void FairMQDevice::InitOutput() +{ + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> InitOutput <<<<<<<"); + + for (Int_t i = 0; i < fNumOutputs; ++i) { + FairMQSocket* socket = new FairMQSocket(fPayloadContext, fOutputSocketType->at(i), i); + socket->GetSocket()->setsockopt(ZMQ_SNDHWM, &fOutputSndBufSize->at(i), sizeof(fOutputSndBufSize->at(i))); + socket->GetSocket()->setsockopt(ZMQ_RCVHWM, &fOutputRcvBufSize->at(i), sizeof(fOutputRcvBufSize->at(i))); + fPayloadOutputs->push_back(socket); + try { + if (fOutputMethod->at(i) == "bind") { + fPayloadOutputs->at(i)->Bind(fOutputAddress->at(i)); + } else { + fPayloadOutputs->at(i)->Connect(fOutputAddress->at(i)); + } + } catch (std::out_of_range& e) { + } + } } void FairMQDevice::Run() @@ -109,20 +107,126 @@ void FairMQDevice::Pause() { } -void FairMQDevice::Shutdown() +// Method for setting properties represented as a string. +void FairMQDevice::SetProperty(const Int_t& key, const TString& value, const Int_t& slot/*= 0*/) { - for( std::vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { - (*itr)->Close(); + switch (key) { + case Id: + fId = value; + break; + case InputAddress: + fInputAddress->erase(fInputAddress->begin() + slot); + fInputAddress->insert(fInputAddress->begin() + slot, value); + break; + case OutputAddress: + fOutputAddress->erase(fOutputAddress->begin() + slot); + fOutputAddress->insert(fOutputAddress->begin() + slot, value); + break; + case InputMethod: + fInputMethod->erase(fInputMethod->begin() + slot); + fInputMethod->insert(fInputMethod->begin() + slot, value); + break; + case OutputMethod: + fOutputMethod->erase(fOutputMethod->begin() + slot); + fOutputMethod->insert(fOutputMethod->begin() + slot, value); + break; + default: + FairMQConfigurable::SetProperty(key, value, slot); + break; } - - for( std::vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { - (*itr)->Close(); - } - - fPayloadContext->Close(); } -void* FairMQDevice::LogSocketRates() +// Method for setting properties represented as an integer. +void FairMQDevice::SetProperty(const Int_t& key, const Int_t& value, const Int_t& slot/*= 0*/) +{ + switch (key) { + case NumIoThreads: + fNumIoThreads = value; + break; + case NumInputs: + fNumInputs = value; + break; + case NumOutputs: + fNumOutputs = value; + break; + case LogIntervalInMs: + fLogIntervalInMs = value; + break; + case InputSocketType: + fInputSocketType->erase(fInputSocketType->begin() + slot); + fInputSocketType->insert(fInputSocketType->begin() + slot, value); + break; + case InputSndBufSize: + fInputSndBufSize->erase(fInputSndBufSize->begin() + slot); + fInputSndBufSize->insert(fInputSndBufSize->begin() + slot, value); + break; + case InputRcvBufSize: + fInputRcvBufSize->erase(fInputRcvBufSize->begin() + slot); + fInputRcvBufSize->insert(fInputRcvBufSize->begin() + slot, value); + break; + case OutputSocketType: + fOutputSocketType->erase(fOutputSocketType->begin() + slot); + fOutputSocketType->insert(fOutputSocketType->begin() + slot, value); + break; + case OutputSndBufSize: + fOutputSndBufSize->erase(fOutputSndBufSize->begin() + slot); + fOutputSndBufSize->insert(fOutputSndBufSize->begin() + slot, value); + break; + case OutputRcvBufSize: + fOutputRcvBufSize->erase(fOutputRcvBufSize->begin() + slot); + fOutputRcvBufSize->insert(fOutputRcvBufSize->begin() + slot, value); + break; + default: + FairMQConfigurable::SetProperty(key, value, slot); + break; + } +} + +// Method for getting properties represented as an string. +TString FairMQDevice::GetProperty(const Int_t& key, const TString& default_/*= ""*/, const Int_t& slot/*= 0*/) +{ + switch (key) { + case Id: + return fId; + case InputAddress: + return fInputAddress->at(slot); + case OutputAddress: + return fOutputAddress->at(slot); + case InputMethod: + return fInputMethod->at(slot); + case OutputMethod: + return fOutputMethod->at(slot); + default: + return FairMQConfigurable::GetProperty(key, default_, slot); + } +} + +// Method for getting properties represented as an integer. +Int_t FairMQDevice::GetProperty(const Int_t& key, const Int_t& default_/*= 0*/, const Int_t& slot/*= 0*/) +{ + switch (key) { + case NumIoThreads: + return fNumIoThreads; + case LogIntervalInMs: + return fLogIntervalInMs; + case InputSocketType: + return fInputSocketType->at(slot); + case InputSndBufSize: + return fInputSndBufSize->at(slot); + case InputRcvBufSize: + return fInputRcvBufSize->at(slot); + case OutputSocketType: + return fOutputSocketType->at(slot); + case OutputSndBufSize: + return fOutputSndBufSize->at(slot); + case OutputRcvBufSize: + return fOutputRcvBufSize->at(slot); + default: + return FairMQConfigurable::GetProperty(key, default_, slot); + } +} + +void FairMQDevice::LogSocketRates() { timestamp_t t0; timestamp_t t1; @@ -144,6 +248,13 @@ void* FairMQDevice::LogSocketRates() Double_t* megabytesPerSecondOutput = new Double_t[fNumOutputs]; Double_t* messagesPerSecondOutput = new Double_t[fNumOutputs]; + // Temp stuff for process termination + bool receivedSomething = false; + bool sentSomething = false; + int didNotReceiveFor = 0; + int didNotSendFor = 0; + // End of temp stuff + Int_t i = 0; for( std::vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { bytesInput[i] = (*itr)->GetBytesRx(); @@ -160,48 +271,86 @@ void* FairMQDevice::LogSocketRates() t0 = get_timestamp(); - while (true) { - usleep(fLogIntervalInMs * 1000); + while ( true ) { + try { + boost::this_thread::sleep(boost::posix_time::milliseconds(fLogIntervalInMs)); - t1 = get_timestamp(); + t1 = get_timestamp(); - timeSinceLastLog_ms = (t1 - t0) / 1000.0L; + timeSinceLastLog_ms = (t1 - t0) / 1000.0L; - i = 0; + i = 0; - for( std::vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { - bytesInputNew[i] = (*itr)->GetBytesRx(); - megabytesPerSecondInput[i] = ((Double_t) (bytesInputNew[i] - bytesInput[i]) / (1024. * 1024.)) / (Double_t) timeSinceLastLog_ms * 1000.; - bytesInput[i] = bytesInputNew[i]; - messagesInputNew[i] = (*itr)->GetMessagesRx(); - messagesPerSecondInput[i] = (Double_t) (messagesInputNew[i] - messagesInput[i]) / (Double_t) timeSinceLastLog_ms * 1000.; - messagesInput[i] = messagesInputNew[i]; + for( std::vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { + bytesInputNew[i] = (*itr)->GetBytesRx(); + megabytesPerSecondInput[i] = ((Double_t) (bytesInputNew[i] - bytesInput[i]) / (1024. * 1024.)) / (Double_t) timeSinceLastLog_ms * 1000.; + bytesInput[i] = bytesInputNew[i]; + messagesInputNew[i] = (*itr)->GetMessagesRx(); + messagesPerSecondInput[i] = (Double_t) (messagesInputNew[i] - messagesInput[i]) / (Double_t) timeSinceLastLog_ms * 1000.; + messagesInput[i] = messagesInputNew[i]; - std::stringstream logmsg; - logmsg << "#" << (*itr)->GetId() << ": " << messagesPerSecondInput[i] << " msg/s, " << megabytesPerSecondInput[i] << " MB/s"; - FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str()); + std::stringstream logmsg; + logmsg << "#" << (*itr)->GetId() << ": " << messagesPerSecondInput[i] << " msg/s, " << megabytesPerSecondInput[i] << " MB/s"; + FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str()); - ++i; + // Temp stuff for process termination + if ( !receivedSomething && messagesPerSecondInput[i] > 0 ) { + receivedSomething = true; + } + if ( receivedSomething && messagesPerSecondInput[i] == 0 ) { + std::cout << "Did not receive anything on socket " << i << " for " << didNotReceiveFor++ << " seconds." << std::endl; + } else { + didNotReceiveFor = 0; + } + // End of temp stuff + + ++i; + } + + i = 0; + + for( std::vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { + bytesOutputNew[i] = (*itr)->GetBytesTx(); + megabytesPerSecondOutput[i] = ((Double_t) (bytesOutputNew[i] - bytesOutput[i]) / (1024. * 1024.)) / (Double_t) timeSinceLastLog_ms * 1000.; + bytesOutput[i] = bytesOutputNew[i]; + messagesOutputNew[i] = (*itr)->GetMessagesTx(); + messagesPerSecondOutput[i] = (Double_t) (messagesOutputNew[i] - messagesOutput[i]) / (Double_t) timeSinceLastLog_ms * 1000.; + messagesOutput[i] = messagesOutputNew[i]; + + std::stringstream logmsg; + logmsg << "#" << (*itr)->GetId() << ": " << messagesPerSecondOutput[i] << " msg/s, " << megabytesPerSecondOutput[i] << " MB/s"; + FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str()); + + // Temp stuff for process termination + if ( !sentSomething && messagesPerSecondOutput[i] > 0 ) { + sentSomething = true; + } + if ( sentSomething && messagesPerSecondOutput[i] == 0 ) { + std::cout << "Did not send anything on socket " << i << " for " << didNotSendFor++ << " seconds." << std::endl; + } else { + didNotSendFor = 0; + } + // End of temp stuff + + ++i; + } + + // Temp stuff for process termination + if (receivedSomething && didNotReceiveFor > 5) { + std::cout << "stopping because nothing was received for 5 seconds." << std::endl; + ChangeState(STOP); + } + if (sentSomething && didNotSendFor > 5) { + std::cout << "stopping because nothing was sent for 5 seconds." << std::endl; + ChangeState(STOP); + } + // End of temp stuff + + t0 = t1; + } catch (boost::thread_interrupted&) { + std::cout << "rateLogger interrupted" << std::endl; + break; } - - i = 0; - - for( std::vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { - bytesOutputNew[i] = (*itr)->GetBytesTx(); - megabytesPerSecondOutput[i] = ((Double_t) (bytesOutputNew[i] - bytesOutput[i]) / (1024. * 1024.)) / (Double_t) timeSinceLastLog_ms * 1000.; - bytesOutput[i] = bytesOutputNew[i]; - messagesOutputNew[i] = (*itr)->GetMessagesTx(); - messagesPerSecondOutput[i] = (Double_t) (messagesOutputNew[i] - messagesOutput[i]) / (Double_t) timeSinceLastLog_ms * 1000.; - messagesOutput[i] = messagesOutputNew[i]; - - std::stringstream logmsg; - logmsg << "#" << (*itr)->GetId() << ": " << messagesPerSecondOutput[i] << " msg/s, " << megabytesPerSecondOutput[i] << " MB/s"; - FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str()); - - ++i; - } - - t0 = t1; } delete[] bytesInput; @@ -218,109 +367,28 @@ void* FairMQDevice::LogSocketRates() delete[] messagesPerSecondInput; delete[] megabytesPerSecondOutput; delete[] messagesPerSecondOutput; + + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> stopping rateLogger <<<<<<<"); } -void FairMQDevice::SetProperty(Int_t key, TString value, Int_t slot/*= 0*/) +void FairMQDevice::ListenToCommands() { - switch (key) { - case Id: - fId = value; - break; - case BindAddress: - fBindAddress->erase(fBindAddress->begin() + slot); - fBindAddress->insert(fBindAddress->begin() + slot, value); - break; - case ConnectAddress: - fConnectAddress->erase(fConnectAddress->begin() + slot); - fConnectAddress->insert(fConnectAddress->begin() + slot, value); - break; - default: - FairMQConfigurable::SetProperty(key, value, slot); - break; - } } -TString FairMQDevice::GetProperty(Int_t key, TString default_/*= ""*/, Int_t slot/*= 0*/) +void FairMQDevice::Shutdown() { - switch (key) { - case Id: - return fId; - case BindAddress: - return fBindAddress->at(slot); - case ConnectAddress: - return fConnectAddress->at(slot); - default: - return FairMQConfigurable::GetProperty(key, default_, slot); + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> closing inputs <<<<<<<"); + for( std::vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { + (*itr)->Close(); } -} -void FairMQDevice::SetProperty(Int_t key, Int_t value, Int_t slot/*= 0*/) -{ - switch (key) { - case NumIoThreads: - fNumIoThreads = value; - break; - case NumInputs: - fNumInputs = value; - break; - case NumOutputs: - fNumOutputs = value; - break; - case LogIntervalInMs: - fLogIntervalInMs = value; - break; - case BindSocketType: - fBindSocketType->erase(fBindSocketType->begin() + slot); - fBindSocketType->insert(fBindSocketType->begin() + slot, value); - break; - case BindSndBufferSize: - fBindSndBufferSize->erase(fBindSndBufferSize->begin() + slot); - fBindSndBufferSize->insert(fBindSndBufferSize->begin() + slot, value); - break; - case BindRcvBufferSize: - fBindRcvBufferSize->erase(fBindRcvBufferSize->begin() + slot); - fBindRcvBufferSize->insert(fBindRcvBufferSize->begin() + slot, value); - break; - case ConnectSocketType: - fConnectSocketType->erase(fConnectSocketType->begin() + slot); - fConnectSocketType->insert(fConnectSocketType->begin() + slot, value); - break; - case ConnectSndBufferSize: - fConnectSndBufferSize->erase(fConnectSndBufferSize->begin() + slot); - fConnectSndBufferSize->insert(fConnectSndBufferSize->begin() + slot, value); - break; - case ConnectRcvBufferSize: - fConnectRcvBufferSize->erase(fConnectRcvBufferSize->begin() + slot); - fConnectRcvBufferSize->insert(fConnectRcvBufferSize->begin() + slot, value); - break; - default: - FairMQConfigurable::SetProperty(key, value, slot); - break; + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> closing outputs <<<<<<<"); + for( std::vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { + (*itr)->Close(); } -} -Int_t FairMQDevice::GetProperty(Int_t key, Int_t default_/*= 0*/, Int_t slot/*= 0*/) -{ - switch (key) { - case NumIoThreads: - return fNumIoThreads; - case LogIntervalInMs: - return fLogIntervalInMs; - case BindSocketType: - return fBindSocketType->at(slot); - case ConnectSocketType: - return fConnectSocketType->at(slot); - case ConnectSndBufferSize: - return fConnectSndBufferSize->at(slot); - case ConnectRcvBufferSize: - return fConnectRcvBufferSize->at(slot); - case BindSndBufferSize: - return fBindSndBufferSize->at(slot); - case BindRcvBufferSize: - return fBindRcvBufferSize->at(slot); - default: - return FairMQConfigurable::GetProperty(key, default_, slot); - } + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> closing context <<<<<<<"); + fPayloadContext->Close(); } FairMQDevice::~FairMQDevice() @@ -333,8 +401,8 @@ FairMQDevice::~FairMQDevice() delete (*itr); } - delete fBindAddress; - delete fConnectAddress; + delete fInputAddress; + delete fOutputAddress; delete fPayloadInputs; delete fPayloadOutputs; } diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 573f638c..6c5c8f86 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -2,7 +2,7 @@ * FairMQDevice.h * * @since Oct 25, 2012 - * @authors: D. Klein + * @authors: D. Klein, A. Rybalchenko */ #ifndef FAIRMQDEVICE_H_ @@ -13,62 +13,75 @@ #include #include "FairMQContext.h" #include "FairMQSocket.h" -#include -//#include "FairMQLogger.h" #include "Rtypes.h" #include "TString.h" -class FairMQDevice : /*public FairMQStateMachine,*/ public FairMQConfigurable +class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable { - protected: - TString fId; - Int_t fNumIoThreads; - FairMQContext* fPayloadContext; - std::vector *fBindAddress; - std::vector *fBindSocketType; - std::vector *fBindSndBufferSize; - std::vector *fBindRcvBufferSize; - std::vector *fConnectAddress; - std::vector *fConnectSocketType; - std::vector *fConnectSndBufferSize; - std::vector *fConnectRcvBufferSize; - std::vector *fPayloadInputs; - std::vector *fPayloadOutputs; - Int_t fLogIntervalInMs; - Int_t fNumInputs; - Int_t fNumOutputs; public: enum { Id = FairMQConfigurable::Last, NumIoThreads, NumInputs, NumOutputs, - BindAddress, - BindSocketType, - BindSndBufferSize, - BindRcvBufferSize, - ConnectAddress, - ConnectSocketType, - ConnectSndBufferSize, - ConnectRcvBufferSize, + InputAddress, + InputMethod, + InputSocketType, + InputSndBufSize, + InputRcvBufSize, + OutputAddress, + OutputMethod, + OutputSocketType, + OutputSndBufSize, + OutputRcvBufSize, LogIntervalInMs, Last }; + FairMQDevice(); + + virtual void LogSocketRates(); + virtual void ListenToCommands(); + + virtual void SetProperty(const Int_t& key, const TString& value, const Int_t& slot = 0); + virtual TString GetProperty(const Int_t& key, const TString& default_ = "", const Int_t& slot = 0); + virtual void SetProperty(const Int_t& key, const Int_t& value, const Int_t& slot = 0); + virtual Int_t GetProperty(const Int_t& key, const Int_t& default_ = 0, const Int_t& slot = 0); + + virtual ~FairMQDevice(); + + protected: + TString fId; + Int_t fNumIoThreads; + FairMQContext* fPayloadContext; + + Int_t fNumInputs; + Int_t fNumOutputs; + + std::vector *fInputAddress; + std::vector *fInputMethod; + std::vector *fInputSocketType; + std::vector *fInputSndBufSize; + std::vector *fInputRcvBufSize; + + std::vector *fOutputAddress; + std::vector *fOutputMethod; + std::vector *fOutputSocketType; + std::vector *fOutputSndBufSize; + std::vector *fOutputRcvBufSize; + + std::vector *fPayloadInputs; + std::vector *fPayloadOutputs; + + Int_t fLogIntervalInMs; + virtual void Init(); - virtual void Bind(); - virtual void Connect(); virtual void Run(); virtual void Pause(); virtual void Shutdown(); - virtual void* LogSocketRates(); - static void* callLogSocketRates(void* arg) { return ((FairMQDevice*)arg)->LogSocketRates(); } - virtual void SetProperty(Int_t key, TString value, Int_t slot = 0); - virtual TString GetProperty(Int_t key, TString default_ = "", Int_t slot = 0); - virtual void SetProperty(Int_t key, Int_t value, Int_t slot = 0); - virtual Int_t GetProperty(Int_t key, Int_t default_ = 0, Int_t slot = 0); - virtual ~FairMQDevice(); + virtual void InitOutput(); + virtual void InitInput(); }; #endif /* FAIRMQDEVICE_H_ */ diff --git a/fairmq/FairMQLogger.cxx b/fairmq/FairMQLogger.cxx index 9743c674..027a1e25 100644 --- a/fairmq/FairMQLogger.cxx +++ b/fairmq/FairMQLogger.cxx @@ -62,6 +62,8 @@ void FairMQLogger::Log(Int_t type, TString logmsg) case ERROR: type_str = "\033[01;31mERROR\033[0m"; break; + case STATE: + type_str = "\033[01;33mSTATE\033[0m"; default: break; } diff --git a/fairmq/FairMQLogger.h b/fairmq/FairMQLogger.h index f1633000..e3848054 100644 --- a/fairmq/FairMQLogger.h +++ b/fairmq/FairMQLogger.h @@ -9,9 +9,7 @@ #define FAIRMQLOGGER_H_ #include #include -//#ifndef _MAKECINT_ #include -//#endif #include "Rtypes.h" #include "TString.h" @@ -23,7 +21,7 @@ class FairMQLogger TString fBindAddress; public: enum { - DEBUG, INFO, ERROR + DEBUG, INFO, ERROR, STATE }; FairMQLogger(); FairMQLogger(TString bindAdress); diff --git a/fairmq/FairMQProcessor.cxx b/fairmq/FairMQProcessor.cxx index 4a9fab78..d623253a 100644 --- a/fairmq/FairMQProcessor.cxx +++ b/fairmq/FairMQProcessor.cxx @@ -5,6 +5,9 @@ * Author: dklein */ +#include +#include + #include "FairMQProcessor.h" #include "FairMQLogger.h" @@ -34,32 +37,42 @@ void FairMQProcessor::Run() { FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<"); - void* status; //necessary for pthread_join - pthread_t logger; - pthread_create(&logger, NULL, &FairMQDevice::callLogSocketRates, this); + boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); // Initialize poll set zmq_pollitem_t items[] = { { *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 } }; + int receivedMsgs = 0; + int sentMsgs = 0; + Bool_t received = false; - while (true) { + + while ( fState == RUNNING ) { FairMQMessage msg; - zmq_poll(items, 1, -1); + zmq_poll(items, 1, 100); if (items[0].revents & ZMQ_POLLIN) { received = fPayloadInputs->at(0)->Receive(&msg); + receivedMsgs++; } if (received) { fTask->Exec(&msg, NULL); fPayloadOutputs->at(0)->Send(&msg); + sentMsgs++; + received = false; } } - pthread_join(logger, &status); + std::cout << "I've received " << receivedMsgs << " and sent " << sentMsgs << " messages!" << std::endl; + + boost::this_thread::sleep(boost::posix_time::milliseconds(5000)); + + rateLogger.interrupt(); + rateLogger.join(); } diff --git a/fairmq/FairMQProcessor.h b/fairmq/FairMQProcessor.h index b9b43d38..7a839c32 100644 --- a/fairmq/FairMQProcessor.h +++ b/fairmq/FairMQProcessor.h @@ -19,6 +19,7 @@ class FairMQProcessor: public FairMQDevice FairMQProcessor(); virtual ~FairMQProcessor(); void SetTask(FairMQProcessorTask* task); + protected: virtual void Init(); virtual void Run(); private: diff --git a/fairmq/FairMQProcessorTask.cxx b/fairmq/FairMQProcessorTask.cxx index f6888b25..19462a84 100644 --- a/fairmq/FairMQProcessorTask.cxx +++ b/fairmq/FairMQProcessorTask.cxx @@ -16,3 +16,7 @@ FairMQProcessorTask::~FairMQProcessorTask() { } +void FairMQProcessorTask::ClearOutput(void* data, void* hint) +{ + free (data); +} \ No newline at end of file diff --git a/fairmq/FairMQProcessorTask.h b/fairmq/FairMQProcessorTask.h index c9c8b9c6..0fe1bbdc 100644 --- a/fairmq/FairMQProcessorTask.h +++ b/fairmq/FairMQProcessorTask.h @@ -19,6 +19,7 @@ class FairMQProcessorTask : public FairTask FairMQProcessorTask(); virtual ~FairMQProcessorTask(); virtual void Exec(FairMQMessage* msg, Option_t* opt) = 0; + static void ClearOutput(void* data, void* hint); }; #endif /* FAIRMQPROCESSORTASK_H_ */ diff --git a/fairmq/FairMQProxy.cxx b/fairmq/FairMQProxy.cxx new file mode 100644 index 00000000..4f381580 --- /dev/null +++ b/fairmq/FairMQProxy.cxx @@ -0,0 +1,41 @@ +/* + * FairMQProxy.cxx + * + * Created on: Oct 2, 2013 + * Author: A. Rybalchenko + */ + +#include + +#include +#include + +#include "FairMQLogger.h" +#include "FairMQProxy.h" + +FairMQProxy::FairMQProxy() +{ +} + +FairMQProxy::~FairMQProxy() +{ +} + +void FairMQProxy::Run() +{ + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<"); + + boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); + + //TODO: check rateLogger output + int rc = zmq_proxy(*(fPayloadInputs->at(0)->GetSocket()), *(fPayloadOutputs->at(0)->GetSocket()), NULL); + if (rc == -1) { + std::cout << "Error: proxy failed: " << strerror(errno) << std::endl; + } + + //TODO: make proxy bind on both ends. + + rateLogger.interrupt(); + rateLogger.join(); +} + diff --git a/fairmq/FairMQProxy.h b/fairmq/FairMQProxy.h new file mode 100644 index 00000000..854237b1 --- /dev/null +++ b/fairmq/FairMQProxy.h @@ -0,0 +1,25 @@ +/* + * FairMQProxy.h + * + * Created on: Oct 2, 2013 + * Author: A. Rybalchenko + */ + +#ifndef FAIRMQPROXY_H_ +#define FAIRMQPROXY_H_ + +#include "FairMQDevice.h" +#include "Rtypes.h" +#include "TString.h" + + +class FairMQProxy: public FairMQDevice +{ + public: + FairMQProxy(); + virtual ~FairMQProxy(); + protected: + virtual void Run(); +}; + +#endif /* FAIRMQPROXY_H_ */ diff --git a/fairmq/FairMQSampler.cxx b/fairmq/FairMQSampler.cxx index 99516db6..ab395c17 100644 --- a/fairmq/FairMQSampler.cxx +++ b/fairmq/FairMQSampler.cxx @@ -2,10 +2,15 @@ * FairMQSampler.cpp * * Created on: Sep 27, 2012 - * Author: dklein + * Author: A. Rybalchenko, D. Klein */ + #include -#include +#include + +#include +#include +#include #include "TList.h" #include "TObjString.h" @@ -30,8 +35,9 @@ FairMQSampler::FairMQSampler() : FairMQSampler::~FairMQSampler() { - delete fSamplerTask; - delete fFairRunAna; + if(fFairRunAna) { + fFairRunAna->TerminateRun(); + } } void FairMQSampler::Init() @@ -41,7 +47,9 @@ void FairMQSampler::Init() fSamplerTask->SetBranch(fBranch); fFairRunAna->SetInputFile(TString(fInputFile)); - fFairRunAna->SetOutputFile("dummy.out"); + TString output=fInputFile; + output.Append(".out.root"); + fFairRunAna->SetOutputFile(output.Data()); fFairRunAna->AddTask(fSamplerTask); @@ -51,89 +59,114 @@ void FairMQSampler::Init() rtdb->setFirstInput(parInput1); rtdb->print(); - // read complete file and extract digis. fFairRunAna->Init(); - fFairRunAna->Run(0, 0); + //fFairRunAna->Run(0, 0); + FairRootManager* ioman = FairRootManager::Instance(); + fNumEvents = Int_t((ioman->GetInChain())->GetEntries()); } void FairMQSampler::Run() { - void* status; //necessary for pthread_join FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<"); - usleep(1000000); + boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); - pthread_t logger; - pthread_create(&logger, NULL, &FairMQDevice::callLogSocketRates, this); + boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); + boost::thread resetEventCounter(boost::bind(&FairMQSampler::ResetEventCounter, this)); + //boost::thread commandListener(boost::bind(&FairMQSampler::ListenToCommands, this)); - pthread_t resetEventCounter; - pthread_create(&resetEventCounter, NULL, &FairMQSampler::callResetEventCounter, this); + int sentMsgs = 0; - while (true) { - for( std::vector::iterator itr = fSamplerTask->GetOutput()->begin(); itr != fSamplerTask->GetOutput()->end(); itr++ ) { - FairMQMessage event; - event.Copy(*itr); + boost::timer::auto_cpu_timer timer; - fPayloadOutputs->at(0)->Send(&event); + std::cout << "Number of events to process: " << fNumEvents << std::endl; - --fEventCounter; + Long64_t eventNr = 0; - while (fEventCounter == 0) { - usleep(1000); - } +// while ( fState == RUNNING ) { + + for ( /* eventNr */ ; eventNr < fNumEvents; eventNr++ ) { + fFairRunAna->RunMQ(eventNr); + + fPayloadOutputs->at(0)->Send(fSamplerTask->GetOutput()); + sentMsgs++; + + --fEventCounter; + + while (fEventCounter == 0) { + boost::this_thread::sleep(boost::posix_time::milliseconds(1)); } + if( fState != RUNNING ) { break; } } - pthread_join(logger, &status); - pthread_join(resetEventCounter, &status); + boost::this_thread::interruption_point(); +// } + + boost::timer::cpu_times const elapsed_time(timer.elapsed()); + + std::cout << "Sent everything in:\n" << boost::timer::format(elapsed_time, 2) << std::endl; + std::cout << "Sent " << sentMsgs << " messages!" << std::endl; + + //boost::this_thread::sleep(boost::posix_time::milliseconds(5000)); + + rateLogger.interrupt(); + rateLogger.join(); + resetEventCounter.interrupt(); + resetEventCounter.join(); + //commandListener.interrupt(); + //commandListener.join(); } -void* FairMQSampler::ResetEventCounter() +void FairMQSampler::ResetEventCounter() { - while (true) { - fEventCounter = fEventRate / 100; - - usleep(10000); + while ( true ) { + try { + fEventCounter = fEventRate / 100; + boost::this_thread::sleep(boost::posix_time::milliseconds(10)); + } catch (boost::thread_interrupted&) { + std::cout << "resetEventCounter interrupted" << std::endl; + break; + } } + FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, ">>>>>>> stopping resetEventCounter <<<<<<<"); } -void FairMQSampler::Log(Int_t intervalInMs) +void FairMQSampler::ListenToCommands() { - timestamp_t t0; - timestamp_t t1; - ULong_t bytes = fPayloadOutputs->at(0)->GetBytesTx(); - ULong_t messages = fPayloadOutputs->at(0)->GetMessagesTx(); - ULong_t bytesNew; - ULong_t messagesNew; - Double_t megabytesPerSecond = (bytesNew - bytes) / (1024 * 1024); - Double_t messagesPerSecond = (messagesNew - messages); + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> ListenToCommands <<<<<<<"); - t0 = get_timestamp(); + // Initialize poll set + zmq_pollitem_t items[] = { + { *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 } + }; - while (true) { - usleep(intervalInMs * 1000); + Bool_t received = false; + while ( true ) { + try { + FairMQMessage msg; - t1 = get_timestamp(); + zmq_poll(items, 1, 100); - bytesNew = fPayloadOutputs->at(0)->GetBytesTx(); - messagesNew = fPayloadOutputs->at(0)->GetMessagesTx(); + if (items[0].revents & ZMQ_POLLIN) { + received = fPayloadInputs->at(0)->Receive(&msg); + } - timestamp_t timeSinceLastLog_ms = (t1 - t0) / 1000.0L; + if (received) { + //command handling goes here. + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, "> received command <"); + received = false; + } - megabytesPerSecond = ((Double_t) (bytesNew - bytes) / (1024. * 1024.)) / (Double_t) timeSinceLastLog_ms * 1000.; - messagesPerSecond = (Double_t) (messagesNew - messages) / (Double_t) timeSinceLastLog_ms * 1000.; - - std::stringstream logmsg; - logmsg << "send " << messagesPerSecond << " msg/s, " << megabytesPerSecond << " MB/s"; - FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str()); - - bytes = bytesNew; - messages = messagesNew; - t0 = t1; + boost::this_thread::interruption_point(); + } catch (boost::thread_interrupted&) { + std::cout << "commandListener interrupted" << std::endl; + break; + } } + FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, ">>>>>>> stopping commandListener <<<<<<<"); } -void FairMQSampler::SetProperty(Int_t key, TString value, Int_t slot/*= 0*/) +void FairMQSampler::SetProperty(const Int_t& key, const TString& value, const Int_t& slot/*= 0*/) { switch (key) { case InputFile: @@ -151,7 +184,7 @@ void FairMQSampler::SetProperty(Int_t key, TString value, Int_t slot/*= 0*/) } } -TString FairMQSampler::GetProperty(Int_t key, TString default_/*= ""*/, Int_t slot/*= 0*/) +TString FairMQSampler::GetProperty(const Int_t& key, const TString& default_/*= ""*/, const Int_t& slot/*= 0*/) { switch (key) { case InputFile: @@ -165,7 +198,7 @@ TString FairMQSampler::GetProperty(Int_t key, TString default_/*= ""*/, Int_t sl } } -void FairMQSampler::SetProperty(Int_t key, Int_t value, Int_t slot/*= 0*/) +void FairMQSampler::SetProperty(const Int_t& key, const Int_t& value, const Int_t& slot/*= 0*/) { switch (key) { case EventRate: @@ -177,7 +210,7 @@ void FairMQSampler::SetProperty(Int_t key, Int_t value, Int_t slot/*= 0*/) } } -Int_t FairMQSampler::GetProperty(Int_t key, Int_t default_/*= 0*/, Int_t slot/*= 0*/) +Int_t FairMQSampler::GetProperty(const Int_t& key, const Int_t& default_/*= 0*/, const Int_t& slot/*= 0*/) { switch (key) { case EventRate: diff --git a/fairmq/FairMQSampler.h b/fairmq/FairMQSampler.h index 3cca6255..18149a4a 100644 --- a/fairmq/FairMQSampler.h +++ b/fairmq/FairMQSampler.h @@ -29,14 +29,6 @@ */ class FairMQSampler: public FairMQDevice { - protected: - FairRunAna* fFairRunAna; - FairMQSamplerTask* fSamplerTask; - TString fInputFile; // Filename of a root file containing the simulated digis. - TString fParFile; - TString fBranch; // The name of the sub-detector branch to stream the digis from. - Int_t fEventRate; - Int_t fEventCounter; public: enum { InputFile = FairMQDevice::Last, @@ -46,15 +38,25 @@ class FairMQSampler: public FairMQDevice }; FairMQSampler(); virtual ~FairMQSampler(); + + void ResetEventCounter(); + virtual void ListenToCommands(); + virtual void SetProperty(const Int_t& key, const TString& value, const Int_t& slot = 0); + virtual TString GetProperty(const Int_t& key, const TString& default_ = "", const Int_t& slot = 0); + virtual void SetProperty(const Int_t& key, const Int_t& value, const Int_t& slot = 0); + virtual Int_t GetProperty(const Int_t& key, const Int_t& default_ = 0, const Int_t& slot = 0); + protected: + FairRunAna* fFairRunAna; + Int_t fNumEvents; + FairMQSamplerTask* fSamplerTask; + TString fInputFile; // Filename of a root file containing the simulated digis. + TString fParFile; + TString fBranch; // The name of the sub-detector branch to stream the digis from. + Int_t fEventRate; + Int_t fEventCounter; virtual void Init(); virtual void Run(); - void Log(Int_t intervalInMs); - void* ResetEventCounter(); - static void* callResetEventCounter(void* arg) { return ((FairMQSampler*)arg)->ResetEventCounter(); } - virtual void SetProperty(Int_t key, TString value, Int_t slot = 0); - virtual TString GetProperty(Int_t key, TString default_ = "", Int_t slot = 0); - virtual void SetProperty(Int_t key, Int_t value, Int_t slot = 0); - virtual Int_t GetProperty(Int_t key, Int_t default_ = 0, Int_t slot = 0); + }; #endif /* FAIRMQSAMPLER_H_ */ diff --git a/fairmq/FairMQSamplerTask.cxx b/fairmq/FairMQSamplerTask.cxx index 2f8e65de..b52c4e05 100644 --- a/fairmq/FairMQSamplerTask.cxx +++ b/fairmq/FairMQSamplerTask.cxx @@ -12,8 +12,7 @@ FairMQSamplerTask::FairMQSamplerTask(const Text_t* name, Int_t iVerbose) : FairTask(name, iVerbose), fInput(NULL), fBranch(""), - fMessageSize(32768), - fOutput(new std::vector) + fOutput(new FairMQMessage) { } @@ -21,16 +20,14 @@ FairMQSamplerTask::FairMQSamplerTask() : FairTask( "Abstract base task used for loading a branch from a root file into memory"), fInput(NULL), fBranch(""), - fMessageSize(32768), - fOutput(new std::vector) + fOutput(new FairMQMessage) { } FairMQSamplerTask::~FairMQSamplerTask() { delete fInput; - - // leave fOutput in memory, because it is needed even after FairMQSamplerTask is terminated. + //delete fOutput; // leave fOutput in memory, because it is needed even after FairMQSamplerTask is terminated. ClearOutput will clean it when it is no longer needed. } InitStatus FairMQSamplerTask::Init() @@ -46,13 +43,13 @@ void FairMQSamplerTask::SetBranch(TString branch) fBranch = branch; } -void FairMQSamplerTask::SetMessageSize(int size) -{ - fMessageSize = size; -} - -std::vector *FairMQSamplerTask::GetOutput() +FairMQMessage* FairMQSamplerTask::GetOutput() { return fOutput; } +void FairMQSamplerTask::ClearOutput(void* data, void* hint) +{ + free (data); +} + diff --git a/fairmq/FairMQSamplerTask.h b/fairmq/FairMQSamplerTask.h index bfe9229c..f3d347f0 100644 --- a/fairmq/FairMQSamplerTask.h +++ b/fairmq/FairMQSamplerTask.h @@ -25,13 +25,12 @@ class FairMQSamplerTask: public FairTask virtual InitStatus Init(); virtual void Exec(Option_t* opt) = 0; void SetBranch(TString branch); - void SetMessageSize(Int_t size); - std::vector *GetOutput(); + FairMQMessage* GetOutput(); + static void ClearOutput(void* data, void* hint); protected: TClonesArray* fInput; TString fBranch; - Int_t fMessageSize; - std::vector *fOutput; + FairMQMessage* fOutput; }; #endif /* FAIRMQSAMPLERTASK_H_ */ diff --git a/fairmq/FairMQSink.cxx b/fairmq/FairMQSink.cxx index e4d0eee0..2598a254 100644 --- a/fairmq/FairMQSink.cxx +++ b/fairmq/FairMQSink.cxx @@ -5,6 +5,11 @@ * Author: dklein */ +#include + +#include +#include + #include "FairMQSink.h" #include "FairMQLogger.h" @@ -17,25 +22,25 @@ void FairMQSink::Run() void* status; //necessary for pthread_join FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<"); - pthread_t logger; - pthread_create(&logger, NULL, &FairMQDevice::callLogSocketRates, this); + boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); // Initialize poll set zmq_pollitem_t items[] = { { *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 } }; - while (true) { + while ( fState == RUNNING ) { FairMQMessage msg; - zmq_poll(items, 1, -1); + zmq_poll(items, 1, 100); if (items[0].revents & ZMQ_POLLIN) { fPayloadInputs->at(0)->Receive(&msg); } } - pthread_join(logger, &status); + rateLogger.interrupt(); + rateLogger.join(); } FairMQSink::~FairMQSink() diff --git a/fairmq/FairMQSink.h b/fairmq/FairMQSink.h index 9f2027dd..73e576fd 100644 --- a/fairmq/FairMQSink.h +++ b/fairmq/FairMQSink.h @@ -19,8 +19,9 @@ class FairMQSink: public FairMQDevice { public: FairMQSink(); - virtual void Run(); virtual ~FairMQSink(); + protected: + virtual void Run(); }; #endif /* FAIRMQSINK_H_ */ diff --git a/fairmq/FairMQSocket.cxx b/fairmq/FairMQSocket.cxx index 9e248d90..402c5737 100644 --- a/fairmq/FairMQSocket.cxx +++ b/fairmq/FairMQSocket.cxx @@ -109,7 +109,7 @@ Bool_t FairMQSocket::Send(FairMQMessage* msg) try { fBytesTx += msg->Size(); ++fMessagesTx; - result = fSocket->send(*msg->GetMessage()); + result = fSocket->send(*msg->GetMessage()); // use send(*msg->GetMessage(), ZMQ_DONTWAIT) for non-blocking call } catch (zmq::error_t& e) { std::stringstream logmsg; logmsg << "failed sending on socket #" << fId << ", reason: " << e.what(); diff --git a/fairmq/FairMQStandaloneMerger.cxx b/fairmq/FairMQStandaloneMerger.cxx index cfa2b309..82036864 100644 --- a/fairmq/FairMQStandaloneMerger.cxx +++ b/fairmq/FairMQStandaloneMerger.cxx @@ -5,8 +5,11 @@ * Author: dklein */ -#include "FairMQStandaloneMerger.h" +#include +#include + #include "FairMQLogger.h" +#include "FairMQStandaloneMerger.h" FairMQStandaloneMerger::FairMQStandaloneMerger() { @@ -20,14 +23,7 @@ void FairMQStandaloneMerger::Run() { FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<"); - Bool_t received0 = false; - Bool_t received1 = false; - FairMQMessage* msg0 = NULL; - FairMQMessage* msg1 = NULL; - TString source0 = fPayloadInputs->at(0)->GetId(); - TString source1 = fPayloadInputs->at(1)->GetId(); - Int_t size0 = 0; - Int_t size1 = 0; + boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); // Initialize poll set zmq_pollitem_t items[] = { @@ -35,42 +31,34 @@ void FairMQStandaloneMerger::Run() { *(fPayloadInputs->at(1)->GetSocket()), 0, ZMQ_POLLIN, 0 } }; - void* status; //necessary for pthread_join - pthread_t logger; - pthread_create(&logger, NULL, &FairMQDevice::callLogSocketRates, this); + Bool_t received = false; - while (true) { - msg0 = new FairMQMessage(); - msg1 = new FairMQMessage(); + while ( fState == RUNNING ) { + FairMQMessage msg; - zmq_poll(items, 2, -1); + zmq_poll(items, fNumInputs, 100); if (items[0].revents & ZMQ_POLLIN) { - received0 = fPayloadInputs->at(0)->Receive(msg0); + received = fPayloadInputs->at(0)->Receive(&msg); + } + + if (received) { + fPayloadOutputs->at(0)->Send(&msg); + received = false; } if (items[1].revents & ZMQ_POLLIN) { - received1 = fPayloadInputs->at(1)->Receive(msg1); + received = fPayloadInputs->at(1)->Receive(&msg); } - if (received0) { - size0 = msg0->Size(); - fPayloadOutputs->at(0)->Send(msg0); - - received0 = false; + if (received) { + fPayloadOutputs->at(0)->Send(&msg); + received = false; } - if (received1) { - size1 = msg1->Size(); - fPayloadOutputs->at(0)->Send(msg1); - - received1 = false; - } - - delete msg0; - delete msg1; } - pthread_join(logger, &status); + rateLogger.interrupt(); + rateLogger.join(); } diff --git a/fairmq/FairMQStandaloneMerger.h b/fairmq/FairMQStandaloneMerger.h index 2da51df1..c9e855a1 100644 --- a/fairmq/FairMQStandaloneMerger.h +++ b/fairmq/FairMQStandaloneMerger.h @@ -18,6 +18,7 @@ class FairMQStandaloneMerger: public FairMQDevice public: FairMQStandaloneMerger(); virtual ~FairMQStandaloneMerger(); + protected: virtual void Run(); }; diff --git a/fairmq/FairMQStateMachine.cxx b/fairmq/FairMQStateMachine.cxx index ecbd036a..d4d59c2c 100644 --- a/fairmq/FairMQStateMachine.cxx +++ b/fairmq/FairMQStateMachine.cxx @@ -5,36 +5,165 @@ * Author: dklein */ +#include +#include +#include + #include "FairMQStateMachine.h" +#include "FairMQLogger.h" FairMQStateMachine::FairMQStateMachine() : - fState(START) + fState(IDLE) { } -FairMQStateMachine::RunStateMachine() +void FairMQStateMachine::ChangeState(int event) { - void* status; //necessary for pthread_join - pthread_t state; + switch(fState) { - changeState(INIT); + case IDLE: + switch(event) { - while(fState != END) { - switch(fState) { case INIT: - pthread_create(&state, NULL, &FairMQStateMachine::Init, this); - break; + FairMQLogger::GetInstance()->Log(FairMQLogger::STATE, "IDLE --init--> INITIALIZING"); + fState = INITIALIZING; + Init(); + return; + case END: + FairMQLogger::GetInstance()->Log(FairMQLogger::STATE, "IDLE --end--> (o)"); + return; + + default: + return; } - pthread_join(state, &status); - } + break; + case INITIALIZING: + switch(event) { + case SETOUTPUT: + FairMQLogger::GetInstance()->Log(FairMQLogger::STATE, "INITIALIZING --bind--> SETTINGOUTPUT"); + fState = SETTINGOUTPUT; + InitOutput(); + return; + + default: + return; + } + break; + + + case SETTINGOUTPUT: + switch(event) { + + case SETINPUT: + FairMQLogger::GetInstance()->Log(FairMQLogger::STATE, "SETTINGOUTPUT --connect--> SETTINGINPUT"); + fState = SETTINGINPUT; + InitInput(); + return; + + default: + return; + } + break; + + + case SETTINGINPUT: + switch(event) { + + case PAUSE: + FairMQLogger::GetInstance()->Log(FairMQLogger::STATE, "SETTINGINPUT --pause--> WAITING"); + fState = WAITING; + Pause(); + return; + + case RUN: + FairMQLogger::GetInstance()->Log(FairMQLogger::STATE, "SETTINGINPUT --run--> RUNNING"); + fState = RUNNING; + running_state = boost::thread(boost::bind(&FairMQStateMachine::Run, this)); + return; + + default: + return; + } + break; + + + case WAITING: + switch(event) { + + case RUN: + FairMQLogger::GetInstance()->Log(FairMQLogger::STATE, "WAITING --run--> RUNNING"); + fState = RUNNING; + running_state = boost::thread(boost::bind(&FairMQStateMachine::Run, this)); + return; + + case STOP: + FairMQLogger::GetInstance()->Log(FairMQLogger::STATE, "WAITING --stop--> IDLE"); + fState = IDLE; + Shutdown(); + return; + + default: + return; + } + break; + + + case RUNNING: + switch(event) { + + case PAUSE: + FairMQLogger::GetInstance()->Log(FairMQLogger::STATE, "RUNNING --pause--> WAITING"); + fState = WAITING; + running_state.join(); + return; + + case STOP: + FairMQLogger::GetInstance()->Log(FairMQLogger::STATE, "RUNNING --stop--> IDLE"); + fState = IDLE; + running_state.join(); + Shutdown(); + return; + + default: + return; + } + break; + + + default: + break; + + }//switch fState } +void FairMQStateMachine::Init() +{ +} +void FairMQStateMachine::Run() +{ +} + +void FairMQStateMachine::Pause() +{ +} + +void FairMQStateMachine::Shutdown() +{ +} + +void FairMQStateMachine::InitOutput() +{ +} + +void FairMQStateMachine::InitInput() +{ +} FairMQStateMachine::~FairMQStateMachine() { diff --git a/fairmq/FairMQStateMachine.h b/fairmq/FairMQStateMachine.h index b84fd24f..a19cbc04 100644 --- a/fairmq/FairMQStateMachine.h +++ b/fairmq/FairMQStateMachine.h @@ -5,28 +5,37 @@ * Author: dklein */ + + #ifndef FAIRMQSTATEMACHINE_H_ #define FAIRMQSTATEMACHINE_H_ +#include + class FairMQStateMachine { - private: - int fState; public: - enum { - START, INIT, BIND, CONNECT, RUN, PAUSE, SHUTDOWN, END + enum State { + IDLE, INITIALIZING, SETTINGOUTPUT, SETTINGINPUT, WAITING, RUNNING + }; + enum Event { + INIT, SETOUTPUT, SETINPUT, PAUSE, RUN, STOP, END }; FairMQStateMachine(); - virtual void Init() = 0; - virtual void Bind() = 0; - virtual void Connect() = 0; - virtual void Run() = 0; - virtual void Pause() = 0; - virtual void Shutdown() = 0; - bool ChangeState(int new_state); - void RunStateMachine(); + void ChangeState(int event); virtual ~FairMQStateMachine(); + + protected: + State fState; + Event fEvent; + virtual void Init(); + virtual void Run(); + virtual void Pause(); + virtual void Shutdown(); + virtual void InitOutput(); + virtual void InitInput(); + boost::thread running_state; }; #endif /* FAIRMQSTATEMACHINE_H_ */ diff --git a/fairmq/runBenchmarkSampler.cxx b/fairmq/runBenchmarkSampler.cxx index b5e9ad8d..80418b6c 100644 --- a/fairmq/runBenchmarkSampler.cxx +++ b/fairmq/runBenchmarkSampler.cxx @@ -5,79 +5,106 @@ * Author: dklein */ -#include -#include -#include -#include "FairMQLogger.h" -#include -#include #include +#include + +#include "FairMQLogger.h" #include "FairMQBenchmarkSampler.h" +FairMQBenchmarkSampler sampler; + +static void s_signal_handler (int signal) +{ + std::cout << std::endl << "Caught signal " << signal << std::endl; + + sampler.ChangeState(FairMQBenchmarkSampler::STOP); + sampler.ChangeState(FairMQBenchmarkSampler::END); + + std::cout << "Shutdown complete. Bye!" << std::endl; + exit(1); +} + +static void s_catch_signals (void) +{ + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); +} + int main(int argc, char** argv) { - if( argc != 8 ) { - std::cout << "Usage: bsampler ID eventSize eventRate numIoTreads\n" << - "\t\tbindSocketType bindSndBufferSize BindAddress\n" << std::endl; + if ( argc != 9 ) { + std::cout << "Usage: bsampler ID eventSize eventRate numIoTreads\n" + << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" + << std::endl; return 1; } - pid_t pid = getpid(); + s_catch_signals(); + std::stringstream logmsg; - logmsg << "PID: " << pid; + logmsg << "PID: " << getpid(); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); int i = 1; - FairMQBenchmarkSampler* sampler = new FairMQBenchmarkSampler(); - - sampler->SetProperty(FairMQBenchmarkSampler::Id, argv[i]); + sampler.SetProperty(FairMQBenchmarkSampler::Id, argv[i]); ++i; int eventSize; std::stringstream(argv[i]) >> eventSize; - sampler->SetProperty(FairMQBenchmarkSampler::EventSize, eventSize); + sampler.SetProperty(FairMQBenchmarkSampler::EventSize, eventSize); ++i; int eventRate; std::stringstream(argv[i]) >> eventRate; - sampler->SetProperty(FairMQBenchmarkSampler::EventRate, eventRate); + sampler.SetProperty(FairMQBenchmarkSampler::EventRate, eventRate); ++i; int numIoThreads; std::stringstream(argv[i]) >> numIoThreads; - sampler->SetProperty(FairMQBenchmarkSampler::NumIoThreads, numIoThreads); + sampler.SetProperty(FairMQBenchmarkSampler::NumIoThreads, numIoThreads); ++i; - int numInputs = 0; - sampler->SetProperty(FairMQBenchmarkSampler::NumInputs, numInputs); + sampler.SetProperty(FairMQBenchmarkSampler::NumInputs, 0); + sampler.SetProperty(FairMQBenchmarkSampler::NumOutputs, 1); - int numOutputs = 1; - sampler->SetProperty(FairMQBenchmarkSampler::NumOutputs, numOutputs); - sampler->Init(); + sampler.ChangeState(FairMQBenchmarkSampler::INIT); - int bindSocketType = ZMQ_PUB; + + int outputSocketType = ZMQ_PUB; if (strcmp(argv[i], "push") == 0) { - bindSocketType = ZMQ_PUSH; + outputSocketType = ZMQ_PUSH; } - sampler->SetProperty(FairMQBenchmarkSampler::BindSocketType, bindSocketType, 0); + sampler.SetProperty(FairMQBenchmarkSampler::OutputSocketType, outputSocketType, 0); ++i; - - int bindSndBufferSize; - std::stringstream(argv[i]) >> bindSndBufferSize; - sampler->SetProperty(FairMQBenchmarkSampler::BindSndBufferSize, bindSndBufferSize, 0); + int outputSndBufSize; + std::stringstream(argv[i]) >> outputSndBufSize; + sampler.SetProperty(FairMQBenchmarkSampler::OutputSndBufSize, outputSndBufSize, 0); ++i; - - sampler->SetProperty(FairMQBenchmarkSampler::BindAddress, argv[i], 0); + sampler.SetProperty(FairMQBenchmarkSampler::OutputMethod, argv[i], 0); + ++i; + sampler.SetProperty(FairMQBenchmarkSampler::OutputAddress, argv[i], 0); ++i; - sampler->Bind(); - sampler->Connect(); - sampler->Run(); + sampler.ChangeState(FairMQBenchmarkSampler::SETOUTPUT); + sampler.ChangeState(FairMQBenchmarkSampler::SETINPUT); + sampler.ChangeState(FairMQBenchmarkSampler::RUN); - exit(0); + + + char ch; + std::cin.get(ch); + + sampler.ChangeState(FairMQBenchmarkSampler::STOP); + sampler.ChangeState(FairMQBenchmarkSampler::END); + + return 0; } diff --git a/fairmq/runBuffer.cxx b/fairmq/runBuffer.cxx index f217b575..5fa8b6dc 100644 --- a/fairmq/runBuffer.cxx +++ b/fairmq/runBuffer.cxx @@ -1,87 +1,115 @@ -/* +/** * runBuffer.cxx * - * Created on: Oct 26, 2012 - * Author: dklein + * @since Oct 26, 2012 + * @authors: D. Klein, A. Rybalchenko */ -#include "FairMQBuffer.h" -#include -#include -#include "FairMQLogger.h" -#include -#include #include +#include +#include "FairMQLogger.h" +#include "FairMQBuffer.h" + + +FairMQBuffer buffer; + +static void s_signal_handler (int signal) +{ + std::cout << std::endl << "Caught signal " << signal << std::endl; + + buffer.ChangeState(FairMQBuffer::STOP); + buffer.ChangeState(FairMQBuffer::END); + + std::cout << "Shutdown complete. Bye!" << std::endl; + exit(1); +} + +static void s_catch_signals (void) +{ + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); +} int main(int argc, char** argv) { - if( argc != 9 ) { - std::cout << "Usage: buffer \tID numIoTreads\n" << - "\t\tconnectSocketType connectRcvBufferSize ConnectAddress\n" << - "\t\tbindSocketType bindSndBufferSize BindAddress\n" << std::endl; + if ( argc != 11 ) { + std::cout << "Usage: buffer \tID numIoTreads\n" + << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" + << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << std::endl; return 1; } - pid_t pid = getpid(); + s_catch_signals(); + std::stringstream logmsg; - logmsg << "PID: " << pid; + logmsg << "PID: " << getpid(); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); int i = 1; - FairMQBuffer* buffer = new FairMQBuffer(); - buffer->SetProperty(FairMQBuffer::Id, argv[i]); + buffer.SetProperty(FairMQBuffer::Id, argv[i]); ++i; int numIoThreads; std::stringstream(argv[i]) >> numIoThreads; - buffer->SetProperty(FairMQBuffer::NumIoThreads, numIoThreads); + buffer.SetProperty(FairMQBuffer::NumIoThreads, numIoThreads); ++i; + buffer.SetProperty(FairMQBuffer::NumInputs, 1); + buffer.SetProperty(FairMQBuffer::NumOutputs, 1); - int numInputs = 1; - buffer->SetProperty(FairMQBuffer::NumInputs, numInputs); - int numOutputs = 1; - buffer->SetProperty(FairMQBuffer::NumOutputs, numOutputs); + buffer.ChangeState(FairMQBuffer::INIT); - buffer->Init(); - int connectSocketType = ZMQ_SUB; + int inputSocketType = ZMQ_SUB; if (strcmp(argv[i], "pull") == 0) { - connectSocketType = ZMQ_PULL; + inputSocketType = ZMQ_PULL; } - buffer->SetProperty(FairMQBuffer::ConnectSocketType, connectSocketType, 0); + buffer.SetProperty(FairMQBuffer::InputSocketType, inputSocketType, 0); + ++i; + int inputRcvBufSize; + std::stringstream(argv[i]) >> inputRcvBufSize; + buffer.SetProperty(FairMQBuffer::InputRcvBufSize, inputRcvBufSize, 0); + ++i; + buffer.SetProperty(FairMQBuffer::InputMethod, argv[i], 0); + ++i; + buffer.SetProperty(FairMQBuffer::InputAddress, argv[i], 0); ++i; - int connectRcvBufferSize; - std::stringstream(argv[i]) >> connectRcvBufferSize; - buffer->SetProperty(FairMQBuffer::ConnectRcvBufferSize, connectRcvBufferSize, 0); - ++i; - buffer->SetProperty(FairMQBuffer::ConnectAddress, argv[i], 0); - ++i; - - int bindSocketType = ZMQ_PUB; + int outputSocketType = ZMQ_PUB; if (strcmp(argv[i], "push") == 0) { - bindSocketType = ZMQ_PUSH; + outputSocketType = ZMQ_PUSH; } - buffer->SetProperty(FairMQBuffer::BindSocketType, bindSocketType, 0); + buffer.SetProperty(FairMQBuffer::OutputSocketType, outputSocketType, 0); ++i; - - int bindSndBufferSize; - std::stringstream(argv[i]) >> bindSndBufferSize; - buffer->SetProperty(FairMQBuffer::BindSndBufferSize, bindSndBufferSize, 0); + int outputSndBufSize; + std::stringstream(argv[i]) >> outputSndBufSize; + buffer.SetProperty(FairMQBuffer::OutputSndBufSize, outputSndBufSize, 0); ++i; - - buffer->SetProperty(FairMQBuffer::BindAddress, argv[i], 0); + buffer.SetProperty(FairMQBuffer::OutputMethod, argv[i], 0); + ++i; + buffer.SetProperty(FairMQBuffer::OutputAddress, argv[i], 0); ++i; - buffer->Bind(); - buffer->Connect(); - buffer->Run(); + buffer.ChangeState(FairMQBuffer::SETOUTPUT); + buffer.ChangeState(FairMQBuffer::SETINPUT); + buffer.ChangeState(FairMQBuffer::RUN); - exit(0); + + + char ch; + std::cin.get(ch); + + buffer.ChangeState(FairMQBuffer::STOP); + buffer.ChangeState(FairMQBuffer::END); + + return 0; } diff --git a/fairmq/runMerger.cxx b/fairmq/runMerger.cxx index 4c04d482..1e8af158 100644 --- a/fairmq/runMerger.cxx +++ b/fairmq/runMerger.cxx @@ -5,98 +5,125 @@ * Author: dklein */ -#include "FairMQStandaloneMerger.h" -#include -#include -#include "FairMQLogger.h" -#include -#include #include +#include +#include "FairMQLogger.h" +#include "FairMQStandaloneMerger.h" + + +FairMQStandaloneMerger merger; + +static void s_signal_handler (int signal) +{ + std::cout << std::endl << "Caught signal " << signal << std::endl; + + merger.ChangeState(FairMQStandaloneMerger::STOP); + merger.ChangeState(FairMQStandaloneMerger::END); + + std::cout << "Shutdown complete. Bye!" << std::endl; + exit(1); +} + +static void s_catch_signals (void) +{ + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); +} int main(int argc, char** argv) { - if( argc != 12 ) { - std::cout << "Usage: merger \tID numIoTreads\n" << - "\t\tconnectSocketType connectRcvBufferSize ConnectAddress\n" << - "\t\tconnectSocketType connectRcvBufferSize ConnectAddress\n" << - "\t\tbindSocketType bindSndBufferSize BindAddress\n" << std::endl; + if ( argc != 15 ) { + std::cout << "Usage: merger \tID numIoTreads\n" + << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" + << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" + << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << std::endl; return 1; } - pid_t pid = getpid(); + s_catch_signals(); + std::stringstream logmsg; - logmsg << "PID: " << pid; + logmsg << "PID: " << getpid(); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); int i = 1; - FairMQStandaloneMerger* merger = new FairMQStandaloneMerger(); - merger->SetProperty(FairMQStandaloneMerger::Id, argv[i]); + merger.SetProperty(FairMQStandaloneMerger::Id, argv[i]); ++i; int numIoThreads; std::stringstream(argv[i]) >> numIoThreads; - merger->SetProperty(FairMQStandaloneMerger::NumIoThreads, numIoThreads); + merger.SetProperty(FairMQStandaloneMerger::NumIoThreads, numIoThreads); ++i; - int numInputs = 2; - merger->SetProperty(FairMQStandaloneMerger::NumInputs, numInputs); + merger.SetProperty(FairMQStandaloneMerger::NumInputs, 2); + merger.SetProperty(FairMQStandaloneMerger::NumOutputs, 1); - int numOutputs = 1; - merger->SetProperty(FairMQStandaloneMerger::NumOutputs, numOutputs); - merger->Init(); + merger.ChangeState(FairMQStandaloneMerger::INIT); - int connectSocketType = ZMQ_SUB; + + int inputSocketType = ZMQ_SUB; if (strcmp(argv[i], "pull") == 0) { - connectSocketType = ZMQ_PULL; + inputSocketType = ZMQ_PULL; } - merger->SetProperty(FairMQStandaloneMerger::ConnectSocketType, connectSocketType, 0); + merger.SetProperty(FairMQStandaloneMerger::InputSocketType, inputSocketType, 0); + ++i; + int inputRcvBufSize; + std::stringstream(argv[i]) >> inputRcvBufSize; + merger.SetProperty(FairMQStandaloneMerger::InputRcvBufSize, inputRcvBufSize, 0); + ++i; + merger.SetProperty(FairMQStandaloneMerger::InputMethod, argv[i], 0); + ++i; + merger.SetProperty(FairMQStandaloneMerger::InputAddress, argv[i], 0); ++i; - int connectRcvBufferSize; - std::stringstream(argv[i]) >> connectRcvBufferSize; - merger->SetProperty(FairMQStandaloneMerger::ConnectRcvBufferSize, connectRcvBufferSize, 0); - ++i; - - merger->SetProperty(FairMQStandaloneMerger::ConnectAddress, argv[i], 0); - ++i; - - connectSocketType = ZMQ_SUB; + inputSocketType = ZMQ_SUB; if (strcmp(argv[i], "pull") == 0) { - connectSocketType = ZMQ_PULL; + inputSocketType = ZMQ_PULL; } - merger->SetProperty(FairMQStandaloneMerger::ConnectSocketType, connectSocketType, 1); + merger.SetProperty(FairMQStandaloneMerger::InputSocketType, inputSocketType, 1); + ++i; + std::stringstream(argv[i]) >> inputRcvBufSize; + merger.SetProperty(FairMQStandaloneMerger::InputRcvBufSize, inputRcvBufSize, 1); + ++i; + merger.SetProperty(FairMQStandaloneMerger::InputMethod, argv[i], 1); + ++i; + merger.SetProperty(FairMQStandaloneMerger::InputAddress, argv[i], 1); ++i; - std::stringstream(argv[i]) >> connectRcvBufferSize; - merger->SetProperty(FairMQStandaloneMerger::ConnectRcvBufferSize, connectRcvBufferSize, 1); - ++i; - - merger->SetProperty(FairMQStandaloneMerger::ConnectAddress, argv[i], 1); - ++i; - - int bindSocketType = ZMQ_PUB; + int outputSocketType = ZMQ_PUB; if (strcmp(argv[i], "push") == 0) { - bindSocketType = ZMQ_PUSH; + outputSocketType = ZMQ_PUSH; } - merger->SetProperty(FairMQStandaloneMerger::BindSocketType, bindSocketType, 0); + merger.SetProperty(FairMQStandaloneMerger::OutputSocketType, outputSocketType, 0); ++i; - - int bindSndBufferSize; - std::stringstream(argv[i]) >> bindSndBufferSize; - merger->SetProperty(FairMQStandaloneMerger::BindSndBufferSize, bindSndBufferSize, 0); + int outputSndBufSize; + std::stringstream(argv[i]) >> outputSndBufSize; + merger.SetProperty(FairMQStandaloneMerger::OutputSndBufSize, outputSndBufSize, 0); ++i; - - merger->SetProperty(FairMQStandaloneMerger::BindAddress, argv[i], 0); + merger.SetProperty(FairMQStandaloneMerger::OutputMethod, argv[i], 0); + ++i; + merger.SetProperty(FairMQStandaloneMerger::OutputAddress, argv[i], 0); ++i; - merger->Bind(); - merger->Connect(); - merger->Run(); + merger.ChangeState(FairMQStandaloneMerger::SETOUTPUT); + merger.ChangeState(FairMQStandaloneMerger::SETINPUT); + merger.ChangeState(FairMQStandaloneMerger::RUN); - exit(0); + + char ch; + std::cin.get(ch); + + merger.ChangeState(FairMQStandaloneMerger::STOP); + merger.ChangeState(FairMQStandaloneMerger::END); + + return 0; } diff --git a/fairmq/runProxy.cxx b/fairmq/runProxy.cxx new file mode 100644 index 00000000..2032bb5b --- /dev/null +++ b/fairmq/runProxy.cxx @@ -0,0 +1,114 @@ +/** + * runProxy.cxx + * + * @since: Oct 07, 2013 + * @authors: A. Rybalchenko + */ + +#include +#include + +#include "FairMQLogger.h" +#include "FairMQProxy.h" + + +FairMQProxy proxy; + +static void s_signal_handler (int signal) +{ + std::cout << std::endl << "Caught signal " << signal << std::endl; + + proxy.ChangeState(FairMQProxy::STOP); + proxy.ChangeState(FairMQProxy::END); + + std::cout << "Shutdown complete. Bye!" << std::endl; + exit(1); +} + +static void s_catch_signals (void) +{ + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); +} + +int main(int argc, char** argv) +{ + if ( argc != 11 ) { + std::cout << "Usage: proxy \tID numIoTreads\n" + << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" + << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << std::endl; + return 1; + } + + s_catch_signals(); + + std::stringstream logmsg; + logmsg << "PID: " << getpid(); + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + + int i = 1; + + proxy.SetProperty(FairMQProxy::Id, argv[i]); + ++i; + + int numIoThreads; + std::stringstream(argv[i]) >> numIoThreads; + proxy.SetProperty(FairMQProxy::NumIoThreads, numIoThreads); + ++i; + + proxy.SetProperty(FairMQProxy::NumInputs, 1); + proxy.SetProperty(FairMQProxy::NumOutputs, 1); + + + proxy.ChangeState(FairMQProxy::INIT); + + + int inputSocketType = ZMQ_XSUB; + if (strcmp(argv[i], "pull") == 0) { + inputSocketType = ZMQ_PULL; + } + proxy.SetProperty(FairMQProxy::InputSocketType, inputSocketType, 0); + ++i; + int inputRcvBufSize; + std::stringstream(argv[i]) >> inputRcvBufSize; + proxy.SetProperty(FairMQProxy::InputRcvBufSize, inputRcvBufSize, 0); + ++i; + proxy.SetProperty(FairMQProxy::InputMethod, argv[i], 0); + ++i; + proxy.SetProperty(FairMQProxy::InputAddress, argv[i], 0); + ++i; + + int outputSocketType = ZMQ_XPUB; + if (strcmp(argv[i], "push") == 0) { + outputSocketType = ZMQ_PUSH; + } + proxy.SetProperty(FairMQProxy::OutputSocketType, outputSocketType, 0); + ++i; + int outputSndBufSize; + std::stringstream(argv[i]) >> outputSndBufSize; + proxy.SetProperty(FairMQProxy::OutputSndBufSize, outputSndBufSize, 0); + ++i; + proxy.SetProperty(FairMQProxy::OutputMethod, argv[i], 0); + ++i; + proxy.SetProperty(FairMQProxy::OutputAddress, argv[i], 0); + ++i; + + + proxy.ChangeState(FairMQProxy::SETOUTPUT); + proxy.ChangeState(FairMQProxy::SETINPUT); + proxy.ChangeState(FairMQProxy::RUN); + + + char ch; + std::cin.get(ch); + + proxy.ChangeState(FairMQProxy::STOP); + proxy.ChangeState(FairMQProxy::END); + + return 0; +} + diff --git a/fairmq/runSink.cxx b/fairmq/runSink.cxx index 6cc686d4..f6434a85 100644 --- a/fairmq/runSink.cxx +++ b/fairmq/runSink.cxx @@ -1,71 +1,99 @@ /* * runSink.cxx * - * Created on: Jan 21, 2013 - * Author: dklein + * @since: Jan 21, 2013 + * @author: dklein */ -#include "FairMQSink.h" -#include -#include -#include "FairMQLogger.h" -#include -#include #include +#include +#include "FairMQLogger.h" +#include "FairMQSink.h" + + +FairMQSink sink; + +static void s_signal_handler (int signal) +{ + std::cout << std::endl << "Caught signal " << signal << std::endl; + + sink.ChangeState(FairMQSink::STOP); + sink.ChangeState(FairMQSink::END); + + std::cout << "Shutdown complete. Bye!" << std::endl; + exit(1); +} + +static void s_catch_signals (void) +{ + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); +} int main(int argc, char** argv) { - if( argc != 6 ) { - std::cout << "Usage: sink \tID numIoTreads\n" << - "\t\tconnectSocketType connectRcvBufferSize ConnectAddress\n" << std::endl; + if ( argc != 7 ) { + std::cout << "Usage: sink \tID numIoTreads\n" + << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" + << std::endl; return 1; } - pid_t pid = getpid(); + s_catch_signals(); + std::stringstream logmsg; - logmsg << "PID: " << pid; + logmsg << "PID: " << getpid(); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); int i = 1; - FairMQSink* sink = new FairMQSink(); - sink->SetProperty(FairMQSink::Id, argv[i]); + sink.SetProperty(FairMQSink::Id, argv[i]); ++i; int numIoThreads; std::stringstream(argv[i]) >> numIoThreads; - sink->SetProperty(FairMQSink::NumIoThreads, numIoThreads); + sink.SetProperty(FairMQSink::NumIoThreads, numIoThreads); ++i; - int numInputs = 1; - sink->SetProperty(FairMQSink::NumInputs, numInputs); + sink.SetProperty(FairMQSink::NumInputs, 1); + sink.SetProperty(FairMQSink::NumOutputs, 0); - int numOutputs = 0; - sink->SetProperty(FairMQSink::NumOutputs, numOutputs); - sink->Init(); + sink.ChangeState(FairMQSink::INIT); - int connectSocketType = ZMQ_SUB; + + int inputSocketType = ZMQ_SUB; if (strcmp(argv[i], "pull") == 0) { - connectSocketType = ZMQ_PULL; + inputSocketType = ZMQ_PULL; } - sink->SetProperty(FairMQSink::ConnectSocketType, connectSocketType, 0); + sink.SetProperty(FairMQSink::InputSocketType, inputSocketType, 0); ++i; - - int connectRcvBufferSize; - std::stringstream(argv[i]) >> connectRcvBufferSize; - sink->SetProperty(FairMQSink::ConnectRcvBufferSize, connectRcvBufferSize, 0); + int inputRcvBufSize; + std::stringstream(argv[i]) >> inputRcvBufSize; + sink.SetProperty(FairMQSink::InputRcvBufSize, inputRcvBufSize, 0); ++i; - - sink->SetProperty(FairMQSink::ConnectAddress, argv[i], 0); + sink.SetProperty(FairMQSink::InputMethod, argv[i], 0); + ++i; + sink.SetProperty(FairMQSink::InputAddress, argv[i], 0); ++i; - sink->Bind(); - sink->Connect(); - sink->Run(); + sink.ChangeState(FairMQSink::SETOUTPUT); + sink.ChangeState(FairMQSink::SETINPUT); + sink.ChangeState(FairMQSink::RUN); - exit(0); + + char ch; + std::cin.get(ch); + + sink.ChangeState(FairMQSink::STOP); + sink.ChangeState(FairMQSink::END); + + return 0; } diff --git a/fairmq/runSplitter.cxx b/fairmq/runSplitter.cxx index e9da7c78..eceb40df 100644 --- a/fairmq/runSplitter.cxx +++ b/fairmq/runSplitter.cxx @@ -5,98 +5,125 @@ * Author: dklein */ -#include "FairMQBalancedStandaloneSplitter.h" -#include -#include -#include "FairMQLogger.h" -#include -#include #include +#include +#include "FairMQLogger.h" +#include "FairMQBalancedStandaloneSplitter.h" + + +FairMQBalancedStandaloneSplitter splitter; + +static void s_signal_handler (int signal) +{ + std::cout << std::endl << "Caught signal " << signal << std::endl; + + splitter.ChangeState(FairMQBalancedStandaloneSplitter::STOP); + splitter.ChangeState(FairMQBalancedStandaloneSplitter::END); + + std::cout << "Shutdown complete. Bye!" << std::endl; + exit(1); +} + +static void s_catch_signals (void) +{ + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); +} int main(int argc, char** argv) { - if( argc != 12 ) { - std::cout << "Usage: splitter \tID numIoTreads\n" << - "\t\tconnectSocketType connectRcvBufferSize ConnectAddress\n" << - "\t\tbindSocketType bindSndBufferSize BindAddress\n" << - "\t\tbindSocketType bindSndBufferSize BindAddress\n" << std::endl; + if ( argc != 15 ) { + std::cout << "Usage: splitter \tID numIoTreads\n" + << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" + << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" + << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << std::endl; return 1; } - pid_t pid = getpid(); + s_catch_signals(); + std::stringstream logmsg; - logmsg << "PID: " << pid; + logmsg << "PID: " << getpid(); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); int i = 1; - FairMQBalancedStandaloneSplitter* splitter = new FairMQBalancedStandaloneSplitter(); - splitter->SetProperty(FairMQBalancedStandaloneSplitter::Id, argv[i]); + splitter.SetProperty(FairMQBalancedStandaloneSplitter::Id, argv[i]); ++i; int numIoThreads; std::stringstream(argv[i]) >> numIoThreads; - splitter->SetProperty(FairMQBalancedStandaloneSplitter::NumIoThreads, numIoThreads); + splitter.SetProperty(FairMQBalancedStandaloneSplitter::NumIoThreads, numIoThreads); ++i; - int numInputs = 1; - splitter->SetProperty(FairMQBalancedStandaloneSplitter::NumInputs, numInputs); + splitter.SetProperty(FairMQBalancedStandaloneSplitter::NumInputs, 1); + splitter.SetProperty(FairMQBalancedStandaloneSplitter::NumOutputs, 2); - int numOutputs = 2; - splitter->SetProperty(FairMQBalancedStandaloneSplitter::NumOutputs, numOutputs); - splitter->Init(); + splitter.ChangeState(FairMQBalancedStandaloneSplitter::INIT); - int connectSocketType = ZMQ_SUB; + + int inputSocketType = ZMQ_SUB; if (strcmp(argv[i], "pull") == 0) { - connectSocketType = ZMQ_PULL; + inputSocketType = ZMQ_PULL; } - splitter->SetProperty(FairMQBalancedStandaloneSplitter::ConnectSocketType, connectSocketType, 0); + splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputSocketType, inputSocketType, 0); + ++i; + int inputRcvBufSize; + std::stringstream(argv[i]) >> inputRcvBufSize; + splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputRcvBufSize, inputRcvBufSize, 0); + ++i; + splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputMethod, argv[i], 0); + ++i; + splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputAddress, argv[i], 0); ++i; - int connectRcvBufferSize; - std::stringstream(argv[i]) >> connectRcvBufferSize; - splitter->SetProperty(FairMQBalancedStandaloneSplitter::ConnectRcvBufferSize, connectRcvBufferSize, 0); - ++i; - - splitter->SetProperty(FairMQBalancedStandaloneSplitter::ConnectAddress, argv[i], 0); - ++i; - - int bindSocketType = ZMQ_PUB; + int outputSocketType = ZMQ_PUB; if (strcmp(argv[i], "push") == 0) { - bindSocketType = ZMQ_PUSH; + outputSocketType = ZMQ_PUSH; } - splitter->SetProperty(FairMQBalancedStandaloneSplitter::BindSocketType, bindSocketType, 0); + splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputSocketType, outputSocketType, 0); + ++i; + int outputSndBufSize; + std::stringstream(argv[i]) >> outputSndBufSize; + splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputSndBufSize, outputSndBufSize, 0); + ++i; + splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputMethod, argv[i], 0); + ++i; + splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputAddress, argv[i], 0); ++i; - int bindSndBufferSize; - std::stringstream(argv[i]) >> bindSndBufferSize; - splitter->SetProperty(FairMQBalancedStandaloneSplitter::BindSndBufferSize, bindSndBufferSize, 0); - ++i; - - splitter->SetProperty(FairMQBalancedStandaloneSplitter::BindAddress, argv[i], 0); - ++i; - - bindSocketType = ZMQ_PUB; + outputSocketType = ZMQ_PUB; if (strcmp(argv[i], "push") == 0) { - bindSocketType = ZMQ_PUSH; + outputSocketType = ZMQ_PUSH; } - splitter->SetProperty(FairMQBalancedStandaloneSplitter::BindSocketType, bindSocketType, 1); + splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputSocketType, outputSocketType, 1); ++i; - - std::stringstream(argv[i]) >> bindSndBufferSize; - splitter->SetProperty(FairMQBalancedStandaloneSplitter::BindSndBufferSize, bindSndBufferSize, 1); + std::stringstream(argv[i]) >> outputSndBufSize; + splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputSndBufSize, outputSndBufSize, 1); ++i; - - splitter->SetProperty(FairMQBalancedStandaloneSplitter::BindAddress, argv[i], 1); + splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputMethod, argv[i], 1); + ++i; + splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputAddress, argv[i], 1); ++i; - splitter->Bind(); - splitter->Connect(); - splitter->Run(); + splitter.ChangeState(FairMQBalancedStandaloneSplitter::SETOUTPUT); + splitter.ChangeState(FairMQBalancedStandaloneSplitter::SETINPUT); + splitter.ChangeState(FairMQBalancedStandaloneSplitter::RUN); - exit(0); + + char ch; + std::cin.get(ch); + + splitter.ChangeState(FairMQBalancedStandaloneSplitter::STOP); + splitter.ChangeState(FairMQBalancedStandaloneSplitter::END); + + return 0; }