From 231c7c8f7eecefdbbce0100597e614cab16903a5 Mon Sep 17 00:00:00 2001 From: Mohammad Al-Turany Date: Fri, 7 Jun 2013 08:07:48 +0000 Subject: [PATCH] Zero MQ implimentation and example (Tutorial3) git-svn-id: https://subversion.gsi.de/fairroot/fairbase/trunk@20162 0381ead4-6506-0410-b988-94b70fbc4730 --- fairmq/CMakeLists.txt | 94 ++++++ fairmq/FairMQBalancedStandaloneSplitter.cxx | 57 ++++ fairmq/FairMQBalancedStandaloneSplitter.h | 23 ++ fairmq/FairMQBenchmarkSampler.cxx | 151 +++++++++ fairmq/FairMQBenchmarkSampler.h | 45 +++ fairmq/FairMQBuffer.cxx | 51 +++ fairmq/FairMQBuffer.h | 24 ++ fairmq/FairMQConfigurable.cxx | 37 +++ fairmq/FairMQConfigurable.h | 29 ++ fairmq/FairMQContext.cxx | 44 +++ fairmq/FairMQContext.h | 31 ++ fairmq/FairMQDevice.cxx | 341 ++++++++++++++++++++ fairmq/FairMQDevice.h | 74 +++++ fairmq/FairMQLogger.cxx | 71 ++++ fairmq/FairMQLogger.h | 45 +++ fairmq/FairMQMessage.cxx | 63 ++++ fairmq/FairMQMessage.h | 28 ++ fairmq/FairMQProcessor.cxx | 65 ++++ fairmq/FairMQProcessor.h | 28 ++ fairmq/FairMQProcessorTask.cxx | 18 ++ fairmq/FairMQProcessorTask.h | 24 ++ fairmq/FairMQSampler.cxx | 193 +++++++++++ fairmq/FairMQSampler.h | 60 ++++ fairmq/FairMQSamplerTask.cxx | 58 ++++ fairmq/FairMQSamplerTask.h | 37 +++ fairmq/FairMQSink.cxx | 44 +++ fairmq/FairMQSink.h | 26 ++ fairmq/FairMQSocket.cxx | 169 ++++++++++ fairmq/FairMQSocket.h | 47 +++ fairmq/FairMQStandaloneMerger.cxx | 76 +++++ fairmq/FairMQStandaloneMerger.h | 24 ++ fairmq/FairMQStateMachine.cxx | 42 +++ fairmq/FairMQStateMachine.h | 32 ++ fairmq/runBenchmarkSampler.cxx | 83 +++++ fairmq/runBuffer.cxx | 87 +++++ fairmq/runMerger.cxx | 102 ++++++ fairmq/runSink.cxx | 71 ++++ fairmq/runSplitter.cxx | 102 ++++++ 38 files changed, 2596 insertions(+) create mode 100644 fairmq/CMakeLists.txt create mode 100644 fairmq/FairMQBalancedStandaloneSplitter.cxx create mode 100644 fairmq/FairMQBalancedStandaloneSplitter.h create mode 100644 fairmq/FairMQBenchmarkSampler.cxx create mode 100644 fairmq/FairMQBenchmarkSampler.h create mode 100644 fairmq/FairMQBuffer.cxx create mode 100644 fairmq/FairMQBuffer.h create mode 100644 fairmq/FairMQConfigurable.cxx create mode 100644 fairmq/FairMQConfigurable.h create mode 100644 fairmq/FairMQContext.cxx create mode 100644 fairmq/FairMQContext.h create mode 100644 fairmq/FairMQDevice.cxx create mode 100644 fairmq/FairMQDevice.h create mode 100644 fairmq/FairMQLogger.cxx create mode 100644 fairmq/FairMQLogger.h create mode 100644 fairmq/FairMQMessage.cxx create mode 100644 fairmq/FairMQMessage.h create mode 100644 fairmq/FairMQProcessor.cxx create mode 100644 fairmq/FairMQProcessor.h create mode 100644 fairmq/FairMQProcessorTask.cxx create mode 100644 fairmq/FairMQProcessorTask.h create mode 100644 fairmq/FairMQSampler.cxx create mode 100644 fairmq/FairMQSampler.h create mode 100644 fairmq/FairMQSamplerTask.cxx create mode 100644 fairmq/FairMQSamplerTask.h create mode 100644 fairmq/FairMQSink.cxx create mode 100644 fairmq/FairMQSink.h create mode 100644 fairmq/FairMQSocket.cxx create mode 100644 fairmq/FairMQSocket.h create mode 100644 fairmq/FairMQStandaloneMerger.cxx create mode 100644 fairmq/FairMQStandaloneMerger.h create mode 100644 fairmq/FairMQStateMachine.cxx create mode 100644 fairmq/FairMQStateMachine.h create mode 100644 fairmq/runBenchmarkSampler.cxx create mode 100644 fairmq/runBuffer.cxx create mode 100644 fairmq/runMerger.cxx create mode 100644 fairmq/runSink.cxx create mode 100644 fairmq/runSplitter.cxx diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt new file mode 100644 index 00000000..6edddfdc --- /dev/null +++ b/fairmq/CMakeLists.txt @@ -0,0 +1,94 @@ +INCLUDE_DIRECTORIES( + ${BASE_INCLUDE_DIRECTORIES} + ${CMAKE_SOURCE_DIR}/fairmq + ${ZMQ_INCLUDE_DIR} + ${ROOT_INCLUDE_DIR} +) + +include_directories(${INCLUDE_DIRECTORIES}) + +SET(HEADERS + "FairMQSampler.h" + "FairMQBenchmarkSampler.h" + #"FairMQStateMachine.h" + "FairMQConfigurable.h" + "FairMQDevice.h" + "FairMQBuffer.h" + "FairMQSamplerTask.h" + "FairMQLogger.h" + "FairMQContext.h" + "FairMQMessage.h" + "FairMQSocket.h" + "FairMQBalancedStandaloneSplitter.h" + "FairMQStandaloneMerger.h" + "FairMQProcessor.h" + "FairMQProcessorTask.h" + "FairMQSink.h" +) + +SET(SOURCES + "FairMQSampler.cxx" + "FairMQBenchmarkSampler.cxx" + #"FairMQStateMachine.cxx" + "FairMQConfigurable.cxx" + "FairMQBuffer.cxx" + "FairMQSamplerTask.cxx" + "FairMQLogger.cxx" + "FairMQContext.cxx" + "FairMQMessage.cxx" + "FairMQSocket.cxx" + "FairMQBalancedStandaloneSplitter.cxx" + "FairMQStandaloneMerger.cxx" + "FairMQProcessor.cxx" + "FairMQProcessorTask.cxx" + "FairMQSink.cxx" + "FairMQDevice.cxx" +) + +set(LINK_DIRECTORIES +${ROOT_LIBRARY_DIR} +) + +link_directories( ${LINK_DIRECTORIES}) + +SET(LINK_LIBRARIES + ${CMAKE_THREAD_LIBS_INIT} + ${ZMQ_LIBRARY_SHARED} + ${ROOT_LIBRARIES} + Base ParBase FairTools GeoBase +) + + +ADD_LIBRARY(FairMQ SHARED ${HEADERS} ${SOURCES}) +SET_TARGET_PROPERTIES(FairMQ PROPERTIES ${FAIRROOT_LIBRARY_PROPERTIES}) +TARGET_LINK_LIBRARIES(FairMQ ${LINK_LIBRARIES}) + +#Set(LIBRARY_NAME FairMQ) + +#GENERATE_LIBRARY() + +ADD_LIBRARY(FairMQStatic STATIC ${HEADERS} ${SOURCES}) +#SET_TARGET_PROPERTIES(FairMQStatic PROPERTIES OUTPUT_NAME FairMQ) +TARGET_LINK_LIBRARIES(FairMQStatic ${LINK_LIBRARIES}) + +ADD_EXECUTABLE(RunBenchmarkSampler runBenchmarkSampler.cxx) +SET_TARGET_PROPERTIES(RunBenchmarkSampler PROPERTIES OUTPUT_NAME bsampler) +TARGET_LINK_LIBRARIES(RunBenchmarkSampler FairMQ) + +ADD_EXECUTABLE(RunBuffer runBuffer.cxx) +SET_TARGET_PROPERTIES(RunBuffer PROPERTIES OUTPUT_NAME buffer) +TARGET_LINK_LIBRARIES(RunBuffer FairMQ) + +ADD_EXECUTABLE(RunSplitter runSplitter.cxx) +SET_TARGET_PROPERTIES(RunSplitter PROPERTIES OUTPUT_NAME splitter) +TARGET_LINK_LIBRARIES(RunSplitter FairMQ) + +ADD_EXECUTABLE(RunMerger runMerger.cxx) +SET_TARGET_PROPERTIES(RunMerger PROPERTIES OUTPUT_NAME merger) +TARGET_LINK_LIBRARIES(RunMerger FairMQ) + +ADD_EXECUTABLE(RunSink runSink.cxx) +SET_TARGET_PROPERTIES(RunSink PROPERTIES OUTPUT_NAME sink) +TARGET_LINK_LIBRARIES(RunSink FairMQ) + + diff --git a/fairmq/FairMQBalancedStandaloneSplitter.cxx b/fairmq/FairMQBalancedStandaloneSplitter.cxx new file mode 100644 index 00000000..f2c60fda --- /dev/null +++ b/fairmq/FairMQBalancedStandaloneSplitter.cxx @@ -0,0 +1,57 @@ +/* + * FairMQBalancedStandaloneSplitter.cxx + * + * Created on: Dec 6, 2012 + * Author: dklein + */ + +#include "FairMQBalancedStandaloneSplitter.h" + +#include "FairMQLogger.h" + +FairMQBalancedStandaloneSplitter::FairMQBalancedStandaloneSplitter() +{ +} + +FairMQBalancedStandaloneSplitter::~FairMQBalancedStandaloneSplitter() +{ +} + +void FairMQBalancedStandaloneSplitter::Run() +{ + void* status; //necessary for pthread_join + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<"); + + pthread_t logger; + pthread_create(&logger, NULL, &FairMQDevice::callLogSocketRates, this); + + // Initialize poll set + zmq_pollitem_t items[] = { + { *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 } + }; + + Bool_t received = false; + Bool_t direction = false; + while (true) { + FairMQMessage msg; + + zmq_poll(items, 1, -1); + + if (items[0].revents & ZMQ_POLLIN) { + received = fPayloadInputs->at(0)->Receive(&msg); + } + + if (received) { + if (direction) { + fPayloadOutputs->at(0)->Send(&msg); + } else { + fPayloadOutputs->at(1)->Send(&msg); + } + direction = !direction; + } + } + + pthread_join(logger, &status); +} + + diff --git a/fairmq/FairMQBalancedStandaloneSplitter.h b/fairmq/FairMQBalancedStandaloneSplitter.h new file mode 100644 index 00000000..aeb294f9 --- /dev/null +++ b/fairmq/FairMQBalancedStandaloneSplitter.h @@ -0,0 +1,23 @@ +/* + * FairMQBalancedStandaloneSplitter.h + * + * Created on: Dec 6, 2012 + * Author: dklein + */ + +#ifndef FAIRMQBALANCEDSTANDALONESPLITTER_H_ +#define FAIRMQBALANCEDSTANDALONESPLITTER_H_ + +#include "FairMQDevice.h" +#include "Rtypes.h" + + +class FairMQBalancedStandaloneSplitter: public FairMQDevice +{ + public: + FairMQBalancedStandaloneSplitter(); + virtual ~FairMQBalancedStandaloneSplitter(); + virtual void Run(); +}; + +#endif /* FAIRMQBALANCEDSTANDALONESPLITTER_H_ */ diff --git a/fairmq/FairMQBenchmarkSampler.cxx b/fairmq/FairMQBenchmarkSampler.cxx new file mode 100644 index 00000000..ce3f7f5c --- /dev/null +++ b/fairmq/FairMQBenchmarkSampler.cxx @@ -0,0 +1,151 @@ +/* + * FairMQBenchmarkSampler.cpp + * + * Created on: Apr 23, 2013 + * Author: dklein + */ +#include +#include + +#include "FairMQBenchmarkSampler.h" +#include "FairMQLogger.h" + +FairMQBenchmarkSampler::FairMQBenchmarkSampler() : + fEventSize(10000), + fEventRate(1), + fEventCounter(0) +{ +} + +FairMQBenchmarkSampler::~FairMQBenchmarkSampler() +{ +} + +void FairMQBenchmarkSampler::Init() +{ + FairMQDevice::Init(); +} + +void FairMQBenchmarkSampler::Run() +{ + void* status; //necessary for pthread_join + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<"); + usleep(1000000); + + pthread_t logger; + pthread_create(&logger, NULL, &FairMQDevice::callLogSocketRates, this); + + pthread_t resetEventCounter; + pthread_create(&resetEventCounter, NULL, &FairMQBenchmarkSampler::callResetEventCounter, this); + + void* buffer = operator new[](fEventSize); + FairMQMessage* base_event = new FairMQMessage(buffer, fEventSize, NULL); + + while (true) { + FairMQMessage event; + event.Copy(base_event); + + fPayloadOutputs->at(0)->Send(&event); + + --fEventCounter; + + while (fEventCounter == 0) { + usleep(1000); + } + } + + delete base_event; + + pthread_join(logger, &status); + pthread_join(resetEventCounter, &status); +} + +void* FairMQBenchmarkSampler::ResetEventCounter() +{ + while (true) { + fEventCounter = fEventRate / 100; + + usleep(10000); + } +} + +void FairMQBenchmarkSampler::Log(Int_t intervalInMs) +{ + timestamp_t t0; + timestamp_t t1; + ULong_t bytes = fPayloadOutputs->at(0)->GetBytesTx(); + ULong_t messages = fPayloadOutputs->at(0)->GetMessagesTx(); + ULong_t bytesNew; + ULong_t messagesNew; + Double_t megabytesPerSecond = (bytesNew - bytes) / (1024 * 1024); + Double_t messagesPerSecond = (messagesNew - messages); + + t0 = get_timestamp(); + + while (true) { + usleep(intervalInMs * 1000); + + t1 = get_timestamp(); + + bytesNew = fPayloadOutputs->at(0)->GetBytesTx(); + messagesNew = fPayloadOutputs->at(0)->GetMessagesTx(); + + timestamp_t timeSinceLastLog_ms = (t1 - t0) / 1000.0L; + + megabytesPerSecond = ((Double_t) (bytesNew - bytes) / (1024. * 1024.)) / (Double_t) timeSinceLastLog_ms * 1000.; + messagesPerSecond = (Double_t) (messagesNew - messages) / (Double_t) timeSinceLastLog_ms * 1000.; + + std::stringstream logmsg; + logmsg << "send " << messagesPerSecond << " msg/s, " << megabytesPerSecond << " MB/s"; + FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str()); + + bytes = bytesNew; + messages = messagesNew; + t0 = t1; + } +} + +void FairMQBenchmarkSampler::SetProperty(Int_t key, TString value, Int_t slot/*= 0*/) +{ + switch (key) { + default: + FairMQDevice::SetProperty(key, value, slot); + break; + } +} + +TString FairMQBenchmarkSampler::GetProperty(Int_t key, TString default_/*= ""*/, Int_t slot/*= 0*/) +{ + switch (key) { + default: + return FairMQDevice::GetProperty(key, default_, slot); + } +} + +void FairMQBenchmarkSampler::SetProperty(Int_t key, Int_t value, Int_t slot/*= 0*/) +{ + switch (key) { + case EventSize: + fEventSize = value; + break; + case EventRate: + fEventRate = value; + break; + default: + FairMQDevice::SetProperty(key, value, slot); + break; + } +} + +Int_t FairMQBenchmarkSampler::GetProperty(Int_t key, Int_t default_/*= 0*/, Int_t slot/*= 0*/) +{ + switch (key) { + case EventSize: + return fEventSize; + case EventRate: + return fEventRate; + default: + return FairMQDevice::GetProperty(key, default_, slot); + } +} + diff --git a/fairmq/FairMQBenchmarkSampler.h b/fairmq/FairMQBenchmarkSampler.h new file mode 100644 index 00000000..9f5613f2 --- /dev/null +++ b/fairmq/FairMQBenchmarkSampler.h @@ -0,0 +1,45 @@ +/* + * FairMQBenchmarkSampler.h + * + * Created on: Apr 23, 2013 + * Author: dklein + */ + +#ifndef FAIRMQBENCHMARKSAMPLER_H_ +#define FAIRMQBENCHMARKSAMPLER_H_ + +#include +#include "FairMQDevice.h" +#include "TString.h" + +/** + * Sampler to generate traffic for benchmarking. + */ + +class FairMQBenchmarkSampler: public FairMQDevice +{ + protected: + Int_t fEventSize; + Int_t fEventRate; + Int_t fEventCounter; + public: + enum { + InputFile = FairMQDevice::Last, + EventRate, + EventSize, + Last + }; + FairMQBenchmarkSampler(); + virtual ~FairMQBenchmarkSampler(); + virtual void Init(); + virtual void Run(); + void Log(Int_t intervalInMs); + void* ResetEventCounter(); + static void* callResetEventCounter(void* arg) { return ((FairMQBenchmarkSampler*)arg)->ResetEventCounter(); } + virtual void SetProperty(Int_t key, TString value, Int_t slot = 0); + virtual TString GetProperty(Int_t key, TString default_ = "", Int_t slot = 0); + virtual void SetProperty(Int_t key, Int_t value, Int_t slot = 0); + virtual Int_t GetProperty(Int_t key, Int_t default_ = 0, Int_t slot = 0); +}; + +#endif /* FAIRMQBENCHMARKSAMPLER_H_ */ diff --git a/fairmq/FairMQBuffer.cxx b/fairmq/FairMQBuffer.cxx new file mode 100644 index 00000000..9a466f77 --- /dev/null +++ b/fairmq/FairMQBuffer.cxx @@ -0,0 +1,51 @@ +/* + * FairMQBuffer.cxx + * + * Created on: Oct 25, 2012 + * Author: dklein + */ + +#include "FairMQBuffer.h" +#include +#include "FairMQLogger.h" + + +FairMQBuffer::FairMQBuffer() +{ +} + +void FairMQBuffer::Run() +{ + void* status; //necessary for pthread_join + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<"); + + pthread_t logger; + pthread_create(&logger, NULL, &FairMQDevice::callLogSocketRates, this); + + // Initialize poll set + zmq_pollitem_t items[] = { + { *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 } + }; + + Bool_t received = false; + while (true) { + FairMQMessage msg; + + zmq_poll(items, 1, -1); + + if (items[0].revents & ZMQ_POLLIN) { + received = fPayloadInputs->at(0)->Receive(&msg); + } + + if (received) { + fPayloadOutputs->at(0)->Send(&msg); + } + } + + pthread_join(logger, &status); +} + +FairMQBuffer::~FairMQBuffer() +{ +} + diff --git a/fairmq/FairMQBuffer.h b/fairmq/FairMQBuffer.h new file mode 100644 index 00000000..428ef3d3 --- /dev/null +++ b/fairmq/FairMQBuffer.h @@ -0,0 +1,24 @@ +/* + * FairMQBuffer.h + * + * Created on: Oct 25, 2012 + * Author: dklein + */ + +#ifndef FAIRMQBUFFER_H_ +#define FAIRMQBUFFER_H_ + +#include "FairMQDevice.h" +#include "Rtypes.h" + + +class FairMQBuffer: public FairMQDevice +{ + private: + public: + FairMQBuffer(); + virtual void Run(); + virtual ~FairMQBuffer(); +}; + +#endif /* FAIRMQBUFFER_H_ */ diff --git a/fairmq/FairMQConfigurable.cxx b/fairmq/FairMQConfigurable.cxx new file mode 100644 index 00000000..d6d6a9b8 --- /dev/null +++ b/fairmq/FairMQConfigurable.cxx @@ -0,0 +1,37 @@ +/* + * FairMQConfigurable.cxx + * + * Created on: Oct 25, 2012 + * Author: dklein + */ + +#include "FairMQConfigurable.h" + + +FairMQConfigurable::FairMQConfigurable() +{ +} + +void FairMQConfigurable::SetProperty(Int_t key, TString value, Int_t slot/*= 0*/) +{ +} + +TString FairMQConfigurable::GetProperty(Int_t key, TString default_/*= ""*/, Int_t slot/*= 0*/) +{ + return default_; +} + +void FairMQConfigurable::SetProperty(Int_t key, Int_t value, Int_t slot/*= 0*/) +{ +} + +Int_t FairMQConfigurable::GetProperty(Int_t key, Int_t default_/*= 0*/, Int_t slot/*= 0*/) +{ + return default_; +} + +FairMQConfigurable::~FairMQConfigurable() +{ +} + + diff --git a/fairmq/FairMQConfigurable.h b/fairmq/FairMQConfigurable.h new file mode 100644 index 00000000..103325a9 --- /dev/null +++ b/fairmq/FairMQConfigurable.h @@ -0,0 +1,29 @@ +/* + * FairMQConfigurable.h + * + * Created on: Oct 25, 2012 + * Author: dklein + */ + +#ifndef FAIRMQCONFIGURABLE_H_ +#define FAIRMQCONFIGURABLE_H_ + +#include "Rtypes.h" +#include "TString.h" + + +class FairMQConfigurable +{ + public: + enum { + Last = 1 + }; + FairMQConfigurable(); + virtual void SetProperty(Int_t key, TString value, Int_t slot = 0); + virtual TString GetProperty(Int_t key, TString default_ = "", Int_t slot = 0); + virtual void SetProperty(Int_t key, Int_t value, Int_t slot = 0); + virtual Int_t GetProperty(Int_t key, Int_t default_ = 0, Int_t slot = 0); + virtual ~FairMQConfigurable(); +}; + +#endif /* FAIRMQCONFIGURABLE_H_ */ diff --git a/fairmq/FairMQContext.cxx b/fairmq/FairMQContext.cxx new file mode 100644 index 00000000..95e85390 --- /dev/null +++ b/fairmq/FairMQContext.cxx @@ -0,0 +1,44 @@ +/* + * FairMQContext.cxx + * + * Created on: Dec 5, 2012 + * Author: dklein + */ + +#include "FairMQContext.h" +#include + + +const TString FairMQContext::PAYLOAD = "payload"; +const TString FairMQContext::LOG = "log"; +const TString FairMQContext::CONFIG = "config"; +const TString FairMQContext::CONTROL = "control"; + +FairMQContext::FairMQContext(TString deviceId, TString contextId, Int_t numIoThreads) +{ + std::stringstream id; + id << deviceId << "." << contextId; + fId = id.str(); + + fContext = new zmq::context_t(numIoThreads); +} + +FairMQContext::~FairMQContext() +{ +} + +TString FairMQContext::GetId() +{ + return fId; +} + +zmq::context_t* FairMQContext::GetContext() +{ + return fContext; +} + +void FairMQContext::Close() +{ + fContext->close(); +} + diff --git a/fairmq/FairMQContext.h b/fairmq/FairMQContext.h new file mode 100644 index 00000000..046ece47 --- /dev/null +++ b/fairmq/FairMQContext.h @@ -0,0 +1,31 @@ +/* + * FairMQContext.h + * + * Created on: Dec 5, 2012 + * Author: dklein + */ + +#ifndef FAIRMQCONTEXT_H_ +#define FAIRMQCONTEXT_H_ + +#include +#include +#include "Rtypes.h" +#include "TString.h" + + +class FairMQContext +{ + private: + TString fId; + zmq::context_t* fContext; + public: + const static TString PAYLOAD, LOG, CONFIG, CONTROL; + FairMQContext(TString deviceId, TString contextId, Int_t numIoThreads); + virtual ~FairMQContext(); + TString GetId(); + zmq::context_t* GetContext(); + void Close(); +}; + +#endif /* FAIRMQCONTEXT_H_ */ diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx new file mode 100644 index 00000000..a4014285 --- /dev/null +++ b/fairmq/FairMQDevice.cxx @@ -0,0 +1,341 @@ +/** + * FairMQDevice.cxx + * + * @since Oct 25, 2012 + * @authors: D. Klein + */ + +#include "FairMQSocket.h" +#include "FairMQDevice.h" + +#include "FairMQLogger.h" + +FairMQDevice::FairMQDevice() : + fId(""), + fNumIoThreads(1), + fPayloadContext(NULL), + fPayloadInputs(new std::vector()), + fPayloadOutputs(new std::vector()), + fLogIntervalInMs(1000) +{ + +} + +void FairMQDevice::Init() +{ + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Init <<<<<<<"); + std::stringstream logmsg; + logmsg << "numIoThreads: " << fNumIoThreads; + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + + fPayloadContext = new FairMQContext(fId, FairMQContext::PAYLOAD, fNumIoThreads); + + fBindAddress = new std::vector(fNumOutputs); + fBindSocketType = new std::vector(); + fBindSndBufferSize = new std::vector(); + fBindRcvBufferSize = new std::vector(); + + for (Int_t i = 0; i < fNumOutputs; ++i) { + fBindSocketType->push_back(ZMQ_PUB); + fBindSndBufferSize->push_back(10000); + fBindRcvBufferSize->push_back(10000); + } + + fConnectAddress = new std::vector(fNumInputs); + fConnectSocketType = new std::vector(); + fConnectSndBufferSize = new std::vector(); + fConnectRcvBufferSize = new std::vector(); + + for (Int_t i = 0; i < fNumInputs; ++i) { + fConnectSocketType->push_back(ZMQ_SUB); + fConnectSndBufferSize->push_back(10000); + fConnectRcvBufferSize->push_back(10000); + } +} + +void FairMQDevice::Bind() +{ + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Bind <<<<<<<"); + + for (Int_t i = 0; i < fNumOutputs; ++i) { + FairMQSocket* socket = new FairMQSocket(fPayloadContext, fBindSocketType->at(i), i); + socket->GetSocket()->setsockopt(ZMQ_SNDHWM, &fBindSndBufferSize->at(i), sizeof(fBindSndBufferSize->at(i))); + socket->GetSocket()->setsockopt(ZMQ_RCVHWM, &fBindRcvBufferSize->at(i), sizeof(fBindRcvBufferSize->at(i))); + fPayloadOutputs->push_back(socket); + } + + Int_t i = 0; + + for( std::vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { + try { + (*itr)->Bind(fBindAddress->at(i)); + } catch (std::out_of_range& e) { + } + + ++i; + } + +} + +void FairMQDevice::Connect() +{ + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Connect <<<<<<<"); + + for (Int_t i = 0; i < fNumInputs; ++i) { + FairMQSocket* socket = new FairMQSocket(fPayloadContext, fConnectSocketType->at(i), i); + socket->GetSocket()->setsockopt(ZMQ_SNDHWM, &fConnectSndBufferSize->at(i), sizeof(fConnectSndBufferSize->at(i))); + socket->GetSocket()->setsockopt(ZMQ_RCVHWM, &fConnectRcvBufferSize->at(i), sizeof(fConnectRcvBufferSize->at(i))); + fPayloadInputs->push_back(socket); + } + + Int_t i = 0; + + for( std::vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { + try { + (*itr)->Connect(fConnectAddress->at(i)); + } catch (std::out_of_range& e) { + } + + ++i; + } + +} + +void FairMQDevice::Run() +{ +} + +void FairMQDevice::Pause() +{ +} + +void FairMQDevice::Shutdown() +{ + for( std::vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { + (*itr)->Close(); + } + + for( std::vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { + (*itr)->Close(); + } + + fPayloadContext->Close(); +} + +void* FairMQDevice::LogSocketRates() +{ + timestamp_t t0; + timestamp_t t1; + + timestamp_t timeSinceLastLog_ms; + + ULong_t* bytesInput = new ULong_t[fNumInputs]; + ULong_t* messagesInput = new ULong_t[fNumInputs]; + ULong_t* bytesOutput = new ULong_t[fNumOutputs]; + ULong_t* messagesOutput = new ULong_t[fNumOutputs]; + + ULong_t* bytesInputNew = new ULong_t[fNumInputs]; + ULong_t* messagesInputNew = new ULong_t[fNumInputs]; + ULong_t* bytesOutputNew = new ULong_t[fNumOutputs]; + ULong_t* messagesOutputNew = new ULong_t[fNumOutputs]; + + Double_t* megabytesPerSecondInput = new Double_t[fNumInputs]; + Double_t* messagesPerSecondInput = new Double_t[fNumInputs]; + Double_t* megabytesPerSecondOutput = new Double_t[fNumOutputs]; + Double_t* messagesPerSecondOutput = new Double_t[fNumOutputs]; + + Int_t i = 0; + for( std::vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { + bytesInput[i] = (*itr)->GetBytesRx(); + messagesInput[i] = (*itr)->GetMessagesRx(); + ++i; + } + + i = 0; + for( std::vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { + bytesOutput[i] = (*itr)->GetBytesTx(); + messagesOutput[i] = (*itr)->GetMessagesTx(); + ++i; + } + + t0 = get_timestamp(); + + while (true) { + usleep(fLogIntervalInMs * 1000); + + t1 = get_timestamp(); + + timeSinceLastLog_ms = (t1 - t0) / 1000.0L; + + i = 0; + + for( std::vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { + bytesInputNew[i] = (*itr)->GetBytesRx(); + megabytesPerSecondInput[i] = ((Double_t) (bytesInputNew[i] - bytesInput[i]) / (1024. * 1024.)) / (Double_t) timeSinceLastLog_ms * 1000.; + bytesInput[i] = bytesInputNew[i]; + messagesInputNew[i] = (*itr)->GetMessagesRx(); + messagesPerSecondInput[i] = (Double_t) (messagesInputNew[i] - messagesInput[i]) / (Double_t) timeSinceLastLog_ms * 1000.; + messagesInput[i] = messagesInputNew[i]; + + std::stringstream logmsg; + logmsg << "#" << (*itr)->GetId() << ": " << messagesPerSecondInput[i] << " msg/s, " << megabytesPerSecondInput[i] << " MB/s"; + FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str()); + + ++i; + } + + i = 0; + + for( std::vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { + bytesOutputNew[i] = (*itr)->GetBytesTx(); + megabytesPerSecondOutput[i] = ((Double_t) (bytesOutputNew[i] - bytesOutput[i]) / (1024. * 1024.)) / (Double_t) timeSinceLastLog_ms * 1000.; + bytesOutput[i] = bytesOutputNew[i]; + messagesOutputNew[i] = (*itr)->GetMessagesTx(); + messagesPerSecondOutput[i] = (Double_t) (messagesOutputNew[i] - messagesOutput[i]) / (Double_t) timeSinceLastLog_ms * 1000.; + messagesOutput[i] = messagesOutputNew[i]; + + std::stringstream logmsg; + logmsg << "#" << (*itr)->GetId() << ": " << messagesPerSecondOutput[i] << " msg/s, " << megabytesPerSecondOutput[i] << " MB/s"; + FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str()); + + ++i; + } + + t0 = t1; + } + + delete[] bytesInput; + delete[] messagesInput; + delete[] bytesOutput; + delete[] messagesOutput; + + delete[] bytesInputNew; + delete[] messagesInputNew; + delete[] bytesOutputNew; + delete[] messagesOutputNew; + + delete[] megabytesPerSecondInput; + delete[] messagesPerSecondInput; + delete[] megabytesPerSecondOutput; + delete[] messagesPerSecondOutput; +} + +void FairMQDevice::SetProperty(Int_t key, TString value, Int_t slot/*= 0*/) +{ + switch (key) { + case Id: + fId = value; + break; + case BindAddress: + fBindAddress->erase(fBindAddress->begin() + slot); + fBindAddress->insert(fBindAddress->begin() + slot, value); + break; + case ConnectAddress: + fConnectAddress->erase(fConnectAddress->begin() + slot); + fConnectAddress->insert(fConnectAddress->begin() + slot, value); + break; + default: + FairMQConfigurable::SetProperty(key, value, slot); + break; + } +} + +TString FairMQDevice::GetProperty(Int_t key, TString default_/*= ""*/, Int_t slot/*= 0*/) +{ + switch (key) { + case Id: + return fId; + case BindAddress: + return fBindAddress->at(slot); + case ConnectAddress: + return fConnectAddress->at(slot); + default: + return FairMQConfigurable::GetProperty(key, default_, slot); + } +} + +void FairMQDevice::SetProperty(Int_t key, Int_t value, Int_t slot/*= 0*/) +{ + switch (key) { + case NumIoThreads: + fNumIoThreads = value; + break; + case NumInputs: + fNumInputs = value; + break; + case NumOutputs: + fNumOutputs = value; + break; + case LogIntervalInMs: + fLogIntervalInMs = value; + break; + case BindSocketType: + fBindSocketType->erase(fBindSocketType->begin() + slot); + fBindSocketType->insert(fBindSocketType->begin() + slot, value); + break; + case BindSndBufferSize: + fBindSndBufferSize->erase(fBindSndBufferSize->begin() + slot); + fBindSndBufferSize->insert(fBindSndBufferSize->begin() + slot, value); + break; + case BindRcvBufferSize: + fBindRcvBufferSize->erase(fBindRcvBufferSize->begin() + slot); + fBindRcvBufferSize->insert(fBindRcvBufferSize->begin() + slot, value); + break; + case ConnectSocketType: + fConnectSocketType->erase(fConnectSocketType->begin() + slot); + fConnectSocketType->insert(fConnectSocketType->begin() + slot, value); + break; + case ConnectSndBufferSize: + fConnectSndBufferSize->erase(fConnectSndBufferSize->begin() + slot); + fConnectSndBufferSize->insert(fConnectSndBufferSize->begin() + slot, value); + break; + case ConnectRcvBufferSize: + fConnectRcvBufferSize->erase(fConnectRcvBufferSize->begin() + slot); + fConnectRcvBufferSize->insert(fConnectRcvBufferSize->begin() + slot, value); + break; + default: + FairMQConfigurable::SetProperty(key, value, slot); + break; + } +} + +Int_t FairMQDevice::GetProperty(Int_t key, Int_t default_/*= 0*/, Int_t slot/*= 0*/) +{ + switch (key) { + case NumIoThreads: + return fNumIoThreads; + case LogIntervalInMs: + return fLogIntervalInMs; + case BindSocketType: + return fBindSocketType->at(slot); + case ConnectSocketType: + return fConnectSocketType->at(slot); + case ConnectSndBufferSize: + return fConnectSndBufferSize->at(slot); + case ConnectRcvBufferSize: + return fConnectRcvBufferSize->at(slot); + case BindSndBufferSize: + return fBindSndBufferSize->at(slot); + case BindRcvBufferSize: + return fBindRcvBufferSize->at(slot); + default: + return FairMQConfigurable::GetProperty(key, default_, slot); + } +} + +FairMQDevice::~FairMQDevice() +{ + for( std::vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { + delete (*itr); + } + + for( std::vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { + delete (*itr); + } + + delete fBindAddress; + delete fConnectAddress; + delete fPayloadInputs; + delete fPayloadOutputs; +} + diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h new file mode 100644 index 00000000..573f638c --- /dev/null +++ b/fairmq/FairMQDevice.h @@ -0,0 +1,74 @@ +/** + * FairMQDevice.h + * + * @since Oct 25, 2012 + * @authors: D. Klein + */ + +#ifndef FAIRMQDEVICE_H_ +#define FAIRMQDEVICE_H_ + +#include "FairMQConfigurable.h" +#include "FairMQStateMachine.h" +#include +#include "FairMQContext.h" +#include "FairMQSocket.h" +#include +//#include "FairMQLogger.h" +#include "Rtypes.h" +#include "TString.h" + + +class FairMQDevice : /*public FairMQStateMachine,*/ public FairMQConfigurable +{ + protected: + TString fId; + Int_t fNumIoThreads; + FairMQContext* fPayloadContext; + std::vector *fBindAddress; + std::vector *fBindSocketType; + std::vector *fBindSndBufferSize; + std::vector *fBindRcvBufferSize; + std::vector *fConnectAddress; + std::vector *fConnectSocketType; + std::vector *fConnectSndBufferSize; + std::vector *fConnectRcvBufferSize; + std::vector *fPayloadInputs; + std::vector *fPayloadOutputs; + Int_t fLogIntervalInMs; + Int_t fNumInputs; + Int_t fNumOutputs; + public: + enum { + Id = FairMQConfigurable::Last, + NumIoThreads, + NumInputs, + NumOutputs, + BindAddress, + BindSocketType, + BindSndBufferSize, + BindRcvBufferSize, + ConnectAddress, + ConnectSocketType, + ConnectSndBufferSize, + ConnectRcvBufferSize, + LogIntervalInMs, + Last + }; + FairMQDevice(); + virtual void Init(); + virtual void Bind(); + virtual void Connect(); + virtual void Run(); + virtual void Pause(); + virtual void Shutdown(); + virtual void* LogSocketRates(); + static void* callLogSocketRates(void* arg) { return ((FairMQDevice*)arg)->LogSocketRates(); } + virtual void SetProperty(Int_t key, TString value, Int_t slot = 0); + virtual TString GetProperty(Int_t key, TString default_ = "", Int_t slot = 0); + virtual void SetProperty(Int_t key, Int_t value, Int_t slot = 0); + virtual Int_t GetProperty(Int_t key, Int_t default_ = 0, Int_t slot = 0); + virtual ~FairMQDevice(); +}; + +#endif /* FAIRMQDEVICE_H_ */ diff --git a/fairmq/FairMQLogger.cxx b/fairmq/FairMQLogger.cxx new file mode 100644 index 00000000..9743c674 --- /dev/null +++ b/fairmq/FairMQLogger.cxx @@ -0,0 +1,71 @@ +/* + * FairMQLogger.cxx + * + * Created on: Dec 4, 2012 + * Author: dklein + */ + +#include "FairMQLogger.h" +#include +#include +#include + + +FairMQLogger* FairMQLogger::instance = NULL; + +FairMQLogger* FairMQLogger::GetInstance() +{ + if (instance == NULL) { + instance = new FairMQLogger(); + } + return instance; +} + +FairMQLogger* FairMQLogger::InitInstance(TString bindAddress) +{ + instance = new FairMQLogger(bindAddress); + return instance; +} + +FairMQLogger::FairMQLogger() : + fBindAddress("") +{ +} + +FairMQLogger::FairMQLogger(TString bindAddress) : + fBindAddress(bindAddress) +{ +} + +FairMQLogger::~FairMQLogger() +{ +} + +void FairMQLogger::Log(Int_t type, TString logmsg) +{ + timestamp_t tm = get_timestamp(); + timestamp_t ms = tm / 1000.0L; + timestamp_t s = ms / 1000.0L; + std::time_t t = s; + std::size_t fractional_seconds = ms % 1000; + Text_t mbstr[100]; + std::strftime(mbstr, 100, "%H:%M:%S:", std::localtime(&t)); + + TString type_str; + switch (type) { + case DEBUG: + type_str = "\033[01;34mDEBUG\033[0m"; + break; + case INFO: + type_str = "\033[01;32mINFO\033[0m"; + break; + case ERROR: + type_str = "\033[01;31mERROR\033[0m"; + break; + default: + break; + } + + std::cout << "[\033[01;36m" << mbstr << fractional_seconds << "\033[0m]" << "[" << type_str << "]" << " " << logmsg << std::endl; +} + diff --git a/fairmq/FairMQLogger.h b/fairmq/FairMQLogger.h new file mode 100644 index 00000000..f1633000 --- /dev/null +++ b/fairmq/FairMQLogger.h @@ -0,0 +1,45 @@ +/* + * FairMQLogger.h + * + * Created on: Dec 4, 2012 + * Author: dklein + */ + +#ifndef FAIRMQLOGGER_H_ +#define FAIRMQLOGGER_H_ +#include +#include +//#ifndef _MAKECINT_ +#include +//#endif +#include "Rtypes.h" +#include "TString.h" + + +class FairMQLogger +{ + private: + static FairMQLogger* instance; + TString fBindAddress; + public: + enum { + DEBUG, INFO, ERROR + }; + FairMQLogger(); + FairMQLogger(TString bindAdress); + virtual ~FairMQLogger(); + void Log(Int_t type, TString logmsg); + static FairMQLogger* GetInstance(); + static FairMQLogger* InitInstance(TString bindAddress); +}; + +typedef unsigned long long timestamp_t; + +static timestamp_t get_timestamp () +{ + struct timeval now; + gettimeofday (&now, NULL); + return now.tv_usec + (timestamp_t)now.tv_sec * 1000000; +} + +#endif /* FAIRMQLOGGER_H_ */ diff --git a/fairmq/FairMQMessage.cxx b/fairmq/FairMQMessage.cxx new file mode 100644 index 00000000..57c6e2ee --- /dev/null +++ b/fairmq/FairMQMessage.cxx @@ -0,0 +1,63 @@ +/* + * FairMQMessage.cxx + * + * Created on: Dec 5, 2012 + * Author: dklein + */ + +#include "FairMQMessage.h" +#include "FairMQLogger.h" + + +FairMQMessage::FairMQMessage() +{ + try { + fMessage = new zmq::message_t(); + } catch (zmq::error_t& e) { + std::stringstream logmsg; + logmsg << "failed allocating new message, reason: " << e.what(); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } +} + +FairMQMessage::FairMQMessage(void* data_, size_t size_, zmq::free_fn* ffn_, void* hint_/*= NULL*/) +{ + try { + fMessage = new zmq::message_t(data_, size_, ffn_, hint_); + } catch (zmq::error_t& e) { + std::stringstream logmsg; + logmsg << "failed allocating new message, reason: " << e.what(); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } +} + +FairMQMessage::~FairMQMessage() +{ + delete fMessage; +} + +zmq::message_t* FairMQMessage::GetMessage() +{ + return fMessage; +} + +Int_t FairMQMessage::Size() +{ + return fMessage->size(); +} + +Bool_t FairMQMessage::Copy(FairMQMessage* msg) +{ + Bool_t result = false; + + try { + fMessage->copy(msg->GetMessage()); + } catch (zmq::error_t& e) { + std::stringstream logmsg; + logmsg << "failed copying message, reason: " << e.what(); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } + + return result; +} + diff --git a/fairmq/FairMQMessage.h b/fairmq/FairMQMessage.h new file mode 100644 index 00000000..7db45665 --- /dev/null +++ b/fairmq/FairMQMessage.h @@ -0,0 +1,28 @@ +/* + * FairMQMessage.h + * + * Created on: Dec 5, 2012 + * Author: dklein + */ + +#ifndef FAIRMQMESSAGE_H_ +#define FAIRMQMESSAGE_H_ + +#include +#include "Rtypes.h" + + +class FairMQMessage +{ + private: + zmq::message_t* fMessage; + public: + FairMQMessage(); + FairMQMessage(void* data_, size_t size_, zmq::free_fn* ffn_, void* hint_ = NULL); + virtual ~FairMQMessage(); + zmq::message_t* GetMessage(); + Int_t Size(); + Bool_t Copy(FairMQMessage* msg); +}; + +#endif /* FAIRMQMESSAGE_H_ */ diff --git a/fairmq/FairMQProcessor.cxx b/fairmq/FairMQProcessor.cxx new file mode 100644 index 00000000..4a9fab78 --- /dev/null +++ b/fairmq/FairMQProcessor.cxx @@ -0,0 +1,65 @@ +/* + * FairMQProcessor.cxx + * + * Created on: Dec 6, 2012 + * Author: dklein + */ + +#include "FairMQProcessor.h" +#include "FairMQLogger.h" + +FairMQProcessor::FairMQProcessor() : + fTask(NULL) +{ +} + +FairMQProcessor::~FairMQProcessor() +{ + delete fTask; +} + +void FairMQProcessor::SetTask(FairMQProcessorTask* task) +{ + fTask = task; +} + +void FairMQProcessor::Init() +{ + FairMQDevice::Init(); + + fTask->InitTask(); +} + +void FairMQProcessor::Run() +{ + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<"); + + void* status; //necessary for pthread_join + pthread_t logger; + pthread_create(&logger, NULL, &FairMQDevice::callLogSocketRates, this); + + // Initialize poll set + zmq_pollitem_t items[] = { + { *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 } + }; + + Bool_t received = false; + while (true) { + FairMQMessage msg; + + zmq_poll(items, 1, -1); + + if (items[0].revents & ZMQ_POLLIN) { + received = fPayloadInputs->at(0)->Receive(&msg); + } + + if (received) { + fTask->Exec(&msg, NULL); + + fPayloadOutputs->at(0)->Send(&msg); + } + } + + pthread_join(logger, &status); +} + diff --git a/fairmq/FairMQProcessor.h b/fairmq/FairMQProcessor.h new file mode 100644 index 00000000..b9b43d38 --- /dev/null +++ b/fairmq/FairMQProcessor.h @@ -0,0 +1,28 @@ +/* + * FairMQProcessor.h + * + * Created on: Dec 6, 2012 + * Author: dklein + */ + +#ifndef FAIRMQPROCESSOR_H_ +#define FAIRMQPROCESSOR_H_ + +#include "FairMQDevice.h" +#include "FairMQProcessorTask.h" +#include "Rtypes.h" + + +class FairMQProcessor: public FairMQDevice +{ + public: + FairMQProcessor(); + virtual ~FairMQProcessor(); + void SetTask(FairMQProcessorTask* task); + virtual void Init(); + virtual void Run(); + private: + FairMQProcessorTask* fTask; +}; + +#endif /* FAIRMQPROCESSOR_H_ */ diff --git a/fairmq/FairMQProcessorTask.cxx b/fairmq/FairMQProcessorTask.cxx new file mode 100644 index 00000000..f6888b25 --- /dev/null +++ b/fairmq/FairMQProcessorTask.cxx @@ -0,0 +1,18 @@ +/* + * FairMQProcessorTask.cxx + * + * Created on: Dec 6, 2012 + * Author: dklein + */ + +#include "FairMQProcessorTask.h" + + +FairMQProcessorTask::FairMQProcessorTask() +{ +} + +FairMQProcessorTask::~FairMQProcessorTask() +{ +} + diff --git a/fairmq/FairMQProcessorTask.h b/fairmq/FairMQProcessorTask.h new file mode 100644 index 00000000..c9c8b9c6 --- /dev/null +++ b/fairmq/FairMQProcessorTask.h @@ -0,0 +1,24 @@ +/* + * FairMQProcessorTask.h + * + * Created on: Dec 6, 2012 + * Author: dklein + */ + +#ifndef FAIRMQPROCESSORTASK_H_ +#define FAIRMQPROCESSORTASK_H_ + +#include +#include "FairMQMessage.h" +#include "FairTask.h" + + +class FairMQProcessorTask : public FairTask +{ + public: + FairMQProcessorTask(); + virtual ~FairMQProcessorTask(); + virtual void Exec(FairMQMessage* msg, Option_t* opt) = 0; +}; + +#endif /* FAIRMQPROCESSORTASK_H_ */ diff --git a/fairmq/FairMQSampler.cxx b/fairmq/FairMQSampler.cxx new file mode 100644 index 00000000..b76ff0a3 --- /dev/null +++ b/fairmq/FairMQSampler.cxx @@ -0,0 +1,193 @@ +/* + * FairMQSampler.cpp + * + * Created on: Sep 27, 2012 + * Author: dklein + */ +#include +#include + +#include "TList.h" +#include "TObjString.h" +#include "TClonesArray.h" +#include "FairParRootFileIo.h" +#include "FairRuntimeDb.h" +#include "TROOT.h" + +#include "FairMQSampler.h" +#include "FairMQLogger.h" + + +FairMQSampler::FairMQSampler() : + fFairRunAna(new FairRunAna()), + fSamplerTask(NULL), + fInputFile(""), + fBranch(""), + fParFile(""), + fEventRate(1) +{ +} + +FairMQSampler::~FairMQSampler() +{ + delete fSamplerTask; + delete fFairRunAna; +} + +void FairMQSampler::Init() +{ + FairMQDevice::Init(); + + fSamplerTask->SetBranch(fBranch); + + TString rootlogon_macro = TString(getenv("VMCWORKDIR")) + "/gconfig/rootlogon.C"; + gROOT->LoadMacro(rootlogon_macro.Data()); + gROOT->ProcessLine("rootlogon()"); + + fFairRunAna->SetInputFile(TString(fInputFile)); + fFairRunAna->SetOutputFile("dummy.out"); + + fFairRunAna->AddTask(fSamplerTask); + + FairRuntimeDb* rtdb = fFairRunAna->GetRuntimeDb(); + FairParRootFileIo* parInput1 = new FairParRootFileIo(); + parInput1->open(TString(fParFile).Data()); + rtdb->setFirstInput(parInput1); + rtdb->print(); + + // read complete file and extract digis. + fFairRunAna->Init(); + fFairRunAna->Run(0, 0); +} + +void FairMQSampler::Run() +{ + void* status; //necessary for pthread_join + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<"); + usleep(1000000); + + pthread_t logger; + pthread_create(&logger, NULL, &FairMQDevice::callLogSocketRates, this); + + pthread_t resetEventCounter; + pthread_create(&resetEventCounter, NULL, &FairMQSampler::callResetEventCounter, this); + + while (true) { + for( std::vector::iterator itr = fSamplerTask->GetOutput()->begin(); itr != fSamplerTask->GetOutput()->end(); itr++ ) { + FairMQMessage event; + event.Copy(*itr); + + fPayloadOutputs->at(0)->Send(&event); + + --fEventCounter; + + while (fEventCounter == 0) { + usleep(1000); + } + } + + } + + pthread_join(logger, &status); + pthread_join(resetEventCounter, &status); +} + +void* FairMQSampler::ResetEventCounter() +{ + while (true) { + fEventCounter = fEventRate / 100; + + usleep(10000); + } +} + +void FairMQSampler::Log(Int_t intervalInMs) +{ + timestamp_t t0; + timestamp_t t1; + ULong_t bytes = fPayloadOutputs->at(0)->GetBytesTx(); + ULong_t messages = fPayloadOutputs->at(0)->GetMessagesTx(); + ULong_t bytesNew; + ULong_t messagesNew; + Double_t megabytesPerSecond = (bytesNew - bytes) / (1024 * 1024); + Double_t messagesPerSecond = (messagesNew - messages); + + t0 = get_timestamp(); + + while (true) { + usleep(intervalInMs * 1000); + + t1 = get_timestamp(); + + bytesNew = fPayloadOutputs->at(0)->GetBytesTx(); + messagesNew = fPayloadOutputs->at(0)->GetMessagesTx(); + + timestamp_t timeSinceLastLog_ms = (t1 - t0) / 1000.0L; + + megabytesPerSecond = ((Double_t) (bytesNew - bytes) / (1024. * 1024.)) / (Double_t) timeSinceLastLog_ms * 1000.; + messagesPerSecond = (Double_t) (messagesNew - messages) / (Double_t) timeSinceLastLog_ms * 1000.; + + std::stringstream logmsg; + logmsg << "send " << messagesPerSecond << " msg/s, " << megabytesPerSecond << " MB/s"; + FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str()); + + bytes = bytesNew; + messages = messagesNew; + t0 = t1; + } +} + +void FairMQSampler::SetProperty(Int_t key, TString value, Int_t slot/*= 0*/) +{ + switch (key) { + case InputFile: + fInputFile = value; + break; + case ParFile: + fParFile = value; + break; + case Branch: + fBranch = value; + break; + default: + FairMQDevice::SetProperty(key, value, slot); + break; + } +} + +TString FairMQSampler::GetProperty(Int_t key, TString default_/*= ""*/, Int_t slot/*= 0*/) +{ + switch (key) { + case InputFile: + return fInputFile; + case ParFile: + return fParFile; + case Branch: + return fBranch; + default: + return FairMQDevice::GetProperty(key, default_, slot); + } +} + +void FairMQSampler::SetProperty(Int_t key, Int_t value, Int_t slot/*= 0*/) +{ + switch (key) { + case EventRate: + fEventRate = value; + break; + default: + FairMQDevice::SetProperty(key, value, slot); + break; + } +} + +Int_t FairMQSampler::GetProperty(Int_t key, Int_t default_/*= 0*/, Int_t slot/*= 0*/) +{ + switch (key) { + case EventRate: + return fEventRate; + default: + return FairMQDevice::GetProperty(key, default_, slot); + } +} + diff --git a/fairmq/FairMQSampler.h b/fairmq/FairMQSampler.h new file mode 100644 index 00000000..3cca6255 --- /dev/null +++ b/fairmq/FairMQSampler.h @@ -0,0 +1,60 @@ +/* + * FairMQSampler.h + * + * Created on: Sep 27, 2012 + * Author: dklein + */ + +#ifndef FAIRMQSAMPLER_H_ +#define FAIRMQSAMPLER_H_ + +#include +#include "FairRunAna.h" +#include "FairTask.h" +#include "FairMQDevice.h" +#include "FairMQSamplerTask.h" +#include "TString.h" + + +/** + * Reads simulated digis from a root file and samples the digi as a time-series UDP stream. + * Must be initialized with the filename to the root file and the name of the sub-detector + * branch, whose digis should be streamed. + * + * The purpose of this class is to provide a data source of digis very similar to the + * future detector output at the point where the detector is connected to the online + * computing farm. For the development of online analysis algorithms, it is very important + * to simulate the future detector output as realistic as possible to evaluate the + * feasibility and quality of the various possible online analysis features. + */ +class FairMQSampler: public FairMQDevice +{ + protected: + FairRunAna* fFairRunAna; + FairMQSamplerTask* fSamplerTask; + TString fInputFile; // Filename of a root file containing the simulated digis. + TString fParFile; + TString fBranch; // The name of the sub-detector branch to stream the digis from. + Int_t fEventRate; + Int_t fEventCounter; + public: + enum { + InputFile = FairMQDevice::Last, + Branch, + ParFile, + EventRate + }; + FairMQSampler(); + virtual ~FairMQSampler(); + virtual void Init(); + virtual void Run(); + void Log(Int_t intervalInMs); + void* ResetEventCounter(); + static void* callResetEventCounter(void* arg) { return ((FairMQSampler*)arg)->ResetEventCounter(); } + virtual void SetProperty(Int_t key, TString value, Int_t slot = 0); + virtual TString GetProperty(Int_t key, TString default_ = "", Int_t slot = 0); + virtual void SetProperty(Int_t key, Int_t value, Int_t slot = 0); + virtual Int_t GetProperty(Int_t key, Int_t default_ = 0, Int_t slot = 0); +}; + +#endif /* FAIRMQSAMPLER_H_ */ diff --git a/fairmq/FairMQSamplerTask.cxx b/fairmq/FairMQSamplerTask.cxx new file mode 100644 index 00000000..2f8e65de --- /dev/null +++ b/fairmq/FairMQSamplerTask.cxx @@ -0,0 +1,58 @@ +/* + * FairMQSamplerTask.cxx + * + * Created on: Nov 22, 2012 + * Author: dklein + */ + +#include "FairMQSamplerTask.h" + + +FairMQSamplerTask::FairMQSamplerTask(const Text_t* name, Int_t iVerbose) : + FairTask(name, iVerbose), + fInput(NULL), + fBranch(""), + fMessageSize(32768), + fOutput(new std::vector) +{ +} + +FairMQSamplerTask::FairMQSamplerTask() : + FairTask( "Abstract base task used for loading a branch from a root file into memory"), + fInput(NULL), + fBranch(""), + fMessageSize(32768), + fOutput(new std::vector) +{ +} + +FairMQSamplerTask::~FairMQSamplerTask() +{ + delete fInput; + + // leave fOutput in memory, because it is needed even after FairMQSamplerTask is terminated. +} + +InitStatus FairMQSamplerTask::Init() +{ + FairRootManager* ioman = FairRootManager::Instance(); + fInput = (TClonesArray*) ioman->GetObject(fBranch.Data()); + + return kSUCCESS; +} + +void FairMQSamplerTask::SetBranch(TString branch) +{ + fBranch = branch; +} + +void FairMQSamplerTask::SetMessageSize(int size) +{ + fMessageSize = size; +} + +std::vector *FairMQSamplerTask::GetOutput() +{ + return fOutput; +} + diff --git a/fairmq/FairMQSamplerTask.h b/fairmq/FairMQSamplerTask.h new file mode 100644 index 00000000..bfe9229c --- /dev/null +++ b/fairmq/FairMQSamplerTask.h @@ -0,0 +1,37 @@ +/* + * FairMQSamplerTask.h + * + * Created on: Nov 22, 2012 + * Author: dklein + */ + +#ifndef FAIRMQSAMPLERTASK_H_ +#define FAIRMQSAMPLERTASK_H_ + +#include "FairTask.h" +#include +#include "TClonesArray.h" +#include +#include "FairMQMessage.h" +#include "TString.h" + + +class FairMQSamplerTask: public FairTask +{ + public: + FairMQSamplerTask(); + FairMQSamplerTask(const Text_t* name, Int_t iVerbose=1); + virtual ~FairMQSamplerTask(); + virtual InitStatus Init(); + virtual void Exec(Option_t* opt) = 0; + void SetBranch(TString branch); + void SetMessageSize(Int_t size); + std::vector *GetOutput(); + protected: + TClonesArray* fInput; + TString fBranch; + Int_t fMessageSize; + std::vector *fOutput; +}; + +#endif /* FAIRMQSAMPLERTASK_H_ */ diff --git a/fairmq/FairMQSink.cxx b/fairmq/FairMQSink.cxx new file mode 100644 index 00000000..e4d0eee0 --- /dev/null +++ b/fairmq/FairMQSink.cxx @@ -0,0 +1,44 @@ +/* + * FairMQSink.cxx + * + * Created on: Jan 9, 2013 + * Author: dklein + */ + +#include "FairMQSink.h" +#include "FairMQLogger.h" + +FairMQSink::FairMQSink() +{ +} + +void FairMQSink::Run() +{ + void* status; //necessary for pthread_join + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<"); + + pthread_t logger; + pthread_create(&logger, NULL, &FairMQDevice::callLogSocketRates, this); + + // Initialize poll set + zmq_pollitem_t items[] = { + { *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 } + }; + + while (true) { + FairMQMessage msg; + + zmq_poll(items, 1, -1); + + if (items[0].revents & ZMQ_POLLIN) { + fPayloadInputs->at(0)->Receive(&msg); + } + } + + pthread_join(logger, &status); +} + +FairMQSink::~FairMQSink() +{ +} + diff --git a/fairmq/FairMQSink.h b/fairmq/FairMQSink.h new file mode 100644 index 00000000..9f2027dd --- /dev/null +++ b/fairmq/FairMQSink.h @@ -0,0 +1,26 @@ +/* + * FairMQSink.h + * + * Created on: Jan 9, 2013 + * Author: dklein + */ + +#ifndef FAIRMQSINK_H_ +#define FAIRMQSINK_H_ + +#include "Rtypes.h" +#include + +#include "FairMQDevice.h" + + + +class FairMQSink: public FairMQDevice +{ + public: + FairMQSink(); + virtual void Run(); + virtual ~FairMQSink(); +}; + +#endif /* FAIRMQSINK_H_ */ diff --git a/fairmq/FairMQSocket.cxx b/fairmq/FairMQSocket.cxx new file mode 100644 index 00000000..9e248d90 --- /dev/null +++ b/fairmq/FairMQSocket.cxx @@ -0,0 +1,169 @@ +/* + * FairMQSocket.cxx + * + * Created on: Dec 5, 2012 + * Author: dklein + */ + +#include "FairMQSocket.h" +#include +#include "FairMQLogger.h" + + +const TString FairMQSocket::TCP = "tcp://"; +const TString FairMQSocket::IPC = "ipc://"; +const TString FairMQSocket::INPROC = "inproc://"; + +FairMQSocket::FairMQSocket(FairMQContext* context, int type, int num) : + fBytesTx(0), + fBytesRx(0), + fMessagesTx(0), + fMessagesRx(0) +{ + std::stringstream id; + id << context->GetId() << "." << GetTypeString(type) << "." << num; + fId = id.str(); + + fSocket = new zmq::socket_t(*context->GetContext(), type); + fSocket->setsockopt(ZMQ_IDENTITY, &fId, fId.Length()); + if (type == ZMQ_SUB) { + fSocket->setsockopt(ZMQ_SUBSCRIBE, NULL, 0); + } + + std::stringstream logmsg; + logmsg << "created socket #" << fId; + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); +} + +FairMQSocket::~FairMQSocket() +{ +} + +TString FairMQSocket::GetId() +{ + return fId; +} + +TString FairMQSocket::GetTypeString(Int_t type) +{ + switch (type) { + case ZMQ_SUB: + return "sub"; + case ZMQ_PUB: + return "pub"; + case ZMQ_PUSH: + return "push"; + case ZMQ_PULL: + return "pull"; + default: + return ""; + } +} + +Bool_t FairMQSocket::Bind(TString address) +{ + Bool_t result = true; + + try { + if ( address.Length() > 0 /*!address.empty()*/) { + std::stringstream logmsg; + logmsg << "bind socket #" << fId << " on " << address; + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + fSocket->bind(address.Data()); + } + } catch (zmq::error_t& e) { + std::stringstream logmsg2; + logmsg2 << "failed binding socket #" << fId << ", reason: " << e.what(); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); + result = false; + } + + return result; +} + +Bool_t FairMQSocket::Connect(TString address) +{ + Bool_t result = true; + + try { + if ( address.Length() > 0 /*!address.empty()*/) { + std::stringstream logmsg; + logmsg << "connect socket #" << fId << " to " << address; + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + fSocket->connect(address.Data()); + } + } catch (zmq::error_t& e) { + std::stringstream logmsg2; + logmsg2 << "failed connecting socket #" << fId << ", reason: " << e.what(); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); + result = false; + } + + return result; +} + +Bool_t FairMQSocket::Send(FairMQMessage* msg) +{ + Bool_t result = false; + + try { + fBytesTx += msg->Size(); + ++fMessagesTx; + result = fSocket->send(*msg->GetMessage()); + } catch (zmq::error_t& e) { + std::stringstream logmsg; + logmsg << "failed sending on socket #" << fId << ", reason: " << e.what(); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + result = false; + } + + return result; +} + +Bool_t FairMQSocket::Receive(FairMQMessage* msg) +{ + Bool_t result = false; + + try { + result = fSocket->recv(msg->GetMessage()); + fBytesRx += msg->Size(); + ++fMessagesRx; + } catch (zmq::error_t& e) { + std::stringstream logmsg; + logmsg << "failed receiving on socket #" << fId << ", reason: " << e.what(); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + result = false; + } + + return result; +} + +void FairMQSocket::Close() +{ + fSocket->close(); +} + +zmq::socket_t* FairMQSocket::GetSocket() +{ + return fSocket; +} + +ULong_t FairMQSocket::GetBytesTx() +{ + return fBytesTx; +} + +ULong_t FairMQSocket::GetBytesRx() +{ + return fBytesRx; +} + +ULong_t FairMQSocket::GetMessagesTx() +{ + return fMessagesTx; +} + +ULong_t FairMQSocket::GetMessagesRx() +{ + return fMessagesRx; +} diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h new file mode 100644 index 00000000..a5875b73 --- /dev/null +++ b/fairmq/FairMQSocket.h @@ -0,0 +1,47 @@ +/* + * FairMQSocket.h + * + * Created on: Dec 5, 2012 + * Author: dklein + */ + +#ifndef FAIRMQSOCKET_H_ +#define FAIRMQSOCKET_H_ + +#include +#include +#include "FairMQContext.h" +#include "FairMQMessage.h" +#include "Rtypes.h" +#include "TString.h" + + +class FairMQSocket +{ + private: + zmq::socket_t* fSocket; + TString fId; + ULong_t fBytesTx; + ULong_t fBytesRx; + ULong_t fMessagesTx; + ULong_t fMessagesRx; + public: + const static TString TCP, IPC, INPROC; + FairMQSocket(FairMQContext* context, Int_t type, Int_t num); + virtual ~FairMQSocket(); + TString GetId(); + static TString GetTypeString(Int_t type); + Bool_t Send(FairMQMessage* msg); + Bool_t Receive(FairMQMessage* msg); + void Close(); + Bool_t Bind(TString address); + Bool_t Connect(TString address); + zmq::socket_t* GetSocket(); + ULong_t GetBytesTx(); + ULong_t GetBytesRx(); + ULong_t GetMessagesTx(); + ULong_t GetMessagesRx(); + +}; + +#endif /* FAIRMQSOCKET_H_ */ diff --git a/fairmq/FairMQStandaloneMerger.cxx b/fairmq/FairMQStandaloneMerger.cxx new file mode 100644 index 00000000..cfa2b309 --- /dev/null +++ b/fairmq/FairMQStandaloneMerger.cxx @@ -0,0 +1,76 @@ +/* + * FairMQStandaloneMerger.cxx + * + * Created on: Dec 6, 2012 + * Author: dklein + */ + +#include "FairMQStandaloneMerger.h" +#include "FairMQLogger.h" + +FairMQStandaloneMerger::FairMQStandaloneMerger() +{ +} + +FairMQStandaloneMerger::~FairMQStandaloneMerger() +{ +} + +void FairMQStandaloneMerger::Run() +{ + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<"); + + Bool_t received0 = false; + Bool_t received1 = false; + FairMQMessage* msg0 = NULL; + FairMQMessage* msg1 = NULL; + TString source0 = fPayloadInputs->at(0)->GetId(); + TString source1 = fPayloadInputs->at(1)->GetId(); + Int_t size0 = 0; + Int_t size1 = 0; + + // Initialize poll set + zmq_pollitem_t items[] = { + { *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 }, + { *(fPayloadInputs->at(1)->GetSocket()), 0, ZMQ_POLLIN, 0 } + }; + + void* status; //necessary for pthread_join + pthread_t logger; + pthread_create(&logger, NULL, &FairMQDevice::callLogSocketRates, this); + + while (true) { + msg0 = new FairMQMessage(); + msg1 = new FairMQMessage(); + + zmq_poll(items, 2, -1); + + if (items[0].revents & ZMQ_POLLIN) { + received0 = fPayloadInputs->at(0)->Receive(msg0); + } + + if (items[1].revents & ZMQ_POLLIN) { + received1 = fPayloadInputs->at(1)->Receive(msg1); + } + + if (received0) { + size0 = msg0->Size(); + fPayloadOutputs->at(0)->Send(msg0); + + received0 = false; + } + + if (received1) { + size1 = msg1->Size(); + fPayloadOutputs->at(0)->Send(msg1); + + received1 = false; + } + + delete msg0; + delete msg1; + } + + pthread_join(logger, &status); +} + diff --git a/fairmq/FairMQStandaloneMerger.h b/fairmq/FairMQStandaloneMerger.h new file mode 100644 index 00000000..2da51df1 --- /dev/null +++ b/fairmq/FairMQStandaloneMerger.h @@ -0,0 +1,24 @@ +/* + * FairMQStandaloneMerger.h + * + * Created on: Dec 6, 2012 + * Author: dklein + */ + +#ifndef FAIRMQSTANDALONEMERGER_H_ +#define FAIRMQSTANDALONEMERGER_H_ + +#include "FairMQDevice.h" +#include "Rtypes.h" +#include "TString.h" + + +class FairMQStandaloneMerger: public FairMQDevice +{ + public: + FairMQStandaloneMerger(); + virtual ~FairMQStandaloneMerger(); + virtual void Run(); +}; + +#endif /* FAIRMQSTANDALONEMERGER_H_ */ diff --git a/fairmq/FairMQStateMachine.cxx b/fairmq/FairMQStateMachine.cxx new file mode 100644 index 00000000..ecbd036a --- /dev/null +++ b/fairmq/FairMQStateMachine.cxx @@ -0,0 +1,42 @@ +/* + * FairMQStateMachine.cxx + * + * Created on: Oct 25, 2012 + * Author: dklein + */ + +#include "FairMQStateMachine.h" + + +FairMQStateMachine::FairMQStateMachine() : + fState(START) +{ +} + +FairMQStateMachine::RunStateMachine() +{ + void* status; //necessary for pthread_join + pthread_t state; + + changeState(INIT); + + while(fState != END) { + switch(fState) { + case INIT: + pthread_create(&state, NULL, &FairMQStateMachine::Init, this); + break; + + } + pthread_join(state, &status); + } + + + +} + + + +FairMQStateMachine::~FairMQStateMachine() +{ +} + diff --git a/fairmq/FairMQStateMachine.h b/fairmq/FairMQStateMachine.h new file mode 100644 index 00000000..b84fd24f --- /dev/null +++ b/fairmq/FairMQStateMachine.h @@ -0,0 +1,32 @@ +/* + * FairMQStateMachine.h + * + * Created on: Oct 25, 2012 + * Author: dklein + */ + +#ifndef FAIRMQSTATEMACHINE_H_ +#define FAIRMQSTATEMACHINE_H_ + + +class FairMQStateMachine +{ + private: + int fState; + public: + enum { + START, INIT, BIND, CONNECT, RUN, PAUSE, SHUTDOWN, END + }; + FairMQStateMachine(); + virtual void Init() = 0; + virtual void Bind() = 0; + virtual void Connect() = 0; + virtual void Run() = 0; + virtual void Pause() = 0; + virtual void Shutdown() = 0; + bool ChangeState(int new_state); + void RunStateMachine(); + virtual ~FairMQStateMachine(); +}; + +#endif /* FAIRMQSTATEMACHINE_H_ */ diff --git a/fairmq/runBenchmarkSampler.cxx b/fairmq/runBenchmarkSampler.cxx new file mode 100644 index 00000000..b5e9ad8d --- /dev/null +++ b/fairmq/runBenchmarkSampler.cxx @@ -0,0 +1,83 @@ +/* + * runBenchmarkSampler.cxx + * + * Created on: Apr 23, 2013 + * Author: dklein + */ + +#include +#include +#include +#include "FairMQLogger.h" +#include +#include +#include +#include "FairMQBenchmarkSampler.h" + + +int main(int argc, char** argv) +{ + if( argc != 8 ) { + std::cout << "Usage: bsampler ID eventSize eventRate numIoTreads\n" << + "\t\tbindSocketType bindSndBufferSize BindAddress\n" << std::endl; + return 1; + } + + pid_t pid = getpid(); + std::stringstream logmsg; + logmsg << "PID: " << pid; + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + + int i = 1; + + FairMQBenchmarkSampler* sampler = new FairMQBenchmarkSampler(); + + sampler->SetProperty(FairMQBenchmarkSampler::Id, argv[i]); + ++i; + + int eventSize; + std::stringstream(argv[i]) >> eventSize; + sampler->SetProperty(FairMQBenchmarkSampler::EventSize, eventSize); + ++i; + + int eventRate; + std::stringstream(argv[i]) >> eventRate; + sampler->SetProperty(FairMQBenchmarkSampler::EventRate, eventRate); + ++i; + + int numIoThreads; + std::stringstream(argv[i]) >> numIoThreads; + sampler->SetProperty(FairMQBenchmarkSampler::NumIoThreads, numIoThreads); + ++i; + + int numInputs = 0; + sampler->SetProperty(FairMQBenchmarkSampler::NumInputs, numInputs); + + int numOutputs = 1; + sampler->SetProperty(FairMQBenchmarkSampler::NumOutputs, numOutputs); + + sampler->Init(); + + int bindSocketType = ZMQ_PUB; + if (strcmp(argv[i], "push") == 0) { + bindSocketType = ZMQ_PUSH; + } + sampler->SetProperty(FairMQBenchmarkSampler::BindSocketType, bindSocketType, 0); + ++i; + + int bindSndBufferSize; + std::stringstream(argv[i]) >> bindSndBufferSize; + sampler->SetProperty(FairMQBenchmarkSampler::BindSndBufferSize, bindSndBufferSize, 0); + ++i; + + sampler->SetProperty(FairMQBenchmarkSampler::BindAddress, argv[i], 0); + ++i; + + + sampler->Bind(); + sampler->Connect(); + sampler->Run(); + + exit(0); +} + diff --git a/fairmq/runBuffer.cxx b/fairmq/runBuffer.cxx new file mode 100644 index 00000000..f217b575 --- /dev/null +++ b/fairmq/runBuffer.cxx @@ -0,0 +1,87 @@ +/* + * runBuffer.cxx + * + * Created on: Oct 26, 2012 + * Author: dklein + */ + +#include "FairMQBuffer.h" +#include +#include +#include "FairMQLogger.h" +#include +#include +#include + + +int main(int argc, char** argv) +{ + if( argc != 9 ) { + std::cout << "Usage: buffer \tID numIoTreads\n" << + "\t\tconnectSocketType connectRcvBufferSize ConnectAddress\n" << + "\t\tbindSocketType bindSndBufferSize BindAddress\n" << std::endl; + return 1; + } + + pid_t pid = getpid(); + std::stringstream logmsg; + logmsg << "PID: " << pid; + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + + int i = 1; + + FairMQBuffer* buffer = new FairMQBuffer(); + buffer->SetProperty(FairMQBuffer::Id, argv[i]); + ++i; + + int numIoThreads; + std::stringstream(argv[i]) >> numIoThreads; + buffer->SetProperty(FairMQBuffer::NumIoThreads, numIoThreads); + ++i; + + int numInputs = 1; + buffer->SetProperty(FairMQBuffer::NumInputs, numInputs); + + int numOutputs = 1; + buffer->SetProperty(FairMQBuffer::NumOutputs, numOutputs); + + buffer->Init(); + + int connectSocketType = ZMQ_SUB; + if (strcmp(argv[i], "pull") == 0) { + connectSocketType = ZMQ_PULL; + } + buffer->SetProperty(FairMQBuffer::ConnectSocketType, connectSocketType, 0); + ++i; + + int connectRcvBufferSize; + std::stringstream(argv[i]) >> connectRcvBufferSize; + buffer->SetProperty(FairMQBuffer::ConnectRcvBufferSize, connectRcvBufferSize, 0); + ++i; + + buffer->SetProperty(FairMQBuffer::ConnectAddress, argv[i], 0); + ++i; + + int bindSocketType = ZMQ_PUB; + if (strcmp(argv[i], "push") == 0) { + bindSocketType = ZMQ_PUSH; + } + buffer->SetProperty(FairMQBuffer::BindSocketType, bindSocketType, 0); + ++i; + + int bindSndBufferSize; + std::stringstream(argv[i]) >> bindSndBufferSize; + buffer->SetProperty(FairMQBuffer::BindSndBufferSize, bindSndBufferSize, 0); + ++i; + + buffer->SetProperty(FairMQBuffer::BindAddress, argv[i], 0); + ++i; + + + buffer->Bind(); + buffer->Connect(); + buffer->Run(); + + exit(0); +} + diff --git a/fairmq/runMerger.cxx b/fairmq/runMerger.cxx new file mode 100644 index 00000000..4c04d482 --- /dev/null +++ b/fairmq/runMerger.cxx @@ -0,0 +1,102 @@ +/* + * runMerger.cxx + * + * Created on: Dec 6, 2012 + * Author: dklein + */ + +#include "FairMQStandaloneMerger.h" +#include +#include +#include "FairMQLogger.h" +#include +#include +#include + + +int main(int argc, char** argv) +{ + if( argc != 12 ) { + std::cout << "Usage: merger \tID numIoTreads\n" << + "\t\tconnectSocketType connectRcvBufferSize ConnectAddress\n" << + "\t\tconnectSocketType connectRcvBufferSize ConnectAddress\n" << + "\t\tbindSocketType bindSndBufferSize BindAddress\n" << std::endl; + return 1; + } + + pid_t pid = getpid(); + std::stringstream logmsg; + logmsg << "PID: " << pid; + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + + int i = 1; + + FairMQStandaloneMerger* merger = new FairMQStandaloneMerger(); + merger->SetProperty(FairMQStandaloneMerger::Id, argv[i]); + ++i; + + int numIoThreads; + std::stringstream(argv[i]) >> numIoThreads; + merger->SetProperty(FairMQStandaloneMerger::NumIoThreads, numIoThreads); + ++i; + + int numInputs = 2; + merger->SetProperty(FairMQStandaloneMerger::NumInputs, numInputs); + + int numOutputs = 1; + merger->SetProperty(FairMQStandaloneMerger::NumOutputs, numOutputs); + + merger->Init(); + + int connectSocketType = ZMQ_SUB; + if (strcmp(argv[i], "pull") == 0) { + connectSocketType = ZMQ_PULL; + } + merger->SetProperty(FairMQStandaloneMerger::ConnectSocketType, connectSocketType, 0); + ++i; + + int connectRcvBufferSize; + std::stringstream(argv[i]) >> connectRcvBufferSize; + merger->SetProperty(FairMQStandaloneMerger::ConnectRcvBufferSize, connectRcvBufferSize, 0); + ++i; + + merger->SetProperty(FairMQStandaloneMerger::ConnectAddress, argv[i], 0); + ++i; + + connectSocketType = ZMQ_SUB; + if (strcmp(argv[i], "pull") == 0) { + connectSocketType = ZMQ_PULL; + } + merger->SetProperty(FairMQStandaloneMerger::ConnectSocketType, connectSocketType, 1); + ++i; + + std::stringstream(argv[i]) >> connectRcvBufferSize; + merger->SetProperty(FairMQStandaloneMerger::ConnectRcvBufferSize, connectRcvBufferSize, 1); + ++i; + + merger->SetProperty(FairMQStandaloneMerger::ConnectAddress, argv[i], 1); + ++i; + + int bindSocketType = ZMQ_PUB; + if (strcmp(argv[i], "push") == 0) { + bindSocketType = ZMQ_PUSH; + } + merger->SetProperty(FairMQStandaloneMerger::BindSocketType, bindSocketType, 0); + ++i; + + int bindSndBufferSize; + std::stringstream(argv[i]) >> bindSndBufferSize; + merger->SetProperty(FairMQStandaloneMerger::BindSndBufferSize, bindSndBufferSize, 0); + ++i; + + merger->SetProperty(FairMQStandaloneMerger::BindAddress, argv[i], 0); + ++i; + + + merger->Bind(); + merger->Connect(); + merger->Run(); + + exit(0); +} + diff --git a/fairmq/runSink.cxx b/fairmq/runSink.cxx new file mode 100644 index 00000000..6cc686d4 --- /dev/null +++ b/fairmq/runSink.cxx @@ -0,0 +1,71 @@ +/* + * runSink.cxx + * + * Created on: Jan 21, 2013 + * Author: dklein + */ + +#include "FairMQSink.h" +#include +#include +#include "FairMQLogger.h" +#include +#include +#include + + +int main(int argc, char** argv) +{ + if( argc != 6 ) { + std::cout << "Usage: sink \tID numIoTreads\n" << + "\t\tconnectSocketType connectRcvBufferSize ConnectAddress\n" << std::endl; + return 1; + } + + pid_t pid = getpid(); + std::stringstream logmsg; + logmsg << "PID: " << pid; + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + + int i = 1; + + FairMQSink* sink = new FairMQSink(); + sink->SetProperty(FairMQSink::Id, argv[i]); + ++i; + + int numIoThreads; + std::stringstream(argv[i]) >> numIoThreads; + sink->SetProperty(FairMQSink::NumIoThreads, numIoThreads); + ++i; + + int numInputs = 1; + sink->SetProperty(FairMQSink::NumInputs, numInputs); + + int numOutputs = 0; + sink->SetProperty(FairMQSink::NumOutputs, numOutputs); + + sink->Init(); + + int connectSocketType = ZMQ_SUB; + if (strcmp(argv[i], "pull") == 0) { + connectSocketType = ZMQ_PULL; + } + sink->SetProperty(FairMQSink::ConnectSocketType, connectSocketType, 0); + ++i; + + int connectRcvBufferSize; + std::stringstream(argv[i]) >> connectRcvBufferSize; + sink->SetProperty(FairMQSink::ConnectRcvBufferSize, connectRcvBufferSize, 0); + ++i; + + sink->SetProperty(FairMQSink::ConnectAddress, argv[i], 0); + ++i; + + + sink->Bind(); + sink->Connect(); + sink->Run(); + + exit(0); +} + diff --git a/fairmq/runSplitter.cxx b/fairmq/runSplitter.cxx new file mode 100644 index 00000000..e9da7c78 --- /dev/null +++ b/fairmq/runSplitter.cxx @@ -0,0 +1,102 @@ +/* + * runSplitter.cxx + * + * Created on: Dec 6, 2012 + * Author: dklein + */ + +#include "FairMQBalancedStandaloneSplitter.h" +#include +#include +#include "FairMQLogger.h" +#include +#include +#include + + +int main(int argc, char** argv) +{ + if( argc != 12 ) { + std::cout << "Usage: splitter \tID numIoTreads\n" << + "\t\tconnectSocketType connectRcvBufferSize ConnectAddress\n" << + "\t\tbindSocketType bindSndBufferSize BindAddress\n" << + "\t\tbindSocketType bindSndBufferSize BindAddress\n" << std::endl; + return 1; + } + + pid_t pid = getpid(); + std::stringstream logmsg; + logmsg << "PID: " << pid; + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + + int i = 1; + + FairMQBalancedStandaloneSplitter* splitter = new FairMQBalancedStandaloneSplitter(); + splitter->SetProperty(FairMQBalancedStandaloneSplitter::Id, argv[i]); + ++i; + + int numIoThreads; + std::stringstream(argv[i]) >> numIoThreads; + splitter->SetProperty(FairMQBalancedStandaloneSplitter::NumIoThreads, numIoThreads); + ++i; + + int numInputs = 1; + splitter->SetProperty(FairMQBalancedStandaloneSplitter::NumInputs, numInputs); + + int numOutputs = 2; + splitter->SetProperty(FairMQBalancedStandaloneSplitter::NumOutputs, numOutputs); + + splitter->Init(); + + int connectSocketType = ZMQ_SUB; + if (strcmp(argv[i], "pull") == 0) { + connectSocketType = ZMQ_PULL; + } + splitter->SetProperty(FairMQBalancedStandaloneSplitter::ConnectSocketType, connectSocketType, 0); + ++i; + + int connectRcvBufferSize; + std::stringstream(argv[i]) >> connectRcvBufferSize; + splitter->SetProperty(FairMQBalancedStandaloneSplitter::ConnectRcvBufferSize, connectRcvBufferSize, 0); + ++i; + + splitter->SetProperty(FairMQBalancedStandaloneSplitter::ConnectAddress, argv[i], 0); + ++i; + + int bindSocketType = ZMQ_PUB; + if (strcmp(argv[i], "push") == 0) { + bindSocketType = ZMQ_PUSH; + } + splitter->SetProperty(FairMQBalancedStandaloneSplitter::BindSocketType, bindSocketType, 0); + ++i; + + int bindSndBufferSize; + std::stringstream(argv[i]) >> bindSndBufferSize; + splitter->SetProperty(FairMQBalancedStandaloneSplitter::BindSndBufferSize, bindSndBufferSize, 0); + ++i; + + splitter->SetProperty(FairMQBalancedStandaloneSplitter::BindAddress, argv[i], 0); + ++i; + + bindSocketType = ZMQ_PUB; + if (strcmp(argv[i], "push") == 0) { + bindSocketType = ZMQ_PUSH; + } + splitter->SetProperty(FairMQBalancedStandaloneSplitter::BindSocketType, bindSocketType, 1); + ++i; + + std::stringstream(argv[i]) >> bindSndBufferSize; + splitter->SetProperty(FairMQBalancedStandaloneSplitter::BindSndBufferSize, bindSndBufferSize, 1); + ++i; + + splitter->SetProperty(FairMQBalancedStandaloneSplitter::BindAddress, argv[i], 1); + ++i; + + + splitter->Bind(); + splitter->Connect(); + splitter->Run(); + + exit(0); +} +