add nanomsg implementations + use factory for nanomsg + lots of small stuff

This commit is contained in:
Alexey Rybalchenko 2014-01-24 15:54:29 +01:00
parent c041c14373
commit 64b9e991c3
44 changed files with 1138 additions and 420 deletions

View File

@ -1,69 +1,98 @@
set(INCLUDE_DIRECTORIES set(INCLUDE_DIRECTORIES
${BASE_INCLUDE_DIRECTORIES} ${BASE_INCLUDE_DIRECTORIES}
${CMAKE_SOURCE_DIR}/fairmq ${CMAKE_SOURCE_DIR}/fairmq
${ZMQ_INCLUDE_DIR}
${NANOMSG_INCLUDE_DIR}
${Boost_INCLUDE_DIR} ${Boost_INCLUDE_DIR}
${ROOT_INCLUDE_DIR} ${ROOT_INCLUDE_DIR}
) )
if(NANOMSG_FOUND)
set(INCLUDE_DIRECTORIES
${INCLUDE_DIRECTORIES}
${NANOMSG_LIBRARY_SHARED}
)
else(NANOMSG_FOUND)
set(INCLUDE_DIRECTORIES
${INCLUDE_DIRECTORIES}
${ZMQ_LIBRARY_SHARED}
)
endif(NANOMSG_FOUND)
include_directories(${INCLUDE_DIRECTORIES}) include_directories(${INCLUDE_DIRECTORIES})
Set(LINK_DIRECTORIES set(LINK_DIRECTORIES
${ROOT_LIBRARY_DIR} ${ROOT_LIBRARY_DIR}
${Boost_LIBRARY_DIRS} ${Boost_LIBRARY_DIRS}
) )
link_directories(${LINK_DIRECTORIES}) link_directories(${LINK_DIRECTORIES})
Set(SRCS set(SRCS
"FairMQLogger.cxx"
"FairMQConfigurable.cxx"
"FairMQStateMachine.cxx"
"FairMQTransportFactory.cxx"
"FairMQMessage.cxx"
"FairMQSocket.cxx"
"FairMQDevice.cxx"
"FairMQSampler.cxx" "FairMQSampler.cxx"
"FairMQBenchmarkSampler.cxx" "FairMQBenchmarkSampler.cxx"
"FairMQStateMachine.cxx" "FairMQProcessor.cxx"
"FairMQConfigurable.cxx" "FairMQSink.cxx"
"FairMQBuffer.cxx" "FairMQBuffer.cxx"
"FairMQSamplerTask.cxx" "FairMQProxy.cxx"
"FairMQLogger.cxx"
"FairMQContext.cxx"
"FairMQMessage.cxx"
"FairMQTransportFactory.cxx"
"FairMQTransportFactoryZMQ.cxx"
"FairMQMessageZMQ.cxx"
"FairMQMessageNN.cxx"
"FairMQSocket.cxx"
"FairMQSocketZMQ.cxx"
"FairMQSocketNN.cxx"
"FairMQSplitter.cxx" "FairMQSplitter.cxx"
"FairMQMerger.cxx" "FairMQMerger.cxx"
"FairMQProcessor.cxx" "FairMQPoller.cxx"
"FairMQSamplerTask.cxx"
"FairMQProcessorTask.cxx" "FairMQProcessorTask.cxx"
"FairMQSink.cxx"
"FairMQDevice.cxx"
"FairMQProxy.cxx"
) )
Set(DEPENDENCIES if(NANOMSG_FOUND)
set(SRCS
${SRCS}
"FairMQTransportFactoryNN.cxx"
"FairMQMessageNN.cxx"
"FairMQSocketNN.cxx"
"FairMQPollerNN.cxx"
)
set(DEPENDENCIES
${NANOMSG_LIBRARY_SHARED}
)
else(NANOMSG_FOUND)
set(SRCS
${SRCS}
"FairMQTransportFactoryZMQ.cxx"
"FairMQMessageZMQ.cxx"
"FairMQSocketZMQ.cxx"
"FairMQPollerZMQ.cxx"
"FairMQContextZMQ.cxx"
)
set(DEPENDENCIES
${ZMQ_LIBRARY_SHARED}
)
endif(NANOMSG_FOUND)
set(DEPENDENCIES
${DEPENDENCIES}
${CMAKE_THREAD_LIBS_INIT} ${CMAKE_THREAD_LIBS_INIT}
${ZMQ_LIBRARY_SHARED}
${NANOMSG_LIBRARY_SHARED}
Base ParBase FairTools GeoBase boost_thread boost_timer boost_system Base ParBase FairTools GeoBase boost_thread boost_timer boost_system
) )
Set(LIBRARY_NAME FairMQ) set(LIBRARY_NAME FairMQ)
GENERATE_LIBRARY() GENERATE_LIBRARY()
Set(Exe_Names bsampler buffer splitter merger sink proxy n_one_merger one_n_splitter) set(Exe_Names bsampler buffer splitter merger sink proxy n_one_merger one_n_splitter)
Set(Exe_Source runBenchmarkSampler.cxx runBuffer.cxx runSplitter.cxx runMerger.cxx runSink.cxx runProxy.cxx runNToOneMerger.cxx runOneToNSplitter.cxx) set(Exe_Source runBenchmarkSampler.cxx runBuffer.cxx runSplitter.cxx runMerger.cxx runSink.cxx runProxy.cxx runNToOneMerger.cxx runOneToNSplitter.cxx)
List(LENGTH Exe_Names _length) list(LENGTH Exe_Names _length)
Math(EXPR _length ${_length}-1) math(EXPR _length ${_length}-1)
ForEach(_file RANGE 0 ${_length}) ForEach(_file RANGE 0 ${_length})
List(GET Exe_Names ${_file} _name) list(GET Exe_Names ${_file} _name)
List(GET Exe_Source ${_file} _src) list(GET Exe_Source ${_file} _src)
Set(EXE_NAME ${_name}) set(EXE_NAME ${_name})
Set(SRCS ${_src}) set(SRCS ${_src})
Set(DEPENDENCIES FairMQ) set(DEPENDENCIES FairMQ)
GENERATE_EXECUTABLE() GENERATE_EXECUTABLE()
EndForEach(_file RANGE 0 ${_length}) EndForEach(_file RANGE 0 ${_length})

View File

@ -103,7 +103,7 @@ void FairMQBenchmarkSampler::Log(int intervalInMs)
megabytesPerSecond = ((double) (bytesNew - bytes) / (1024. * 1024.)) / (double) timeSinceLastLog_ms * 1000.; megabytesPerSecond = ((double) (bytesNew - bytes) / (1024. * 1024.)) / (double) timeSinceLastLog_ms * 1000.;
messagesPerSecond = (double) (messagesNew - messages) / (double) timeSinceLastLog_ms * 1000.; messagesPerSecond = (double) (messagesNew - messages) / (double) timeSinceLastLog_ms * 1000.;
std::stringstream logmsg; stringstream logmsg;
logmsg << "send " << messagesPerSecond << " msg/s, " << megabytesPerSecond << " MB/s"; logmsg << "send " << messagesPerSecond << " msg/s, " << megabytesPerSecond << " MB/s";
FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str());
@ -113,7 +113,7 @@ void FairMQBenchmarkSampler::Log(int intervalInMs)
} }
} }
void FairMQBenchmarkSampler::SetProperty(const int& key, const std::string& value, const int& slot/*= 0*/) void FairMQBenchmarkSampler::SetProperty(const int& key, const string& value, const int& slot/*= 0*/)
{ {
switch (key) { switch (key) {
default: default:
@ -122,7 +122,7 @@ void FairMQBenchmarkSampler::SetProperty(const int& key, const std::string& valu
} }
} }
std::string FairMQBenchmarkSampler::GetProperty(const int& key, const std::string& default_/*= ""*/, const int& slot/*= 0*/) string FairMQBenchmarkSampler::GetProperty(const int& key, const string& default_/*= ""*/, const int& slot/*= 0*/)
{ {
switch (key) { switch (key) {
default: default:

View File

@ -29,8 +29,8 @@ class FairMQBenchmarkSampler: public FairMQDevice
virtual ~FairMQBenchmarkSampler(); virtual ~FairMQBenchmarkSampler();
void Log(int intervalInMs); void Log(int intervalInMs);
void ResetEventCounter(); void ResetEventCounter();
virtual void SetProperty(const int& key, const std::string& value, const int& slot = 0); virtual void SetProperty(const int& key, const string& value, const int& slot = 0);
virtual std::string GetProperty(const int& key, const std::string& default_ = "", const int& slot = 0); virtual string GetProperty(const int& key, const string& default_ = "", const int& slot = 0);
virtual void SetProperty(const int& key, const int& value, const int& slot = 0); virtual void SetProperty(const int& key, const int& value, const int& slot = 0);
virtual int GetProperty(const int& key, const int& default_ = 0, const int& slot = 0); virtual int GetProperty(const int& key, const int& default_ = 0, const int& slot = 0);
protected: protected:

View File

@ -12,20 +12,20 @@ FairMQConfigurable::FairMQConfigurable()
{ {
} }
void FairMQConfigurable::SetProperty(const int& key, const std::string& value, const int& slot/*= 0*/) void FairMQConfigurable::SetProperty(const int key, const string& value, const int slot/*= 0*/)
{ {
} }
std::string FairMQConfigurable::GetProperty(const int& key, const std::string& default_/*= ""*/, const int& slot/*= 0*/) string FairMQConfigurable::GetProperty(const int key, const string& default_/*= ""*/, const int slot/*= 0*/)
{ {
return default_; return default_;
} }
void FairMQConfigurable::SetProperty(const int& key, const int& value, const int& slot/*= 0*/) void FairMQConfigurable::SetProperty(const int key, const int value, const int slot/*= 0*/)
{ {
} }
int FairMQConfigurable::GetProperty(const int& key, const int& default_/*= 0*/, const int& slot/*= 0*/) int FairMQConfigurable::GetProperty(const int key, const int default_/*= 0*/, const int slot/*= 0*/)
{ {
return default_; return default_;
} }

View File

@ -10,6 +10,7 @@
#include <string> #include <string>
using std::string;
class FairMQConfigurable class FairMQConfigurable
{ {
@ -18,11 +19,13 @@ class FairMQConfigurable
Last = 1 Last = 1
}; };
FairMQConfigurable(); FairMQConfigurable();
virtual void SetProperty(const int& key, const std::string& value, const int& slot = 0); virtual void SetProperty(const int key, const string& value, const int slot = 0);
virtual std::string GetProperty(const int& key, const std::string& default_ = "", const int& slot = 0); virtual string GetProperty(const int key, const string& default_ = "", const int slot = 0);
virtual void SetProperty(const int& key, const int& value, const int& slot = 0); virtual void SetProperty(const int key, const int value, const int slot = 0);
virtual int GetProperty(const int& key, const int& default_ = 0, const int& slot = 0); virtual int GetProperty(const int key, const int default_ = 0, const int slot = 0);
virtual ~FairMQConfigurable(); virtual ~FairMQConfigurable();
// TODO: by value for integers
}; };
#endif /* FAIRMQCONFIGURABLE_H_ */ #endif /* FAIRMQCONFIGURABLE_H_ */

View File

@ -1,41 +1,42 @@
/** /**
* FairMQContext.cxx * FairMQContextZMQ.cxx
* *
* @since 2012-12-05 * @since 2012-12-05
* @author D. Klein, A. Rybalchenko * @author D. Klein, A. Rybalchenko
*/ */
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQContext.h" #include "FairMQContextZMQ.h"
#include <sstream> #include <sstream>
FairMQContext::FairMQContext(int numIoThreads) FairMQContextZMQ::FairMQContextZMQ(int numIoThreads)
{ {
fContext = zmq_ctx_new (); fContext = zmq_ctx_new ();
if (fContext == NULL){ if (fContext == NULL){
std::stringstream logmsg; stringstream logmsg;
logmsg << "failed creating context, reason: " << zmq_strerror(errno); logmsg << "failed creating context, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
} }
int rc = zmq_ctx_set (fContext, ZMQ_IO_THREADS, numIoThreads); int rc = zmq_ctx_set (fContext, ZMQ_IO_THREADS, numIoThreads);
if (rc != 0){ if (rc != 0){
std::stringstream logmsg; stringstream logmsg;
logmsg << "failed configuring context, reason: " << zmq_strerror(errno); logmsg << "failed configuring context, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
} }
} }
FairMQContext::~FairMQContext() FairMQContextZMQ::~FairMQContextZMQ()
{ {
Close();
} }
void* FairMQContext::GetContext() void* FairMQContextZMQ::GetContext()
{ {
return fContext; return fContext;
} }
void FairMQContext::Close() void FairMQContextZMQ::Close()
{ {
if (fContext == NULL){ if (fContext == NULL){
return; return;
@ -43,7 +44,7 @@ void FairMQContext::Close()
int rc = zmq_ctx_destroy (fContext); int rc = zmq_ctx_destroy (fContext);
if (rc != 0) { if (rc != 0) {
std::stringstream logmsg; stringstream logmsg;
logmsg << "failed closing context, reason: " << zmq_strerror(errno); logmsg << "failed closing context, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
} }

View File

@ -5,16 +5,16 @@
* @author D. Klein, A. Rybalchenko * @author D. Klein, A. Rybalchenko
*/ */
#ifndef FAIRMQCONTEXT_H_ #ifndef FAIRMQCONTEXTZMQ_H_
#define FAIRMQCONTEXT_H_ #define FAIRMQCONTEXTZMQ_H_
#include <zmq.h> #include <zmq.h>
class FairMQContext class FairMQContextZMQ
{ {
public: public:
FairMQContext(int numIoThreads); FairMQContextZMQ(int numIoThreads);
virtual ~FairMQContext(); virtual ~FairMQContextZMQ();
void* GetContext(); void* GetContext();
void Close(); void Close();
@ -22,4 +22,4 @@ class FairMQContext
void* fContext; void* fContext;
}; };
#endif /* FAIRMQCONTEXT_H_ */ #endif /* FAIRMQCONTEXTZMQ_H_ */

View File

@ -5,20 +5,17 @@
* @author D. Klein, A. Rybalchenko * @author D. Klein, A. Rybalchenko
*/ */
#include <iostream>
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include "FairMQSocketZMQ.h" #include "FairMQSocket.h"
#include "FairMQDevice.h" #include "FairMQDevice.h"
#include "FairMQLogger.h" #include "FairMQLogger.h"
FairMQDevice::FairMQDevice() : FairMQDevice::FairMQDevice() :
fId(""),
fNumIoThreads(1), fNumIoThreads(1),
fPayloadContext(NULL), //fPayloadContext(NULL),
fPayloadInputs(new std::vector<FairMQSocket*>()), fPayloadInputs(new vector<FairMQSocket*>()),
fPayloadOutputs(new std::vector<FairMQSocket*>()), fPayloadOutputs(new vector<FairMQSocket*>()),
fLogIntervalInMs(1000) fLogIntervalInMs(1000)
{ {
} }
@ -26,34 +23,35 @@ FairMQDevice::FairMQDevice() :
void FairMQDevice::Init() void FairMQDevice::Init()
{ {
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Init <<<<<<<"); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Init <<<<<<<");
std::stringstream logmsg; stringstream logmsg;
logmsg << "numIoThreads: " << fNumIoThreads; logmsg << "numIoThreads: " << fNumIoThreads;
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
fPayloadContext = new FairMQContext(fNumIoThreads); // fPayloadContext = new FairMQContextZMQ(fNumIoThreads);
fInputAddress = new std::vector<std::string>(fNumInputs); // TODO: nafiga?
fInputMethod = new std::vector<std::string>(); fInputAddress = new vector<string>(fNumInputs);
fInputSocketType = new std::vector<int>(); fInputMethod = new vector<string>();
fInputSndBufSize = new std::vector<int>(); fInputSocketType = new vector<string>();
fInputRcvBufSize = new std::vector<int>(); fInputSndBufSize = new vector<int>();
fInputRcvBufSize = new vector<int>();
for (int i = 0; i < fNumInputs; ++i) { for (int i = 0; i < fNumInputs; ++i) {
fInputMethod->push_back("connect"); // default value, can be overwritten in configuration fInputMethod->push_back("connect"); // default value, can be overwritten in configuration
fInputSocketType->push_back(ZMQ_SUB); // default value, can be overwritten in configuration fInputSocketType->push_back("sub"); // default value, can be overwritten in configuration
fInputSndBufSize->push_back(10000); // default value, can be overwritten in configuration fInputSndBufSize->push_back(10000); // default value, can be overwritten in configuration
fInputRcvBufSize->push_back(10000); // default value, can be overwritten in configuration fInputRcvBufSize->push_back(10000); // default value, can be overwritten in configuration
} }
fOutputAddress = new std::vector<std::string>(fNumOutputs); fOutputAddress = new vector<string>(fNumOutputs);
fOutputMethod = new std::vector<std::string>(); fOutputMethod = new vector<string>();
fOutputSocketType = new std::vector<int>(); fOutputSocketType = new vector<string>();
fOutputSndBufSize = new std::vector<int>(); fOutputSndBufSize = new vector<int>();
fOutputRcvBufSize = new std::vector<int>(); fOutputRcvBufSize = new vector<int>();
for (int i = 0; i < fNumOutputs; ++i) { for (int i = 0; i < fNumOutputs; ++i) {
fOutputMethod->push_back("bind"); // default value, can be overwritten in configuration fOutputMethod->push_back("bind"); // default value, can be overwritten in configuration
fOutputSocketType->push_back(ZMQ_PUB); // default value, can be overwritten in configuration fOutputSocketType->push_back("pub"); // default value, can be overwritten in configuration
fOutputSndBufSize->push_back(10000); // default value, can be overwritten in configuration fOutputSndBufSize->push_back(10000); // default value, can be overwritten in configuration
fOutputRcvBufSize->push_back(10000); // default value, can be overwritten in configuration fOutputRcvBufSize->push_back(10000); // default value, can be overwritten in configuration
} }
@ -64,11 +62,10 @@ void FairMQDevice::InitInput()
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> InitInput <<<<<<<"); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> InitInput <<<<<<<");
for (int i = 0; i < fNumInputs; ++i) { for (int i = 0; i < fNumInputs; ++i) {
//FairMQSocket* socket = new FairMQSocketZMQ(fPayloadContext, fInputSocketType->at(i), i); FairMQSocket* socket = fTransportFactory->CreateSocket(fInputSocketType->at(i), i);
FairMQSocket* socket = fTransportFactory->CreateSocket(fPayloadContext, fInputSocketType->at(i), i);
socket->SetOption(ZMQ_SNDHWM, &fInputSndBufSize->at(i), sizeof(fInputSndBufSize->at(i))); socket->SetOption("snd-hwm", &fInputSndBufSize->at(i), sizeof(fInputSndBufSize->at(i)));
socket->SetOption(ZMQ_RCVHWM, &fInputRcvBufSize->at(i), sizeof(fInputRcvBufSize->at(i))); socket->SetOption("rcv-hwm", &fInputRcvBufSize->at(i), sizeof(fInputRcvBufSize->at(i)));
fPayloadInputs->push_back(socket); fPayloadInputs->push_back(socket);
@ -88,10 +85,10 @@ void FairMQDevice::InitOutput()
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> InitOutput <<<<<<<"); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> InitOutput <<<<<<<");
for (int i = 0; i < fNumOutputs; ++i) { for (int i = 0; i < fNumOutputs; ++i) {
FairMQSocket* socket = fTransportFactory->CreateSocket(fPayloadContext, fOutputSocketType->at(i), i); FairMQSocket* socket = fTransportFactory->CreateSocket(fOutputSocketType->at(i), i);
socket->SetOption(ZMQ_SNDHWM, &fOutputSndBufSize->at(i), sizeof(fOutputSndBufSize->at(i))); socket->SetOption("snd-hwm", &fOutputSndBufSize->at(i), sizeof(fOutputSndBufSize->at(i)));
socket->SetOption(ZMQ_RCVHWM, &fOutputRcvBufSize->at(i), sizeof(fOutputRcvBufSize->at(i))); socket->SetOption("rcv-hwm", &fOutputRcvBufSize->at(i), sizeof(fOutputRcvBufSize->at(i)));
fPayloadOutputs->push_back(socket); fPayloadOutputs->push_back(socket);
@ -115,7 +112,7 @@ void FairMQDevice::Pause()
} }
// Method for setting properties represented as a string. // Method for setting properties represented as a string.
void FairMQDevice::SetProperty(const int& key, const std::string& value, const int& slot/*= 0*/) void FairMQDevice::SetProperty(const int key, const string& value, const int slot/*= 0*/)
{ {
switch (key) { switch (key) {
case Id: case Id:
@ -137,6 +134,14 @@ void FairMQDevice::SetProperty(const int& key, const std::string& value, const i
fOutputMethod->erase(fOutputMethod->begin() + slot); fOutputMethod->erase(fOutputMethod->begin() + slot);
fOutputMethod->insert(fOutputMethod->begin() + slot, value); fOutputMethod->insert(fOutputMethod->begin() + slot, value);
break; break;
case InputSocketType:
fInputSocketType->erase(fInputSocketType->begin() + slot);
fInputSocketType->insert(fInputSocketType->begin() + slot, value);
break;
case OutputSocketType:
fOutputSocketType->erase(fOutputSocketType->begin() + slot);
fOutputSocketType->insert(fOutputSocketType->begin() + slot, value);
break;
default: default:
FairMQConfigurable::SetProperty(key, value, slot); FairMQConfigurable::SetProperty(key, value, slot);
break; break;
@ -144,7 +149,7 @@ void FairMQDevice::SetProperty(const int& key, const std::string& value, const i
} }
// Method for setting properties represented as an integer. // Method for setting properties represented as an integer.
void FairMQDevice::SetProperty(const int& key, const int& value, const int& slot/*= 0*/) void FairMQDevice::SetProperty(const int key, const int value, const int slot/*= 0*/)
{ {
switch (key) { switch (key) {
case NumIoThreads: case NumIoThreads:
@ -159,10 +164,6 @@ void FairMQDevice::SetProperty(const int& key, const int& value, const int& slot
case LogIntervalInMs: case LogIntervalInMs:
fLogIntervalInMs = value; fLogIntervalInMs = value;
break; break;
case InputSocketType:
fInputSocketType->erase(fInputSocketType->begin() + slot);
fInputSocketType->insert(fInputSocketType->begin() + slot, value);
break;
case InputSndBufSize: case InputSndBufSize:
fInputSndBufSize->erase(fInputSndBufSize->begin() + slot); fInputSndBufSize->erase(fInputSndBufSize->begin() + slot);
fInputSndBufSize->insert(fInputSndBufSize->begin() + slot, value); fInputSndBufSize->insert(fInputSndBufSize->begin() + slot, value);
@ -171,10 +172,6 @@ void FairMQDevice::SetProperty(const int& key, const int& value, const int& slot
fInputRcvBufSize->erase(fInputRcvBufSize->begin() + slot); fInputRcvBufSize->erase(fInputRcvBufSize->begin() + slot);
fInputRcvBufSize->insert(fInputRcvBufSize->begin() + slot, value); fInputRcvBufSize->insert(fInputRcvBufSize->begin() + slot, value);
break; break;
case OutputSocketType:
fOutputSocketType->erase(fOutputSocketType->begin() + slot);
fOutputSocketType->insert(fOutputSocketType->begin() + slot, value);
break;
case OutputSndBufSize: case OutputSndBufSize:
fOutputSndBufSize->erase(fOutputSndBufSize->begin() + slot); fOutputSndBufSize->erase(fOutputSndBufSize->begin() + slot);
fOutputSndBufSize->insert(fOutputSndBufSize->begin() + slot, value); fOutputSndBufSize->insert(fOutputSndBufSize->begin() + slot, value);
@ -190,7 +187,7 @@ void FairMQDevice::SetProperty(const int& key, const int& value, const int& slot
} }
// Method for getting properties represented as an string. // Method for getting properties represented as an string.
std::string FairMQDevice::GetProperty(const int& key, const std::string& default_/*= ""*/, const int& slot/*= 0*/) string FairMQDevice::GetProperty(const int key, const string& default_/*= ""*/, const int slot/*= 0*/)
{ {
switch (key) { switch (key) {
case Id: case Id:
@ -203,27 +200,27 @@ std::string FairMQDevice::GetProperty(const int& key, const std::string& default
return fInputMethod->at(slot); return fInputMethod->at(slot);
case OutputMethod: case OutputMethod:
return fOutputMethod->at(slot); return fOutputMethod->at(slot);
case InputSocketType:
return fInputSocketType->at(slot);
case OutputSocketType:
return fOutputSocketType->at(slot);
default: default:
return FairMQConfigurable::GetProperty(key, default_, slot); return FairMQConfigurable::GetProperty(key, default_, slot);
} }
} }
// Method for getting properties represented as an integer. // Method for getting properties represented as an integer.
int FairMQDevice::GetProperty(const int& key, const int& default_/*= 0*/, const int& slot/*= 0*/) int FairMQDevice::GetProperty(const int key, const int default_/*= 0*/, const int slot/*= 0*/)
{ {
switch (key) { switch (key) {
case NumIoThreads: case NumIoThreads:
return fNumIoThreads; return fNumIoThreads;
case LogIntervalInMs: case LogIntervalInMs:
return fLogIntervalInMs; return fLogIntervalInMs;
case InputSocketType:
return fInputSocketType->at(slot);
case InputSndBufSize: case InputSndBufSize:
return fInputSndBufSize->at(slot); return fInputSndBufSize->at(slot);
case InputRcvBufSize: case InputRcvBufSize:
return fInputRcvBufSize->at(slot); return fInputRcvBufSize->at(slot);
case OutputSocketType:
return fOutputSocketType->at(slot);
case OutputSndBufSize: case OutputSndBufSize:
return fOutputSndBufSize->at(slot); return fOutputSndBufSize->at(slot);
case OutputRcvBufSize: case OutputRcvBufSize:
@ -268,14 +265,14 @@ void FairMQDevice::LogSocketRates()
// End of temp stuff // End of temp stuff
int i = 0; int i = 0;
for ( std::vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { for ( vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) {
bytesInput[i] = (*itr)->GetBytesRx(); bytesInput[i] = (*itr)->GetBytesRx();
messagesInput[i] = (*itr)->GetMessagesRx(); messagesInput[i] = (*itr)->GetMessagesRx();
++i; ++i;
} }
i = 0; i = 0;
for ( std::vector<FairMQSocket*>::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { for ( vector<FairMQSocket*>::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) {
bytesOutput[i] = (*itr)->GetBytesTx(); bytesOutput[i] = (*itr)->GetBytesTx();
messagesOutput[i] = (*itr)->GetMessagesTx(); messagesOutput[i] = (*itr)->GetMessagesTx();
++i; ++i;
@ -293,7 +290,7 @@ void FairMQDevice::LogSocketRates()
i = 0; i = 0;
for ( std::vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { for ( vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) {
bytesInputNew[i] = (*itr)->GetBytesRx(); bytesInputNew[i] = (*itr)->GetBytesRx();
megabytesPerSecondInput[i] = ((double) (bytesInputNew[i] - bytesInput[i]) / (1024. * 1024.)) / (double) timeSinceLastLog_ms * 1000.; megabytesPerSecondInput[i] = ((double) (bytesInputNew[i] - bytesInput[i]) / (1024. * 1024.)) / (double) timeSinceLastLog_ms * 1000.;
bytesInput[i] = bytesInputNew[i]; bytesInput[i] = bytesInputNew[i];
@ -301,7 +298,7 @@ void FairMQDevice::LogSocketRates()
messagesPerSecondInput[i] = (double) (messagesInputNew[i] - messagesInput[i]) / (double) timeSinceLastLog_ms * 1000.; messagesPerSecondInput[i] = (double) (messagesInputNew[i] - messagesInput[i]) / (double) timeSinceLastLog_ms * 1000.;
messagesInput[i] = messagesInputNew[i]; messagesInput[i] = messagesInputNew[i];
std::stringstream logmsg; stringstream logmsg;
logmsg << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondInput[i] << " msg/s, " << megabytesPerSecondInput[i] << " MB/s"; logmsg << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondInput[i] << " msg/s, " << megabytesPerSecondInput[i] << " MB/s";
FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str());
@ -310,7 +307,7 @@ void FairMQDevice::LogSocketRates()
receivedSomething = true; receivedSomething = true;
} }
if ( receivedSomething && messagesPerSecondInput[i] == 0 ) { if ( receivedSomething && messagesPerSecondInput[i] == 0 ) {
std::cout << "Did not receive anything on socket " << i << " for " << didNotReceiveFor++ << " seconds." << std::endl; cout << "Did not receive anything on socket " << i << " for " << didNotReceiveFor++ << " seconds." << endl;
} else { } else {
didNotReceiveFor = 0; didNotReceiveFor = 0;
} }
@ -321,7 +318,7 @@ void FairMQDevice::LogSocketRates()
i = 0; i = 0;
for ( std::vector<FairMQSocket*>::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { for ( vector<FairMQSocket*>::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) {
bytesOutputNew[i] = (*itr)->GetBytesTx(); bytesOutputNew[i] = (*itr)->GetBytesTx();
megabytesPerSecondOutput[i] = ((double) (bytesOutputNew[i] - bytesOutput[i]) / (1024. * 1024.)) / (double) timeSinceLastLog_ms * 1000.; megabytesPerSecondOutput[i] = ((double) (bytesOutputNew[i] - bytesOutput[i]) / (1024. * 1024.)) / (double) timeSinceLastLog_ms * 1000.;
bytesOutput[i] = bytesOutputNew[i]; bytesOutput[i] = bytesOutputNew[i];
@ -329,7 +326,7 @@ void FairMQDevice::LogSocketRates()
messagesPerSecondOutput[i] = (double) (messagesOutputNew[i] - messagesOutput[i]) / (double) timeSinceLastLog_ms * 1000.; messagesPerSecondOutput[i] = (double) (messagesOutputNew[i] - messagesOutput[i]) / (double) timeSinceLastLog_ms * 1000.;
messagesOutput[i] = messagesOutputNew[i]; messagesOutput[i] = messagesOutputNew[i];
std::stringstream logmsg; stringstream logmsg;
logmsg << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondOutput[i] << " msg/s, " << megabytesPerSecondOutput[i] << " MB/s"; logmsg << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondOutput[i] << " msg/s, " << megabytesPerSecondOutput[i] << " MB/s";
FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str());
@ -338,7 +335,7 @@ void FairMQDevice::LogSocketRates()
sentSomething = true; sentSomething = true;
} }
if ( sentSomething && messagesPerSecondOutput[i] == 0 ) { if ( sentSomething && messagesPerSecondOutput[i] == 0 ) {
std::cout << "Did not send anything on socket " << i << " for " << didNotSendFor++ << " seconds." << std::endl; cout << "Did not send anything on socket " << i << " for " << didNotSendFor++ << " seconds." << endl;
} else { } else {
didNotSendFor = 0; didNotSendFor = 0;
} }
@ -349,18 +346,18 @@ void FairMQDevice::LogSocketRates()
// Temp stuff for process termination // Temp stuff for process termination
if (receivedSomething && didNotReceiveFor > 5) { if (receivedSomething && didNotReceiveFor > 5) {
std::cout << "stopping because nothing was received for 5 seconds." << std::endl; cout << "stopping because nothing was received for 5 seconds." << endl;
ChangeState(STOP); ChangeState(STOP);
} }
if (sentSomething && didNotSendFor > 5) { if (sentSomething && didNotSendFor > 5) {
std::cout << "stopping because nothing was sent for 5 seconds." << std::endl; cout << "stopping because nothing was sent for 5 seconds." << endl;
ChangeState(STOP); ChangeState(STOP);
} }
// End of temp stuff // End of temp stuff
t0 = t1; t0 = t1;
} catch (boost::thread_interrupted&) { } catch (boost::thread_interrupted&) {
std::cout << "rateLogger interrupted" << std::endl; cout << "rateLogger interrupted" << endl;
break; break;
} }
} }
@ -390,26 +387,26 @@ void FairMQDevice::ListenToCommands()
void FairMQDevice::Shutdown() void FairMQDevice::Shutdown()
{ {
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> closing inputs <<<<<<<"); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> closing inputs <<<<<<<");
for( std::vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { for( vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) {
(*itr)->Close(); (*itr)->Close();
} }
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> closing outputs <<<<<<<"); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> closing outputs <<<<<<<");
for( std::vector<FairMQSocket*>::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { for( vector<FairMQSocket*>::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) {
(*itr)->Close(); (*itr)->Close();
} }
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> closing context <<<<<<<"); //FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> closing context <<<<<<<");
fPayloadContext->Close(); //fPayloadContext->Close();
} }
FairMQDevice::~FairMQDevice() FairMQDevice::~FairMQDevice()
{ {
for( std::vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) { for( vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) {
delete (*itr); delete (*itr);
} }
for( std::vector<FairMQSocket*>::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) { for( vector<FairMQSocket*>::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) {
delete (*itr); delete (*itr);
} }

View File

@ -10,13 +10,17 @@
#include <vector> #include <vector>
#include <string> #include <string>
#include <iostream>
#include "FairMQConfigurable.h" #include "FairMQConfigurable.h"
#include "FairMQStateMachine.h" #include "FairMQStateMachine.h"
#include "FairMQTransportFactory.h" #include "FairMQTransportFactory.h"
#include "FairMQContext.h"
#include "FairMQSocket.h" #include "FairMQSocket.h"
using std::vector;
using std::cin;
using std::cout;
using std::endl;
class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
{ {
@ -45,38 +49,37 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
virtual void LogSocketRates(); virtual void LogSocketRates();
virtual void ListenToCommands(); virtual void ListenToCommands();
virtual void SetProperty(const int& key, const std::string& value, const int& slot = 0); virtual void SetProperty(const int key, const string& value, const int slot = 0);
virtual std::string GetProperty(const int& key, const std::string& default_ = "", const int& slot = 0); virtual string GetProperty(const int key, const string& default_ = "", const int slot = 0);
virtual void SetProperty(const int& key, const int& value, const int& slot = 0); virtual void SetProperty(const int key, const int value, const int slot = 0);
virtual int GetProperty(const int& key, const int& default_ = 0, const int& slot = 0); virtual int GetProperty(const int key, const int default_ = 0, const int slot = 0);
virtual void SetTransport(FairMQTransportFactory* factory); virtual void SetTransport(FairMQTransportFactory* factory);
virtual ~FairMQDevice(); virtual ~FairMQDevice();
protected: protected:
std::string fId; string fId;
int fNumIoThreads; int fNumIoThreads;
FairMQContext* fPayloadContext;
FairMQTransportFactory* fTransportFactory; FairMQTransportFactory* fTransportFactory;
int fNumInputs; int fNumInputs;
int fNumOutputs; int fNumOutputs;
std::vector<std::string> *fInputAddress; vector<string> *fInputAddress;
std::vector<std::string> *fInputMethod; vector<string> *fInputMethod;
std::vector<int> *fInputSocketType; vector<string> *fInputSocketType;
std::vector<int> *fInputSndBufSize; vector<int> *fInputSndBufSize;
std::vector<int> *fInputRcvBufSize; vector<int> *fInputRcvBufSize;
std::vector<std::string> *fOutputAddress; vector<string> *fOutputAddress;
std::vector<std::string> *fOutputMethod; vector<string> *fOutputMethod;
std::vector<int> *fOutputSocketType; vector<string> *fOutputSocketType;
std::vector<int> *fOutputSndBufSize; vector<int> *fOutputSndBufSize;
std::vector<int> *fOutputRcvBufSize; vector<int> *fOutputRcvBufSize;
std::vector<FairMQSocket*> *fPayloadInputs; vector<FairMQSocket*> *fPayloadInputs;
std::vector<FairMQSocket*> *fPayloadOutputs; vector<FairMQSocket*> *fPayloadOutputs;
int fLogIntervalInMs; int fLogIntervalInMs;

View File

@ -11,6 +11,9 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
using std::cin;
using std::cout;
using std::endl;
FairMQLogger* FairMQLogger::instance = NULL; FairMQLogger* FairMQLogger::instance = NULL;
@ -22,7 +25,7 @@ FairMQLogger* FairMQLogger::GetInstance()
return instance; return instance;
} }
FairMQLogger* FairMQLogger::InitInstance(std::string bindAddress) FairMQLogger* FairMQLogger::InitInstance(const string& bindAddress)
{ {
instance = new FairMQLogger(bindAddress); instance = new FairMQLogger(bindAddress);
return instance; return instance;
@ -33,7 +36,7 @@ FairMQLogger::FairMQLogger() :
{ {
} }
FairMQLogger::FairMQLogger(std::string bindAddress) : FairMQLogger::FairMQLogger(const string& bindAddress) :
fBindAddress(bindAddress) fBindAddress(bindAddress)
{ {
} }
@ -42,7 +45,7 @@ FairMQLogger::~FairMQLogger()
{ {
} }
void FairMQLogger::Log(int type, std::string logmsg) void FairMQLogger::Log(int type, const string& logmsg)
{ {
timestamp_t tm = get_timestamp(); timestamp_t tm = get_timestamp();
timestamp_t ms = tm / 1000.0L; timestamp_t ms = tm / 1000.0L;
@ -52,7 +55,7 @@ void FairMQLogger::Log(int type, std::string logmsg)
char mbstr[100]; char mbstr[100];
std::strftime(mbstr, 100, "%H:%M:%S:", std::localtime(&t)); std::strftime(mbstr, 100, "%H:%M:%S:", std::localtime(&t));
std::string type_str; string type_str;
switch (type) { switch (type) {
case DEBUG: case DEBUG:
type_str = "\033[01;34mDEBUG\033[0m"; type_str = "\033[01;34mDEBUG\033[0m";
@ -69,7 +72,7 @@ void FairMQLogger::Log(int type, std::string logmsg)
break; break;
} }
std::cout << "[\033[01;36m" << mbstr << fractional_seconds << "\033[0m]" << "[" << type_str << "]" << " " << logmsg << std::endl; cout << "[\033[01;36m" << mbstr << fractional_seconds << "\033[0m]" << "[" << type_str << "]" << " " << logmsg << endl;
} }
timestamp_t get_timestamp () timestamp_t get_timestamp ()

View File

@ -12,22 +12,24 @@
#include <sstream> #include <sstream>
#include <sys/time.h> #include <sys/time.h>
using std::string;
using std::stringstream;
class FairMQLogger class FairMQLogger
{ {
private: private:
static FairMQLogger* instance; static FairMQLogger* instance;
std::string fBindAddress; string fBindAddress;
public: public:
enum { enum {
DEBUG, INFO, ERROR, STATE DEBUG, INFO, ERROR, STATE
}; };
FairMQLogger(); FairMQLogger();
FairMQLogger(std::string bindAdress); FairMQLogger(const string& bindAdress); // TODO: check this for const ref
virtual ~FairMQLogger(); virtual ~FairMQLogger();
void Log(int type, std::string logmsg); void Log(int type, const string& logmsg);
static FairMQLogger* GetInstance(); static FairMQLogger* GetInstance();
static FairMQLogger* InitInstance(std::string bindAddress); static FairMQLogger* InitInstance(const string& bindAddress); // TODO: check this for const ref
}; };
typedef unsigned long long timestamp_t; typedef unsigned long long timestamp_t;

View File

@ -10,6 +10,7 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQMerger.h" #include "FairMQMerger.h"
#include "FairMQPoller.h"
FairMQMerger::FairMQMerger() FairMQMerger::FairMQMerger()
@ -26,24 +27,17 @@ void FairMQMerger::Run()
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
zmq_pollitem_t items[fNumInputs]; FairMQPoller* poller = fTransportFactory->CreatePoller(*fPayloadInputs);
for (int i = 0; i < fNumInputs; i++) {
items[i].socket = fPayloadInputs->at(i)->GetSocket();
items[i].fd = 0;
items[i].events = ZMQ_POLLIN;
items[i].revents = 0;
}
bool received = false; bool received = false;
while ( fState == RUNNING ) { while ( fState == RUNNING ) {
FairMQMessage* msg = fTransportFactory->CreateMessage(); FairMQMessage* msg = fTransportFactory->CreateMessage();
zmq_poll(items, fNumInputs, 100); poller->Poll(100);
for(int i = 0; i < fNumInputs; i++) { for(int i = 0; i < fNumInputs; i++) {
if (items[i].revents & ZMQ_POLLIN) { if (poller->CheckInput(i)){
received = fPayloadInputs->at(i)->Receive(msg); received = fPayloadInputs->at(i)->Receive(msg);
} }
if (received) { if (received) {
@ -55,6 +49,8 @@ void FairMQMerger::Run()
delete msg; delete msg;
} }
delete poller;
rateLogger.interrupt(); rateLogger.interrupt();
rateLogger.join(); rateLogger.join();
} }

View File

@ -21,6 +21,7 @@ class FairMQMessage
virtual void* GetMessage() = 0; virtual void* GetMessage() = 0;
virtual void* GetData() = 0; virtual void* GetData() = 0;
virtual size_t GetSize() = 0; virtual size_t GetSize() = 0;
virtual void SetMessage(void* data, size_t size) = 0;
virtual void Copy(FairMQMessage* msg) = 0; virtual void Copy(FairMQMessage* msg) = 0;

View File

@ -0,0 +1,135 @@
/**
* FairMQMessageNN.cxx
*
* @since 2013-12-05
* @author: A. Rybalchenko
*/
#include <cstring>
#include <nanomsg/nn.h>
#include "FairMQMessageNN.h"
#include "FairMQLogger.h"
FairMQMessageNN::FairMQMessageNN() :
fSize(0),
fMessage(NULL)
{
}
FairMQMessageNN::FairMQMessageNN(size_t size)
{
fMessage = nn_allocmsg(size, 0);
if(!fMessage){
stringstream logmsg;
logmsg << "failed allocating message, reason: " << nn_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
fSize = size;
}
FairMQMessageNN::FairMQMessageNN(void* data, size_t size)
{
fMessage = nn_allocmsg(size, 0);
if(!fMessage){
stringstream logmsg;
logmsg << "failed allocating message, reason: " << nn_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
memcpy (fMessage, data, size);
fSize = size;
}
void FairMQMessageNN::Rebuild()
{
Clear();
fSize = 0;
fMessage = NULL;
}
void FairMQMessageNN::Rebuild(size_t size)
{
Clear();
fMessage = nn_allocmsg(size, 0);
if(!fMessage){
stringstream logmsg;
logmsg << "failed allocating message, reason: " << nn_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
fSize = size;
}
void FairMQMessageNN::Rebuild(void* data, size_t size)
{
Clear();
fMessage = nn_allocmsg(size, 0);
if(!fMessage){
stringstream logmsg;
logmsg << "failed allocating message, reason: " << nn_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
fSize = size;
}
void* FairMQMessageNN::GetMessage()
{
return fMessage;
}
void* FairMQMessageNN::GetData()
{
return fMessage;
}
size_t FairMQMessageNN::GetSize()
{
return fSize;
}
void FairMQMessageNN::SetMessage(void* data, size_t size)
{
fMessage = data;
fSize = size;
}
void FairMQMessageNN::Copy(FairMQMessage* msg)
{
if(fMessage){
int rc = nn_freemsg(fMessage);
if( rc < 0 ){
stringstream logmsg;
logmsg << "failed freeing message, reason: " << nn_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
}
size_t size = msg->GetSize();
fMessage = nn_allocmsg(size, 0);
if(!fMessage){
stringstream logmsg;
logmsg << "failed allocating message, reason: " << nn_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
std::memcpy (fMessage, msg->GetMessage(), size);
fSize = size;
}
void FairMQMessageNN::Clear()
{
int rc = nn_freemsg(fMessage);
if (rc < 0) {
stringstream logmsg;
logmsg << "failed freeing message, reason: " << nn_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
} else {
fMessage = NULL;
fSize = 0;
}
}
FairMQMessageNN::~FairMQMessageNN()
{
}

View File

@ -0,0 +1,43 @@
/**
* FairMQMessageNN.h
*
* @since 2013-12-05
* @author: A. Rybalchenko
*/
#ifndef FAIRMQMESSAGENN_H_
#define FAIRMQMESSAGENN_H_
#include <cstddef>
#include "FairMQMessage.h"
class FairMQMessageNN : public FairMQMessage
{
public:
FairMQMessageNN();
FairMQMessageNN(size_t size);
FairMQMessageNN(void* data, size_t size);
virtual void Rebuild();
virtual void Rebuild(size_t size);
virtual void Rebuild(void* data, size_t site);
virtual void* GetMessage();
virtual void* GetData();
virtual size_t GetSize();
virtual void Copy(FairMQMessage* msg);
void SetMessage(void* data, size_t size);
void Clear();
virtual ~FairMQMessageNN();
private:
void* fMessage;
size_t fSize;
};
#endif /* FAIRMQMESSAGENN_H_ */

View File

@ -14,8 +14,8 @@
FairMQMessageZMQ::FairMQMessageZMQ() FairMQMessageZMQ::FairMQMessageZMQ()
{ {
int rc = zmq_msg_init (&fMessage); int rc = zmq_msg_init (&fMessage);
if (rc != 0){ if (rc != 0) {
std::stringstream logmsg; stringstream logmsg;
logmsg << "failed initializing message, reason: " << zmq_strerror(errno); logmsg << "failed initializing message, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
} }
@ -24,8 +24,8 @@ FairMQMessageZMQ::FairMQMessageZMQ()
FairMQMessageZMQ::FairMQMessageZMQ(size_t size) FairMQMessageZMQ::FairMQMessageZMQ(size_t size)
{ {
int rc = zmq_msg_init_size (&fMessage, size); int rc = zmq_msg_init_size (&fMessage, size);
if (rc != 0){ if (rc != 0) {
std::stringstream logmsg; stringstream logmsg;
logmsg << "failed initializing message with size, reason: " << zmq_strerror(errno); logmsg << "failed initializing message with size, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
} }
@ -34,8 +34,8 @@ FairMQMessageZMQ::FairMQMessageZMQ(size_t size)
FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size) FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size)
{ {
int rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL); int rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL);
if (rc != 0){ if (rc != 0) {
std::stringstream logmsg; stringstream logmsg;
logmsg << "failed initializing message with data, reason: " << zmq_strerror(errno); logmsg << "failed initializing message with data, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
} }
@ -43,28 +43,50 @@ FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size)
void FairMQMessageZMQ::Rebuild() void FairMQMessageZMQ::Rebuild()
{ {
//TODO int rc = zmq_msg_close (&fMessage);
if (rc != 0) {
stringstream logmsg;
logmsg << "failed closing message, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
rc = zmq_msg_init (&fMessage);
if (rc != 0) {
stringstream logmsg;
logmsg << "failed initializing message, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
} }
void FairMQMessageZMQ::Rebuild(size_t size) void FairMQMessageZMQ::Rebuild(size_t size)
{ {
//TODO int rc = zmq_msg_close (&fMessage);
if (rc != 0) {
stringstream logmsg;
logmsg << "failed closing message, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
rc = zmq_msg_init_size (&fMessage, size);
if (rc != 0) {
stringstream logmsg;
logmsg << "failed initializing message with size, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
} }
void FairMQMessageZMQ::Rebuild(void* data, size_t size) void FairMQMessageZMQ::Rebuild(void* data, size_t size)
{ {
int rc = zmq_msg_close (&fMessage); int rc = zmq_msg_close (&fMessage);
if (rc != 0) { if (rc != 0) {
std::stringstream logmsg; stringstream logmsg;
logmsg << "failed closing message, reason: " << zmq_strerror(errno); logmsg << "failed closing message, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
} }
rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL); rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL);
if (rc != 0) { if (rc != 0) {
std::stringstream logmsg2; stringstream logmsg2;
logmsg2 << "failed initializing message with data, reason: " << zmq_strerror(errno); logmsg2 << "failed initializing message with data, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
} }
@ -85,11 +107,16 @@ size_t FairMQMessageZMQ::GetSize()
return zmq_msg_size (&fMessage); return zmq_msg_size (&fMessage);
} }
void FairMQMessageZMQ::SetMessage(void* data, size_t size)
{
// dummy method to comply with the interface. functionality not allowed in zeromq.
}
void FairMQMessageZMQ::Copy(FairMQMessage* msg) void FairMQMessageZMQ::Copy(FairMQMessage* msg)
{ {
int rc = zmq_msg_copy (&fMessage, &(static_cast<FairMQMessageZMQ*>(msg)->fMessage)); int rc = zmq_msg_copy (&fMessage, &(static_cast<FairMQMessageZMQ*>(msg)->fMessage));
if (rc != 0) { if (rc != 0) {
std::stringstream logmsg; stringstream logmsg;
logmsg << "failed copying message, reason: " << zmq_strerror(errno); logmsg << "failed copying message, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
} }
@ -103,8 +130,8 @@ void FairMQMessageZMQ::CleanUp(void* data, void* hint)
FairMQMessageZMQ::~FairMQMessageZMQ() FairMQMessageZMQ::~FairMQMessageZMQ()
{ {
int rc = zmq_msg_close (&fMessage); int rc = zmq_msg_close (&fMessage);
if (rc != 0){ if (rc != 0) {
std::stringstream logmsg; stringstream logmsg;
logmsg << "failed closing message with data, reason: " << zmq_strerror(errno); logmsg << "failed closing message with data, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
} }

View File

@ -15,7 +15,7 @@
#include "FairMQMessage.h" #include "FairMQMessage.h"
class FairMQMessageZMQ: public FairMQMessage class FairMQMessageZMQ : public FairMQMessage
{ {
public: public:
FairMQMessageZMQ(); FairMQMessageZMQ();
@ -29,6 +29,7 @@ class FairMQMessageZMQ: public FairMQMessage
virtual void* GetMessage(); virtual void* GetMessage();
virtual void* GetData(); virtual void* GetData();
virtual size_t GetSize(); virtual size_t GetSize();
virtual void SetMessage(void* data, size_t size);
virtual void Copy(FairMQMessage* msg); virtual void Copy(FairMQMessage* msg);

6
fairmq/FairMQPoller.cxx Normal file
View File

@ -0,0 +1,6 @@
/**
* FairMQPoller.cxx
*
* @since 2014-01-23
* @author A. Rybalchenko
*/

21
fairmq/FairMQPoller.h Normal file
View File

@ -0,0 +1,21 @@
/**
* FairMQPoller.h
*
* @since 2014-01-23
* @author A. Rybalchenko
*/
#ifndef FAIRMQPOLLER_H_
#define FAIRMQPOLLER_H_
class FairMQPoller
{
public:
virtual void Poll(int timeout) = 0;
virtual bool CheckInput(int index) = 0;
virtual ~FairMQPoller() {};
};
#endif /* FAIRMQPOLLER_H_ */

39
fairmq/FairMQPollerNN.cxx Normal file
View File

@ -0,0 +1,39 @@
/**
* FairMQPollerNN.cxx
*
* @since 2014-01-23
* @author A. Rybalchenko
*/
#include <nanomsg/nn.h>
#include "FairMQPollerNN.h"
FairMQPollerNN::FairMQPollerNN(const vector<FairMQSocket*>& inputs)
{
fNumItems = inputs.size();
items = new nn_pollfd[fNumItems];
for (int i = 0; i < fNumItems; i++) {
items[i].fd = inputs.at(i)->GetSocket(1);
items[i].events = NN_POLLIN;
}
}
void FairMQPollerNN::Poll(int timeout)
{
nn_poll(items, fNumItems, timeout);
}
bool FairMQPollerNN::CheckInput(int index)
{
if (items[index].revents & NN_POLLIN)
return true;
return false;
}
FairMQPollerNN::~FairMQPollerNN()
{
if (items != NULL) delete [] items;
}

33
fairmq/FairMQPollerNN.h Normal file
View File

@ -0,0 +1,33 @@
/**
* FairMQPollerNN.h
*
* @since 2014-01-23
* @author A. Rybalchenko
*/
#ifndef FAIRMQPOLLERNN_H_
#define FAIRMQPOLLERNN_H_
#include <vector>
#include "FairMQPoller.h"
#include "FairMQSocket.h"
using std::vector;
class FairMQPollerNN : public FairMQPoller
{
public:
FairMQPollerNN(const vector<FairMQSocket*>& inputs);
virtual void Poll(int timeout);
virtual bool CheckInput(int index);
virtual ~FairMQPollerNN();
private:
nn_pollfd* items;
int fNumItems;
};
#endif /* FAIRMQPOLLERNN_H_ */

View File

@ -0,0 +1,41 @@
/**
* FairMQPollerZMQ.cxx
*
* @since 2014-01-23
* @author A. Rybalchenko
*/
#include <zmq.h>
#include "FairMQPollerZMQ.h"
FairMQPollerZMQ::FairMQPollerZMQ(const vector<FairMQSocket*>& inputs)
{
fNumItems = inputs.size();
items = new zmq_pollitem_t[fNumItems];
for (int i = 0; i < fNumItems; i++) {
items[i].socket = inputs.at(i)->GetSocket();
items[i].fd = 0;
items[i].events = ZMQ_POLLIN;
items[i].revents = 0;
}
}
void FairMQPollerZMQ::Poll(int timeout)
{
zmq_poll(items, fNumItems, timeout);
}
bool FairMQPollerZMQ::CheckInput(int index)
{
if (items[index].revents & ZMQ_POLLIN)
return true;
return false;
}
FairMQPollerZMQ::~FairMQPollerZMQ()
{
if (items != NULL) delete [] items;
}

33
fairmq/FairMQPollerZMQ.h Normal file
View File

@ -0,0 +1,33 @@
/**
* FairMQPollerZMQ.h
*
* @since 2014-01-23
* @author A. Rybalchenko
*/
#ifndef FAIRMQPOLLERZMQ_H_
#define FAIRMQPOLLERZMQ_H_
#include <vector>
#include "FairMQPoller.h"
#include "FairMQSocket.h"
using std::vector;
class FairMQPollerZMQ : public FairMQPoller
{
public:
FairMQPollerZMQ(const vector<FairMQSocket*>& inputs);
virtual void Poll(int timeout);
virtual bool CheckInput(int index);
virtual ~FairMQPollerZMQ();
private:
zmq_pollitem_t* items;
int fNumItems;
};
#endif /* FAIRMQPOLLERZMQ_H_ */

View File

@ -61,7 +61,7 @@ void FairMQProcessor::Run()
delete msg; delete msg;
} }
std::cout << "I've received " << receivedMsgs << " and sent " << sentMsgs << " messages!" << std::endl; cout << "I've received " << receivedMsgs << " and sent " << sentMsgs << " messages!" << endl;
boost::this_thread::sleep(boost::posix_time::milliseconds(5000)); boost::this_thread::sleep(boost::posix_time::milliseconds(5000));

View File

@ -45,7 +45,7 @@ void FairMQSampler::Init()
FairMQDevice::Init(); FairMQDevice::Init();
fSamplerTask->SetBranch(fBranch); fSamplerTask->SetBranch(fBranch);
fSamplerTask->SetTransport(fTransportFactory); // TODO: simplify message creation for sampler task? fSamplerTask->SetTransport(fTransportFactory);
fFairRunAna->SetInputFile(TString(fInputFile)); fFairRunAna->SetInputFile(TString(fInputFile));
TString output = fInputFile; TString output = fInputFile;
@ -79,7 +79,7 @@ void FairMQSampler::Run()
boost::timer::auto_cpu_timer timer; boost::timer::auto_cpu_timer timer;
std::cout << "Number of events to process: " << fNumEvents << std::endl; cout << "Number of events to process: " << fNumEvents << endl;
Long64_t eventNr = 0; Long64_t eventNr = 0;
@ -105,8 +105,8 @@ void FairMQSampler::Run()
boost::timer::cpu_times const elapsed_time(timer.elapsed()); boost::timer::cpu_times const elapsed_time(timer.elapsed());
std::cout << "Sent everything in:\n" << boost::timer::format(elapsed_time, 2) << std::endl; cout << "Sent everything in:\n" << boost::timer::format(elapsed_time, 2) << endl;
std::cout << "Sent " << sentMsgs << " messages!" << std::endl; cout << "Sent " << sentMsgs << " messages!" << endl;
//boost::this_thread::sleep(boost::posix_time::milliseconds(5000)); //boost::this_thread::sleep(boost::posix_time::milliseconds(5000));
@ -125,7 +125,7 @@ void FairMQSampler::ResetEventCounter()
fEventCounter = fEventRate / 100; fEventCounter = fEventRate / 100;
boost::this_thread::sleep(boost::posix_time::milliseconds(10)); boost::this_thread::sleep(boost::posix_time::milliseconds(10));
} catch (boost::thread_interrupted&) { } catch (boost::thread_interrupted&) {
std::cout << "resetEventCounter interrupted" << std::endl; cout << "resetEventCounter interrupted" << endl;
break; break;
} }
} }
@ -154,14 +154,14 @@ void FairMQSampler::ListenToCommands()
boost::this_thread::interruption_point(); boost::this_thread::interruption_point();
} catch (boost::thread_interrupted&) { } catch (boost::thread_interrupted&) {
std::cout << "commandListener interrupted" << std::endl; cout << "commandListener interrupted" << endl;
break; break;
} }
} }
FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, ">>>>>>> stopping commandListener <<<<<<<"); FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, ">>>>>>> stopping commandListener <<<<<<<");
} }
void FairMQSampler::SetProperty(const int& key, const std::string& value, const int& slot/*= 0*/) void FairMQSampler::SetProperty(const int& key, const string& value, const int& slot/*= 0*/)
{ {
switch (key) { switch (key) {
case InputFile: case InputFile:
@ -179,7 +179,7 @@ void FairMQSampler::SetProperty(const int& key, const std::string& value, const
} }
} }
std::string FairMQSampler::GetProperty(const int& key, const std::string& default_/*= ""*/, const int& slot/*= 0*/) string FairMQSampler::GetProperty(const int& key, const string& default_/*= ""*/, const int& slot/*= 0*/)
{ {
switch (key) { switch (key) {
case InputFile: case InputFile:

View File

@ -8,7 +8,6 @@
#ifndef FAIRMQSAMPLER_H_ #ifndef FAIRMQSAMPLER_H_
#define FAIRMQSAMPLER_H_ #define FAIRMQSAMPLER_H_
#include <string>
#include "FairRunAna.h" #include "FairRunAna.h"
#include "FairTask.h" #include "FairTask.h"
#include "FairMQDevice.h" #include "FairMQDevice.h"
@ -40,17 +39,17 @@ class FairMQSampler: public FairMQDevice
void ResetEventCounter(); void ResetEventCounter();
virtual void ListenToCommands(); virtual void ListenToCommands();
virtual void SetProperty(const int& key, const std::string& value, const int& slot = 0); virtual void SetProperty(const int& key, const string& value, const int& slot = 0);
virtual std::string GetProperty(const int& key, const std::string& default_ = "", const int& slot = 0); virtual string GetProperty(const int& key, const string& default_ = "", const int& slot = 0);
virtual void SetProperty(const int& key, const int& value, const int& slot = 0); virtual void SetProperty(const int& key, const int& value, const int& slot = 0);
virtual int GetProperty(const int& key, const int& default_ = 0, const int& slot = 0); virtual int GetProperty(const int& key, const int& default_ = 0, const int& slot = 0);
protected: protected:
FairRunAna* fFairRunAna; FairRunAna* fFairRunAna;
int fNumEvents; int fNumEvents;
FairMQSamplerTask* fSamplerTask; FairMQSamplerTask* fSamplerTask;
std::string fInputFile; // Filename of a root file containing the simulated digis. string fInputFile; // Filename of a root file containing the simulated digis.
std::string fParFile; string fParFile;
std::string fBranch; // The name of the sub-detector branch to stream the digis from. string fBranch; // The name of the sub-detector branch to stream the digis from.
int fEventRate; int fEventRate;
int fEventCounter; int fEventCounter;
virtual void Init(); virtual void Init();

View File

@ -9,25 +9,27 @@
#define FAIRMQSOCKET_H_ #define FAIRMQSOCKET_H_
#include <string> #include <string>
#include "FairMQContext.h"
#include "FairMQMessage.h" #include "FairMQMessage.h"
using std::string;
using std::stringstream;
class FairMQSocket class FairMQSocket
{ {
public: public:
virtual std::string GetId() = 0; virtual string GetId() = 0;
virtual void Bind(std::string address) = 0; virtual void Bind(const string& address) = 0;
virtual void Connect(std::string address) = 0; virtual void Connect(const string& address) = 0;
virtual size_t Send(FairMQMessage* msg) = 0; virtual size_t Send(FairMQMessage* msg) = 0;
virtual size_t Receive(FairMQMessage* msg) = 0; virtual size_t Receive(FairMQMessage* msg) = 0;
virtual void Close() = 0;
virtual void* GetSocket() = 0; virtual void* GetSocket() = 0;
virtual int GetSocket(int nothing) = 0;
virtual void Close() = 0;
virtual void SetOption(int option, const void* value, size_t valueSize) = 0; virtual void SetOption(const string& option, const void* value, size_t valueSize) = 0;
virtual unsigned long GetBytesTx() = 0; virtual unsigned long GetBytesTx() = 0;
virtual unsigned long GetBytesRx() = 0; virtual unsigned long GetBytesRx() = 0;

View File

@ -0,0 +1,161 @@
/**
* FairMQSocketNN.cxx
*
* @since 2012-12-05
* @author A. Rybalchenko
*/
#include <sstream>
#include "FairMQSocketNN.h"
#include "FairMQLogger.h"
FairMQSocketNN::FairMQSocketNN(const string& type, int num) :
fBytesTx(0),
fBytesRx(0),
fMessagesTx(0),
fMessagesRx(0)
{
stringstream id;
id << type << "." << num;
fId = id.str();
fSocket = nn_socket (AF_SP, GetConstant(type));
if (type == "sub") {
nn_setsockopt(fSocket, NN_SUB, NN_SUB_SUBSCRIBE, NULL, 0);
}
stringstream logmsg;
logmsg << "created socket #" << fId;
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
}
string FairMQSocketNN::GetId()
{
return fId;
}
void FairMQSocketNN::Bind(const string& address)
{
stringstream logmsg;
logmsg << "bind socket #" << fId << " on " << address;
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
int eid = nn_bind(fSocket, address.c_str());
if (eid < 0) {
stringstream logmsg2;
logmsg2 << "failed binding socket #" << fId << ", reason: " << nn_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
}
}
void FairMQSocketNN::Connect(const string& address)
{
stringstream logmsg;
logmsg << "connect socket #" << fId << " to " << address;
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
int eid = nn_connect(fSocket, address.c_str());
if (eid < 0) {
stringstream logmsg2;
logmsg2 << "failed connecting socket #" << fId << ", reason: " << nn_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
}
}
size_t FairMQSocketNN::Send(FairMQMessage* msg)
{
void* ptr = msg->GetMessage();
int rc = nn_send(fSocket, &ptr, NN_MSG, 0);
if (rc < 0) {
stringstream logmsg;
logmsg << "failed sending on socket #" << fId << ", reason: " << nn_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
} else {
fBytesTx += rc;
++fMessagesTx;
}
return rc;
}
size_t FairMQSocketNN::Receive(FairMQMessage* msg)
{
void* ptr = msg->GetMessage();
int rc = nn_recv(fSocket, &ptr, NN_MSG, 0);
if (rc < 0) {
stringstream logmsg;
logmsg << "failed receiving on socket #" << fId << ", reason: " << nn_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
} else {
fBytesRx += rc;
++fMessagesRx;
msg->SetMessage(ptr, rc);
}
return rc;
}
void* FairMQSocketNN::GetSocket()
{
return NULL;// dummy method to compy with the interface. functionality not possible in zeromq.
}
int FairMQSocketNN::GetSocket(int nothing)
{
return fSocket;
}
void FairMQSocketNN::Close()
{
nn_close(fSocket);
}
void FairMQSocketNN::SetOption(const string& option, const void* value, size_t valueSize)
{
int rc = nn_setsockopt(fSocket, NN_SOL_SOCKET, GetConstant(option), value, valueSize);
if (rc < 0) {
stringstream logmsg;
logmsg << "failed setting socket option, reason: " << nn_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
}
unsigned long FairMQSocketNN::GetBytesTx()
{
return fBytesTx;
}
unsigned long FairMQSocketNN::GetBytesRx()
{
return fBytesRx;
}
unsigned long FairMQSocketNN::GetMessagesTx()
{
return fMessagesTx;
}
unsigned long FairMQSocketNN::GetMessagesRx()
{
return fMessagesRx;
}
int FairMQSocketNN::GetConstant(const string& constant)
{
if (constant == "sub") return NN_SUB;
if (constant == "pub") return NN_PUB;
if (constant == "xsub") return NN_SUB; // TODO: is there XPUB, XSUB for nanomsg?
if (constant == "xpub") return NN_PUB;
if (constant == "push") return NN_PUSH;
if (constant == "pull") return NN_PULL;
if (constant == "snd-hwm") return NN_SNDBUF;
if (constant == "rcv-hwm") return NN_RCVBUF;
return -1;
}
FairMQSocketNN::~FairMQSocketNN()
{
Close();
}

View File

@ -0,0 +1,56 @@
/**
* FairMQSocketNN.h
*
* @since 2013-12-05
* @author A. Rybalchenko
*/
#ifndef FAIRMQSOCKETNN_H_
#define FAIRMQSOCKETNN_H_
#include <nanomsg/nn.h>
#include <nanomsg/pipeline.h>
#include <nanomsg/pubsub.h>
#include "FairMQSocket.h"
class FairMQSocketNN : public FairMQSocket
{
public:
FairMQSocketNN(const string& type, int num);
virtual string GetId();
virtual void Bind(const string& address);
virtual void Connect(const string& address);
virtual size_t Send(FairMQMessage* msg);
virtual size_t Receive(FairMQMessage* msg);
virtual void* GetSocket();
virtual int GetSocket(int nothing);
virtual void Close();
virtual void SetOption(const string& option, const void* value, size_t valueSize);
unsigned long GetBytesTx();
unsigned long GetBytesRx();
unsigned long GetMessagesTx();
unsigned long GetMessagesRx();
static int GetConstant(const string& constant);
virtual ~FairMQSocketNN();
private:
int fSocket;
string fId;
unsigned long fBytesTx;
unsigned long fBytesRx;
unsigned long fMessagesTx;
unsigned long fMessagesRx;
};
#endif /* FAIRMQSOCKETNN_H_ */

View File

@ -10,83 +10,68 @@
#include "FairMQSocketZMQ.h" #include "FairMQSocketZMQ.h"
#include "FairMQLogger.h" #include "FairMQLogger.h"
boost::shared_ptr<FairMQContextZMQ> FairMQSocketZMQ::fContext = boost::shared_ptr<FairMQContextZMQ>(new FairMQContextZMQ(1)); // TODO: numIoThreads!
FairMQSocketZMQ::FairMQSocketZMQ(FairMQContext* context, int type, int num) : FairMQSocketZMQ::FairMQSocketZMQ(const string& type, int num) :
fBytesTx(0), fBytesTx(0),
fBytesRx(0), fBytesRx(0),
fMessagesTx(0), fMessagesTx(0),
fMessagesRx(0) fMessagesRx(0)
{ {
std::stringstream id; stringstream id; // TODO
id << GetTypeString(type) << "." << num; id << type << "." << num;
fId = id.str(); fId = id.str();
fSocket = zmq_socket(context->GetContext(), type); fSocket = zmq_socket(fContext->GetContext(), GetConstant(type));
int rc = zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.length()); int rc = zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.length());
if (rc != 0) { if (rc != 0) {
std::stringstream logmsg; stringstream logmsg;
logmsg << "failed setting socket option, reason: " << zmq_strerror(errno); logmsg << "failed setting socket option, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
} }
if (type == ZMQ_SUB) { if (type == "sub") {
rc = zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, NULL, 0); rc = zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, NULL, 0);
if (rc != 0) { if (rc != 0) {
std::stringstream logmsg2; stringstream logmsg2;
logmsg2 << "failed setting socket option, reason: " << zmq_strerror(errno); logmsg2 << "failed setting socket option, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
} }
} }
std::stringstream logmsg3; stringstream logmsg3;
logmsg3 << "created socket #" << fId; logmsg3 << "created socket #" << fId;
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg3.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg3.str());
} }
std::string FairMQSocketZMQ::GetId() string FairMQSocketZMQ::GetId()
{ {
return fId; return fId;
} }
std::string FairMQSocketZMQ::GetTypeString(int type) void FairMQSocketZMQ::Bind(const string& address)
{ {
switch (type) { stringstream logmsg;
case ZMQ_SUB:
return "sub";
case ZMQ_PUB:
return "pub";
case ZMQ_PUSH:
return "push";
case ZMQ_PULL:
return "pull";
default:
return "";
}
}
void FairMQSocketZMQ::Bind(std::string address)
{
std::stringstream logmsg;
logmsg << "bind socket #" << fId << " on " << address; logmsg << "bind socket #" << fId << " on " << address;
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
int rc = zmq_bind (fSocket, address.c_str()); int rc = zmq_bind (fSocket, address.c_str());
if (rc != 0) { if (rc != 0) {
std::stringstream logmsg2; stringstream logmsg2;
logmsg2 << "failed binding socket #" << fId << ", reason: " << zmq_strerror(errno); logmsg2 << "failed binding socket #" << fId << ", reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
} }
} }
void FairMQSocketZMQ::Connect(std::string address) void FairMQSocketZMQ::Connect(const string& address)
{ {
std::stringstream logmsg; stringstream logmsg;
logmsg << "connect socket #" << fId << " on " << address; logmsg << "connect socket #" << fId << " on " << address;
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
int rc = zmq_connect (fSocket, address.c_str()); int rc = zmq_connect (fSocket, address.c_str());
if (rc != 0) { if (rc != 0) {
std::stringstream logmsg2; stringstream logmsg2;
logmsg2 << "failed connecting socket #" << fId << ", reason: " << zmq_strerror(errno); logmsg2 << "failed connecting socket #" << fId << ", reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
} }
@ -103,7 +88,7 @@ size_t FairMQSocketZMQ::Send(FairMQMessage* msg)
if (zmq_errno() == EAGAIN){ if (zmq_errno() == EAGAIN){
return false; return false;
} }
std::stringstream logmsg; stringstream logmsg;
logmsg << "failed sending on socket #" << fId << ", reason: " << zmq_strerror(errno); logmsg << "failed sending on socket #" << fId << ", reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
return nbytes; return nbytes;
@ -120,22 +105,12 @@ size_t FairMQSocketZMQ::Receive(FairMQMessage* msg)
if (zmq_errno() == EAGAIN){ if (zmq_errno() == EAGAIN){
return false; return false;
} }
std::stringstream logmsg; stringstream logmsg;
logmsg << "failed receiving on socket #" << fId << ", reason: " << zmq_strerror(errno); logmsg << "failed receiving on socket #" << fId << ", reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
return nbytes; return nbytes;
} }
void FairMQSocketZMQ::SetOption(int option, const void* value, size_t valueSize)
{
int rc = zmq_setsockopt(fSocket, option, value, valueSize);
if (rc < 0) {
std::stringstream logmsg;
logmsg << "failed setting socket option, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
}
void FairMQSocketZMQ::Close() void FairMQSocketZMQ::Close()
{ {
if (fSocket == NULL){ if (fSocket == NULL){
@ -144,7 +119,7 @@ void FairMQSocketZMQ::Close()
int rc = zmq_close (fSocket); int rc = zmq_close (fSocket);
if (rc != 0) { if (rc != 0) {
std::stringstream logmsg; stringstream logmsg;
logmsg << "failed closing socket, reason: " << zmq_strerror(errno); logmsg << "failed closing socket, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
} }
@ -157,6 +132,21 @@ void* FairMQSocketZMQ::GetSocket()
return fSocket; return fSocket;
} }
int FairMQSocketZMQ::GetSocket(int nothing)
{
// dummy method to compy with the interface. functionality not possible in zeromq.
}
void FairMQSocketZMQ::SetOption(const string& option, const void* value, size_t valueSize)
{
int rc = zmq_setsockopt(fSocket, GetConstant(option), value, valueSize);
if (rc < 0) {
stringstream logmsg;
logmsg << "failed setting socket option, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
}
unsigned long FairMQSocketZMQ::GetBytesTx() unsigned long FairMQSocketZMQ::GetBytesTx()
{ {
return fBytesTx; return fBytesTx;
@ -177,6 +167,20 @@ unsigned long FairMQSocketZMQ::GetMessagesRx()
return fMessagesRx; return fMessagesRx;
} }
int FairMQSocketZMQ::GetConstant(const string& constant)
{
if (constant == "sub") return ZMQ_SUB;
if (constant == "pub") return ZMQ_PUB;
if (constant == "xsub") return ZMQ_XSUB;
if (constant == "xpub") return ZMQ_XPUB;
if (constant == "push") return ZMQ_PUSH;
if (constant == "pull") return ZMQ_PULL;
if (constant == "snd-hwm") return ZMQ_SNDHWM;
if (constant == "rcv-hwm") return ZMQ_RCVHWM;
return -1;
}
FairMQSocketZMQ::~FairMQSocketZMQ() FairMQSocketZMQ::~FairMQSocketZMQ()
{ {
} }

View File

@ -8,45 +8,51 @@
#ifndef FAIRMQSOCKETZMQ_H_ #ifndef FAIRMQSOCKETZMQ_H_
#define FAIRMQSOCKETZMQ_H_ #define FAIRMQSOCKETZMQ_H_
#include <boost/shared_ptr.hpp>
#include <zmq.h> #include <zmq.h>
#include "FairMQSocket.h" #include "FairMQSocket.h"
#include "FairMQContext.h" #include "FairMQContextZMQ.h"
class FairMQSocketZMQ : public FairMQSocket class FairMQSocketZMQ : public FairMQSocket
{ {
public: public:
FairMQSocketZMQ(FairMQContext* context, int type, int num); FairMQSocketZMQ(const string& type, int num);
virtual std::string GetId(); virtual string GetId();
virtual void Bind(std::string address); virtual void Bind(const string& address);
virtual void Connect(std::string address); virtual void Connect(const string& address);
virtual size_t Send(FairMQMessage* msg); virtual size_t Send(FairMQMessage* msg);
virtual size_t Receive(FairMQMessage* msg); virtual size_t Receive(FairMQMessage* msg);
virtual void Close();
virtual void* GetSocket();
virtual void SetOption(int option, const void* value, size_t valueSize); virtual void* GetSocket();
virtual int GetSocket(int nothing);
virtual void Close();
virtual void SetOption(const string& option, const void* value, size_t valueSize);
virtual unsigned long GetBytesTx(); virtual unsigned long GetBytesTx();
virtual unsigned long GetBytesRx(); virtual unsigned long GetBytesRx();
virtual unsigned long GetMessagesTx(); virtual unsigned long GetMessagesTx();
virtual unsigned long GetMessagesRx(); virtual unsigned long GetMessagesRx();
static std::string GetTypeString(int type); static int GetConstant(const string& constant);
virtual ~FairMQSocketZMQ(); virtual ~FairMQSocketZMQ();
private: private:
void* fSocket; void* fSocket;
std::string fId; string fId;
unsigned long fBytesTx; unsigned long fBytesTx;
unsigned long fBytesRx; unsigned long fBytesRx;
unsigned long fMessagesTx; unsigned long fMessagesTx;
unsigned long fMessagesRx; unsigned long fMessagesRx;
static boost::shared_ptr<FairMQContextZMQ> fContext;
}; };
#endif /* FAIRMQSOCKETZMQ_H_ */ #endif /* FAIRMQSOCKETZMQ_H_ */

View File

@ -8,8 +8,13 @@
#ifndef FAIRMQTRANSPORTFACTORY_H_ #ifndef FAIRMQTRANSPORTFACTORY_H_
#define FAIRMQTRANSPORTFACTORY_H_ #define FAIRMQTRANSPORTFACTORY_H_
#include <string>
#include "FairMQMessage.h" #include "FairMQMessage.h"
#include "FairMQSocket.h" #include "FairMQSocket.h"
#include "FairMQPoller.h"
using std::vector;
class FairMQTransportFactory class FairMQTransportFactory
{ {
@ -17,7 +22,8 @@ class FairMQTransportFactory
virtual FairMQMessage* CreateMessage() = 0; virtual FairMQMessage* CreateMessage() = 0;
virtual FairMQMessage* CreateMessage(size_t size) = 0; virtual FairMQMessage* CreateMessage(size_t size) = 0;
virtual FairMQMessage* CreateMessage(void* data, size_t size) = 0; virtual FairMQMessage* CreateMessage(void* data, size_t size) = 0;
virtual FairMQSocket* CreateSocket(FairMQContext* context, int type, int num) = 0; virtual FairMQSocket* CreateSocket(string type, int num) = 0;
virtual FairMQPoller* CreatePoller(const vector<FairMQSocket*>& inputs) = 0;
virtual ~FairMQTransportFactory() {}; virtual ~FairMQTransportFactory() {};
}; };

View File

@ -0,0 +1,38 @@
/**
* FairMQTransportFactoryNN.cxx
*
* @since 2014-01-20
* @author: A. Rybalchenko
*/
#include "FairMQTransportFactoryNN.h"
FairMQTransportFactoryNN::FairMQTransportFactoryNN()
{
}
FairMQMessage* FairMQTransportFactoryNN::CreateMessage()
{
return new FairMQMessageNN();
}
FairMQMessage* FairMQTransportFactoryNN::CreateMessage(size_t size)
{
return new FairMQMessageNN(size);
}
FairMQMessage* FairMQTransportFactoryNN::CreateMessage(void* data, size_t size)
{
return new FairMQMessageNN(data, size);
}
FairMQSocket* FairMQTransportFactoryNN::CreateSocket(string type, int num)
{
return new FairMQSocketNN(type, num);
}
FairMQPoller* FairMQTransportFactoryNN::CreatePoller(const vector<FairMQSocket*>& inputs)
{
return new FairMQPollerNN(inputs);
}

View File

@ -0,0 +1,33 @@
/**
* FairMQTransportFactoryNN.h
*
* @since 2014-01-20
* @author: A. Rybalchenko
*/
#ifndef FAIRMQTRANSPORTFACTORYNN_H_
#define FAIRMQTRANSPORTFACTORYNN_H_
#include <vector>
#include "FairMQTransportFactory.h"
#include "FairMQMessageNN.h"
#include "FairMQSocketNN.h"
#include "FairMQPollerNN.h"
class FairMQTransportFactoryNN : public FairMQTransportFactory
{
public:
FairMQTransportFactoryNN();
virtual FairMQMessage* CreateMessage();
virtual FairMQMessage* CreateMessage(size_t size);
virtual FairMQMessage* CreateMessage(void* data, size_t size);
virtual FairMQSocket* CreateSocket(string type, int num);
virtual FairMQPoller* CreatePoller(const vector<FairMQSocket*>& inputs);
virtual ~FairMQTransportFactoryNN() {};
};
#endif /* FAIRMQTRANSPORTFACTORYNN_H_ */

View File

@ -9,25 +9,30 @@
FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ() FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ()
{ {
} }
FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage() FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage()
{ {
return new FairMQMessageZMQ(); return new FairMQMessageZMQ();
} }
FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(size_t size) FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(size_t size)
{ {
return new FairMQMessageZMQ(size); return new FairMQMessageZMQ(size);
} }
FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(void* data, size_t size) FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(void* data, size_t size)
{ {
return new FairMQMessageZMQ(data, size); return new FairMQMessageZMQ(data, size);
} }
FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(FairMQContext* context, int type, int num) FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(string type, int num)
{ {
return new FairMQSocketZMQ(context, type, num); return new FairMQSocketZMQ(type, num);
}
FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(const vector<FairMQSocket*>& inputs)
{
return new FairMQPollerZMQ(inputs);
} }

View File

@ -8,10 +8,13 @@
#ifndef FAIRMQTRANSPORTFACTORYZMQ_H_ #ifndef FAIRMQTRANSPORTFACTORYZMQ_H_
#define FAIRMQTRANSPORTFACTORYZMQ_H_ #define FAIRMQTRANSPORTFACTORYZMQ_H_
#include <vector>
#include "FairMQTransportFactory.h" #include "FairMQTransportFactory.h"
#include "FairMQContext.h" #include "FairMQContextZMQ.h"
#include "FairMQMessageZMQ.h" #include "FairMQMessageZMQ.h"
#include "FairMQSocketZMQ.h" #include "FairMQSocketZMQ.h"
#include "FairMQPollerZMQ.h"
class FairMQTransportFactoryZMQ : public FairMQTransportFactory class FairMQTransportFactoryZMQ : public FairMQTransportFactory
{ {
@ -21,7 +24,9 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory
virtual FairMQMessage* CreateMessage(); virtual FairMQMessage* CreateMessage();
virtual FairMQMessage* CreateMessage(size_t size); virtual FairMQMessage* CreateMessage(size_t size);
virtual FairMQMessage* CreateMessage(void* data, size_t size); virtual FairMQMessage* CreateMessage(void* data, size_t size);
virtual FairMQSocket* CreateSocket(FairMQContext* context, int type, int num); virtual FairMQSocket* CreateSocket(string type, int num);
virtual FairMQPoller* CreatePoller(const vector<FairMQSocket*>& inputs);
virtual ~FairMQTransportFactoryZMQ() {}; virtual ~FairMQTransportFactoryZMQ() {};
}; };

View File

@ -11,18 +11,24 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQBenchmarkSampler.h" #include "FairMQBenchmarkSampler.h"
#include "FairMQTransportFactoryZMQ.h" #include "FairMQTransportFactoryZMQ.h"
// #include "FairMQTransportFactoryNN.h"
using std::cout;
using std::cin;
using std::endl;
using std::stringstream;
FairMQBenchmarkSampler sampler; FairMQBenchmarkSampler sampler;
static void s_signal_handler (int signal) static void s_signal_handler (int signal)
{ {
std::cout << std::endl << "Caught signal " << signal << std::endl; cout << endl << "Caught signal " << signal << endl;
sampler.ChangeState(FairMQBenchmarkSampler::STOP); sampler.ChangeState(FairMQBenchmarkSampler::STOP);
sampler.ChangeState(FairMQBenchmarkSampler::END); sampler.ChangeState(FairMQBenchmarkSampler::END);
std::cout << "Shutdown complete. Bye!" << std::endl; cout << "Shutdown complete. Bye!" << endl;
exit(1); exit(1);
} }
@ -39,19 +45,20 @@ static void s_catch_signals (void)
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
if ( argc != 9 ) { if ( argc != 9 ) {
std::cout << "Usage: bsampler ID eventSize eventRate numIoTreads\n" cout << "Usage: bsampler ID eventSize eventRate numIoTreads\n"
<< "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n"
<< std::endl; << endl;
return 1; return 1;
} }
s_catch_signals(); s_catch_signals();
std::stringstream logmsg; stringstream logmsg;
logmsg << "PID: " << getpid(); logmsg << "PID: " << getpid();
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
// FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
sampler.SetTransport(transportFactory); sampler.SetTransport(transportFactory);
int i = 1; int i = 1;
@ -60,17 +67,17 @@ int main(int argc, char** argv)
++i; ++i;
int eventSize; int eventSize;
std::stringstream(argv[i]) >> eventSize; stringstream(argv[i]) >> eventSize;
sampler.SetProperty(FairMQBenchmarkSampler::EventSize, eventSize); sampler.SetProperty(FairMQBenchmarkSampler::EventSize, eventSize);
++i; ++i;
int eventRate; int eventRate;
std::stringstream(argv[i]) >> eventRate; stringstream(argv[i]) >> eventRate;
sampler.SetProperty(FairMQBenchmarkSampler::EventRate, eventRate); sampler.SetProperty(FairMQBenchmarkSampler::EventRate, eventRate);
++i; ++i;
int numIoThreads; int numIoThreads;
std::stringstream(argv[i]) >> numIoThreads; stringstream(argv[i]) >> numIoThreads;
sampler.SetProperty(FairMQBenchmarkSampler::NumIoThreads, numIoThreads); sampler.SetProperty(FairMQBenchmarkSampler::NumIoThreads, numIoThreads);
++i; ++i;
@ -81,14 +88,10 @@ int main(int argc, char** argv)
sampler.ChangeState(FairMQBenchmarkSampler::INIT); sampler.ChangeState(FairMQBenchmarkSampler::INIT);
int outputSocketType = ZMQ_PUB; sampler.SetProperty(FairMQBenchmarkSampler::OutputSocketType, argv[i], 0);
if (strcmp(argv[i], "push") == 0) {
outputSocketType = ZMQ_PUSH;
}
sampler.SetProperty(FairMQBenchmarkSampler::OutputSocketType, outputSocketType, 0);
++i; ++i;
int outputSndBufSize; int outputSndBufSize;
std::stringstream(argv[i]) >> outputSndBufSize; stringstream(argv[i]) >> outputSndBufSize;
sampler.SetProperty(FairMQBenchmarkSampler::OutputSndBufSize, outputSndBufSize, 0); sampler.SetProperty(FairMQBenchmarkSampler::OutputSndBufSize, outputSndBufSize, 0);
++i; ++i;
sampler.SetProperty(FairMQBenchmarkSampler::OutputMethod, argv[i], 0); sampler.SetProperty(FairMQBenchmarkSampler::OutputMethod, argv[i], 0);
@ -104,7 +107,7 @@ int main(int argc, char** argv)
char ch; char ch;
std::cin.get(ch); cin.get(ch);
sampler.ChangeState(FairMQBenchmarkSampler::STOP); sampler.ChangeState(FairMQBenchmarkSampler::STOP);
sampler.ChangeState(FairMQBenchmarkSampler::END); sampler.ChangeState(FairMQBenchmarkSampler::END);

View File

@ -11,18 +11,24 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQBuffer.h" #include "FairMQBuffer.h"
#include "FairMQTransportFactoryZMQ.h" #include "FairMQTransportFactoryZMQ.h"
// #include "FairMQTransportFactoryNN.h"
using std::cout;
using std::cin;
using std::endl;
using std::stringstream;
FairMQBuffer buffer; FairMQBuffer buffer;
static void s_signal_handler (int signal) static void s_signal_handler (int signal)
{ {
std::cout << std::endl << "Caught signal " << signal << std::endl; cout << endl << "Caught signal " << signal << endl;
buffer.ChangeState(FairMQBuffer::STOP); buffer.ChangeState(FairMQBuffer::STOP);
buffer.ChangeState(FairMQBuffer::END); buffer.ChangeState(FairMQBuffer::END);
std::cout << "Shutdown complete. Bye!" << std::endl; cout << "Shutdown complete. Bye!" << endl;
exit(1); exit(1);
} }
@ -39,19 +45,20 @@ static void s_catch_signals (void)
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
if ( argc != 11 ) { if ( argc != 11 ) {
std::cout << "Usage: buffer \tID numIoTreads\n" cout << "Usage: buffer \tID numIoTreads\n"
<< "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n"
<< "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << std::endl; << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl;
return 1; return 1;
} }
s_catch_signals(); s_catch_signals();
std::stringstream logmsg; stringstream logmsg;
logmsg << "PID: " << getpid(); logmsg << "PID: " << getpid();
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
// FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
buffer.SetTransport(transportFactory); buffer.SetTransport(transportFactory);
int i = 1; int i = 1;
@ -60,7 +67,7 @@ int main(int argc, char** argv)
++i; ++i;
int numIoThreads; int numIoThreads;
std::stringstream(argv[i]) >> numIoThreads; stringstream(argv[i]) >> numIoThreads;
buffer.SetProperty(FairMQBuffer::NumIoThreads, numIoThreads); buffer.SetProperty(FairMQBuffer::NumIoThreads, numIoThreads);
++i; ++i;
buffer.SetProperty(FairMQBuffer::NumInputs, 1); buffer.SetProperty(FairMQBuffer::NumInputs, 1);
@ -70,14 +77,10 @@ int main(int argc, char** argv)
buffer.ChangeState(FairMQBuffer::INIT); buffer.ChangeState(FairMQBuffer::INIT);
int inputSocketType = ZMQ_SUB; buffer.SetProperty(FairMQBuffer::InputSocketType, argv[i], 0);
if (strcmp(argv[i], "pull") == 0) {
inputSocketType = ZMQ_PULL;
}
buffer.SetProperty(FairMQBuffer::InputSocketType, inputSocketType, 0);
++i; ++i;
int inputRcvBufSize; int inputRcvBufSize;
std::stringstream(argv[i]) >> inputRcvBufSize; stringstream(argv[i]) >> inputRcvBufSize;
buffer.SetProperty(FairMQBuffer::InputRcvBufSize, inputRcvBufSize, 0); buffer.SetProperty(FairMQBuffer::InputRcvBufSize, inputRcvBufSize, 0);
++i; ++i;
buffer.SetProperty(FairMQBuffer::InputMethod, argv[i], 0); buffer.SetProperty(FairMQBuffer::InputMethod, argv[i], 0);
@ -85,15 +88,10 @@ int main(int argc, char** argv)
buffer.SetProperty(FairMQBuffer::InputAddress, argv[i], 0); buffer.SetProperty(FairMQBuffer::InputAddress, argv[i], 0);
++i; ++i;
buffer.SetProperty(FairMQBuffer::OutputSocketType, argv[i], 0);
int outputSocketType = ZMQ_PUB;
if (strcmp(argv[i], "push") == 0) {
outputSocketType = ZMQ_PUSH;
}
buffer.SetProperty(FairMQBuffer::OutputSocketType, outputSocketType, 0);
++i; ++i;
int outputSndBufSize; int outputSndBufSize;
std::stringstream(argv[i]) >> outputSndBufSize; stringstream(argv[i]) >> outputSndBufSize;
buffer.SetProperty(FairMQBuffer::OutputSndBufSize, outputSndBufSize, 0); buffer.SetProperty(FairMQBuffer::OutputSndBufSize, outputSndBufSize, 0);
++i; ++i;
buffer.SetProperty(FairMQBuffer::OutputMethod, argv[i], 0); buffer.SetProperty(FairMQBuffer::OutputMethod, argv[i], 0);
@ -109,7 +107,7 @@ int main(int argc, char** argv)
char ch; char ch;
std::cin.get(ch); cin.get(ch);
buffer.ChangeState(FairMQBuffer::STOP); buffer.ChangeState(FairMQBuffer::STOP);
buffer.ChangeState(FairMQBuffer::END); buffer.ChangeState(FairMQBuffer::END);

View File

@ -11,18 +11,24 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQMerger.h" #include "FairMQMerger.h"
#include "FairMQTransportFactoryZMQ.h" #include "FairMQTransportFactoryZMQ.h"
// #include "FairMQTransportFactoryNN.h"
using std::cout;
using std::cin;
using std::endl;
using std::stringstream;
FairMQMerger merger; FairMQMerger merger;
static void s_signal_handler (int signal) static void s_signal_handler (int signal)
{ {
std::cout << std::endl << "Caught signal " << signal << std::endl; cout << endl << "Caught signal " << signal << endl;
merger.ChangeState(FairMQMerger::STOP); merger.ChangeState(FairMQMerger::STOP);
merger.ChangeState(FairMQMerger::END); merger.ChangeState(FairMQMerger::END);
std::cout << "Shutdown complete. Bye!" << std::endl; cout << "Shutdown complete. Bye!" << endl;
exit(1); exit(1);
} }
@ -39,20 +45,21 @@ static void s_catch_signals (void)
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
if ( argc != 15 ) { if ( argc != 15 ) {
std::cout << "Usage: merger \tID numIoTreads\n" cout << "Usage: merger \tID numIoTreads\n"
<< "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n"
<< "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n"
<< "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << std::endl; << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl;
return 1; return 1;
} }
s_catch_signals(); s_catch_signals();
std::stringstream logmsg; stringstream logmsg;
logmsg << "PID: " << getpid(); logmsg << "PID: " << getpid();
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
// FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
merger.SetTransport(transportFactory); merger.SetTransport(transportFactory);
int i = 1; int i = 1;
@ -61,7 +68,7 @@ int main(int argc, char** argv)
++i; ++i;
int numIoThreads; int numIoThreads;
std::stringstream(argv[i]) >> numIoThreads; stringstream(argv[i]) >> numIoThreads;
merger.SetProperty(FairMQMerger::NumIoThreads, numIoThreads); merger.SetProperty(FairMQMerger::NumIoThreads, numIoThreads);
++i; ++i;
@ -72,14 +79,10 @@ int main(int argc, char** argv)
merger.ChangeState(FairMQMerger::INIT); merger.ChangeState(FairMQMerger::INIT);
int inputSocketType = ZMQ_SUB; merger.SetProperty(FairMQMerger::InputSocketType, argv[i], 0);
if (strcmp(argv[i], "pull") == 0) {
inputSocketType = ZMQ_PULL;
}
merger.SetProperty(FairMQMerger::InputSocketType, inputSocketType, 0);
++i; ++i;
int inputRcvBufSize; int inputRcvBufSize;
std::stringstream(argv[i]) >> inputRcvBufSize; stringstream(argv[i]) >> inputRcvBufSize;
merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, 0); merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, 0);
++i; ++i;
merger.SetProperty(FairMQMerger::InputMethod, argv[i], 0); merger.SetProperty(FairMQMerger::InputMethod, argv[i], 0);
@ -87,13 +90,9 @@ int main(int argc, char** argv)
merger.SetProperty(FairMQMerger::InputAddress, argv[i], 0); merger.SetProperty(FairMQMerger::InputAddress, argv[i], 0);
++i; ++i;
inputSocketType = ZMQ_SUB; merger.SetProperty(FairMQMerger::InputSocketType, argv[i], 1);
if (strcmp(argv[i], "pull") == 0) {
inputSocketType = ZMQ_PULL;
}
merger.SetProperty(FairMQMerger::InputSocketType, inputSocketType, 1);
++i; ++i;
std::stringstream(argv[i]) >> inputRcvBufSize; stringstream(argv[i]) >> inputRcvBufSize;
merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, 1); merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, 1);
++i; ++i;
merger.SetProperty(FairMQMerger::InputMethod, argv[i], 1); merger.SetProperty(FairMQMerger::InputMethod, argv[i], 1);
@ -101,14 +100,10 @@ int main(int argc, char** argv)
merger.SetProperty(FairMQMerger::InputAddress, argv[i], 1); merger.SetProperty(FairMQMerger::InputAddress, argv[i], 1);
++i; ++i;
int outputSocketType = ZMQ_PUB; merger.SetProperty(FairMQMerger::OutputSocketType, argv[i], 0);
if (strcmp(argv[i], "push") == 0) {
outputSocketType = ZMQ_PUSH;
}
merger.SetProperty(FairMQMerger::OutputSocketType, outputSocketType, 0);
++i; ++i;
int outputSndBufSize; int outputSndBufSize;
std::stringstream(argv[i]) >> outputSndBufSize; stringstream(argv[i]) >> outputSndBufSize;
merger.SetProperty(FairMQMerger::OutputSndBufSize, outputSndBufSize, 0); merger.SetProperty(FairMQMerger::OutputSndBufSize, outputSndBufSize, 0);
++i; ++i;
merger.SetProperty(FairMQMerger::OutputMethod, argv[i], 0); merger.SetProperty(FairMQMerger::OutputMethod, argv[i], 0);
@ -123,7 +118,7 @@ int main(int argc, char** argv)
char ch; char ch;
std::cin.get(ch); cin.get(ch);
merger.ChangeState(FairMQMerger::STOP); merger.ChangeState(FairMQMerger::STOP);
merger.ChangeState(FairMQMerger::END); merger.ChangeState(FairMQMerger::END);

View File

@ -11,18 +11,24 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQMerger.h" #include "FairMQMerger.h"
#include "FairMQTransportFactoryZMQ.h" #include "FairMQTransportFactoryZMQ.h"
// #include "FairMQTransportFactoryNN.h"
using std::cout;
using std::cin;
using std::endl;
using std::stringstream;
FairMQMerger merger; FairMQMerger merger;
static void s_signal_handler (int signal) static void s_signal_handler (int signal)
{ {
std::cout << std::endl << "Caught signal " << signal << std::endl; cout << endl << "Caught signal " << signal << endl;
merger.ChangeState(FairMQMerger::STOP); merger.ChangeState(FairMQMerger::STOP);
merger.ChangeState(FairMQMerger::END); merger.ChangeState(FairMQMerger::END);
std::cout << "Shutdown complete. Bye!" << std::endl; cout << "Shutdown complete. Bye!" << endl;
exit(1); exit(1);
} }
@ -39,22 +45,23 @@ static void s_catch_signals (void)
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
if ( argc < 16 || (argc-8)%4!=0 ) { if ( argc < 16 || (argc-8)%4!=0 ) {
std::cout << "Usage: merger \tID numIoTreads numInputs\n" cout << "Usage: merger \tID numIoTreads numInputs\n"
<< "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n"
<< "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n"
<< "\t\t...\n" << "\t\t...\n"
<< "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n"
<< argc << std::endl; << argc << endl;
return 1; return 1;
} }
s_catch_signals(); s_catch_signals();
std::stringstream logmsg; stringstream logmsg;
logmsg << "PID: " << getpid(); logmsg << "PID: " << getpid();
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
// FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
merger.SetTransport(transportFactory); merger.SetTransport(transportFactory);
int i = 1; int i = 1;
@ -63,12 +70,12 @@ int main(int argc, char** argv)
++i; ++i;
int numIoThreads; int numIoThreads;
std::stringstream(argv[i]) >> numIoThreads; stringstream(argv[i]) >> numIoThreads;
merger.SetProperty(FairMQMerger::NumIoThreads, numIoThreads); merger.SetProperty(FairMQMerger::NumIoThreads, numIoThreads);
++i; ++i;
int numInputs; int numInputs;
std::stringstream(argv[i]) >> numInputs; stringstream(argv[i]) >> numInputs;
merger.SetProperty(FairMQMerger::NumInputs, numInputs); merger.SetProperty(FairMQMerger::NumInputs, numInputs);
++i; ++i;
@ -78,16 +85,11 @@ int main(int argc, char** argv)
merger.ChangeState(FairMQMerger::INIT); merger.ChangeState(FairMQMerger::INIT);
int inputSocketType;
for (int iInput = 0; iInput < numInputs; iInput++ ) { for (int iInput = 0; iInput < numInputs; iInput++ ) {
inputSocketType = ZMQ_SUB; merger.SetProperty(FairMQMerger::InputSocketType, argv[i], iInput);
if (strcmp(argv[i], "pull") == 0) {
inputSocketType = ZMQ_PULL;
}
merger.SetProperty(FairMQMerger::InputSocketType, inputSocketType, iInput);
++i; ++i;
int inputRcvBufSize; int inputRcvBufSize;
std::stringstream(argv[i]) >> inputRcvBufSize; stringstream(argv[i]) >> inputRcvBufSize;
merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, iInput); merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, iInput);
++i; ++i;
merger.SetProperty(FairMQMerger::InputMethod, argv[i], iInput); merger.SetProperty(FairMQMerger::InputMethod, argv[i], iInput);
@ -96,14 +98,10 @@ int main(int argc, char** argv)
++i; ++i;
} }
int outputSocketType = ZMQ_PUB; merger.SetProperty(FairMQMerger::OutputSocketType, argv[i], 0);
if (strcmp(argv[i], "push") == 0) {
outputSocketType = ZMQ_PUSH;
}
merger.SetProperty(FairMQMerger::OutputSocketType, outputSocketType, 0);
++i; ++i;
int outputSndBufSize; int outputSndBufSize;
std::stringstream(argv[i]) >> outputSndBufSize; stringstream(argv[i]) >> outputSndBufSize;
merger.SetProperty(FairMQMerger::OutputSndBufSize, outputSndBufSize, 0); merger.SetProperty(FairMQMerger::OutputSndBufSize, outputSndBufSize, 0);
++i; ++i;
merger.SetProperty(FairMQMerger::OutputMethod, argv[i], 0); merger.SetProperty(FairMQMerger::OutputMethod, argv[i], 0);
@ -118,7 +116,7 @@ int main(int argc, char** argv)
char ch; char ch;
std::cin.get(ch); cin.get(ch);
merger.ChangeState(FairMQMerger::STOP); merger.ChangeState(FairMQMerger::STOP);
merger.ChangeState(FairMQMerger::END); merger.ChangeState(FairMQMerger::END);

View File

@ -11,18 +11,24 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQSplitter.h" #include "FairMQSplitter.h"
#include "FairMQTransportFactoryZMQ.h" #include "FairMQTransportFactoryZMQ.h"
// #include "FairMQTransportFactoryNN.h"
using std::cout;
using std::cin;
using std::endl;
using std::stringstream;
FairMQSplitter splitter; FairMQSplitter splitter;
static void s_signal_handler (int signal) static void s_signal_handler (int signal)
{ {
std::cout << std::endl << "Caught signal " << signal << std::endl; cout << endl << "Caught signal " << signal << endl;
splitter.ChangeState(FairMQSplitter::STOP); splitter.ChangeState(FairMQSplitter::STOP);
splitter.ChangeState(FairMQSplitter::END); splitter.ChangeState(FairMQSplitter::END);
std::cout << "Shutdown complete. Bye!" << std::endl; cout << "Shutdown complete. Bye!" << endl;
exit(1); exit(1);
} }
@ -39,21 +45,22 @@ static void s_catch_signals (void)
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
if ( argc < 16 || (argc - 8) % 4 != 0 ) { // argc{ name, id, threads, nout, insock, inbuff, inmet, inadd, ... out} if ( argc < 16 || (argc - 8) % 4 != 0 ) { // argc{ name, id, threads, nout, insock, inbuff, inmet, inadd, ... out}
std::cout << "Usage: splitter \tID numIoTreads numOutputs\n" cout << "Usage: splitter \tID numIoTreads numOutputs\n"
<< "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n"
<< "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n"
<< "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n"
<< "\t\t..." << argc << " arguments provided" << std::endl; << "\t\t..." << argc << " arguments provided" << endl;
return 1; return 1;
} }
s_catch_signals(); s_catch_signals();
std::stringstream logmsg; stringstream logmsg;
logmsg << "PID: " << getpid(); logmsg << "PID: " << getpid();
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
// FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
splitter.SetTransport(transportFactory); splitter.SetTransport(transportFactory);
int i = 1; int i = 1;
@ -62,14 +69,14 @@ int main(int argc, char** argv)
++i; ++i;
int numIoThreads; int numIoThreads;
std::stringstream(argv[i]) >> numIoThreads; stringstream(argv[i]) >> numIoThreads;
splitter.SetProperty(FairMQSplitter::NumIoThreads, numIoThreads); splitter.SetProperty(FairMQSplitter::NumIoThreads, numIoThreads);
++i; ++i;
splitter.SetProperty(FairMQSplitter::NumInputs, 1); splitter.SetProperty(FairMQSplitter::NumInputs, 1);
int numOutputs; int numOutputs;
std::stringstream(argv[i]) >> numOutputs; stringstream(argv[i]) >> numOutputs;
splitter.SetProperty(FairMQSplitter::NumOutputs, numOutputs); splitter.SetProperty(FairMQSplitter::NumOutputs, numOutputs);
++i; ++i;
@ -77,14 +84,10 @@ int main(int argc, char** argv)
splitter.ChangeState(FairMQSplitter::INIT); splitter.ChangeState(FairMQSplitter::INIT);
int inputSocketType = ZMQ_SUB; splitter.SetProperty(FairMQSplitter::InputSocketType, argv[i], 0);
if (strcmp(argv[i], "pull") == 0) {
inputSocketType = ZMQ_PULL;
}
splitter.SetProperty(FairMQSplitter::InputSocketType, inputSocketType, 0);
++i; ++i;
int inputRcvBufSize; int inputRcvBufSize;
std::stringstream(argv[i]) >> inputRcvBufSize; stringstream(argv[i]) >> inputRcvBufSize;
splitter.SetProperty(FairMQSplitter::InputRcvBufSize, inputRcvBufSize, 0); splitter.SetProperty(FairMQSplitter::InputRcvBufSize, inputRcvBufSize, 0);
++i; ++i;
splitter.SetProperty(FairMQSplitter::InputMethod, argv[i], 0); splitter.SetProperty(FairMQSplitter::InputMethod, argv[i], 0);
@ -92,16 +95,11 @@ int main(int argc, char** argv)
splitter.SetProperty(FairMQSplitter::InputAddress, argv[i], 0); splitter.SetProperty(FairMQSplitter::InputAddress, argv[i], 0);
++i; ++i;
int outputSocketType;
int outputSndBufSize; int outputSndBufSize;
for (int iOutput = 0; iOutput < numOutputs; iOutput++) { for (int iOutput = 0; iOutput < numOutputs; iOutput++) {
outputSocketType = ZMQ_PUB; splitter.SetProperty(FairMQSplitter::OutputSocketType, argv[i], iOutput);
if (strcmp(argv[i], "push") == 0) {
outputSocketType = ZMQ_PUSH;
}
splitter.SetProperty(FairMQSplitter::OutputSocketType, outputSocketType, iOutput);
++i; ++i;
std::stringstream(argv[i]) >> outputSndBufSize; stringstream(argv[i]) >> outputSndBufSize;
splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, iOutput); splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, iOutput);
++i; ++i;
splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], iOutput); splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], iOutput);
@ -116,7 +114,7 @@ int main(int argc, char** argv)
char ch; char ch;
std::cin.get(ch); cin.get(ch);
splitter.ChangeState(FairMQSplitter::STOP); splitter.ChangeState(FairMQSplitter::STOP);
splitter.ChangeState(FairMQSplitter::END); splitter.ChangeState(FairMQSplitter::END);

View File

@ -11,18 +11,24 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQProxy.h" #include "FairMQProxy.h"
#include "FairMQTransportFactoryZMQ.h" #include "FairMQTransportFactoryZMQ.h"
// #include "FairMQTransportFactoryNN.h"
using std::cout;
using std::cin;
using std::endl;
using std::stringstream;
FairMQProxy proxy; FairMQProxy proxy;
static void s_signal_handler (int signal) static void s_signal_handler (int signal)
{ {
std::cout << std::endl << "Caught signal " << signal << std::endl; cout << endl << "Caught signal " << signal << endl;
proxy.ChangeState(FairMQProxy::STOP); proxy.ChangeState(FairMQProxy::STOP);
proxy.ChangeState(FairMQProxy::END); proxy.ChangeState(FairMQProxy::END);
std::cout << "Shutdown complete. Bye!" << std::endl; cout << "Shutdown complete. Bye!" << endl;
exit(1); exit(1);
} }
@ -39,19 +45,20 @@ static void s_catch_signals (void)
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
if ( argc != 11 ) { if ( argc != 11 ) {
std::cout << "Usage: proxy \tID numIoTreads\n" cout << "Usage: proxy \tID numIoTreads\n"
<< "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n"
<< "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << std::endl; << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl;
return 1; return 1;
} }
s_catch_signals(); s_catch_signals();
std::stringstream logmsg; stringstream logmsg;
logmsg << "PID: " << getpid(); logmsg << "PID: " << getpid();
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
// FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
proxy.SetTransport(transportFactory); proxy.SetTransport(transportFactory);
int i = 1; int i = 1;
@ -60,7 +67,7 @@ int main(int argc, char** argv)
++i; ++i;
int numIoThreads; int numIoThreads;
std::stringstream(argv[i]) >> numIoThreads; stringstream(argv[i]) >> numIoThreads;
proxy.SetProperty(FairMQProxy::NumIoThreads, numIoThreads); proxy.SetProperty(FairMQProxy::NumIoThreads, numIoThreads);
++i; ++i;
@ -71,14 +78,10 @@ int main(int argc, char** argv)
proxy.ChangeState(FairMQProxy::INIT); proxy.ChangeState(FairMQProxy::INIT);
int inputSocketType = ZMQ_XSUB; proxy.SetProperty(FairMQProxy::InputSocketType, argv[i], 0);
if (strcmp(argv[i], "pull") == 0) {
inputSocketType = ZMQ_PULL;
}
proxy.SetProperty(FairMQProxy::InputSocketType, inputSocketType, 0);
++i; ++i;
int inputRcvBufSize; int inputRcvBufSize;
std::stringstream(argv[i]) >> inputRcvBufSize; stringstream(argv[i]) >> inputRcvBufSize;
proxy.SetProperty(FairMQProxy::InputRcvBufSize, inputRcvBufSize, 0); proxy.SetProperty(FairMQProxy::InputRcvBufSize, inputRcvBufSize, 0);
++i; ++i;
proxy.SetProperty(FairMQProxy::InputMethod, argv[i], 0); proxy.SetProperty(FairMQProxy::InputMethod, argv[i], 0);
@ -86,14 +89,10 @@ int main(int argc, char** argv)
proxy.SetProperty(FairMQProxy::InputAddress, argv[i], 0); proxy.SetProperty(FairMQProxy::InputAddress, argv[i], 0);
++i; ++i;
int outputSocketType = ZMQ_XPUB; proxy.SetProperty(FairMQProxy::OutputSocketType, argv[i], 0);
if (strcmp(argv[i], "push") == 0) {
outputSocketType = ZMQ_PUSH;
}
proxy.SetProperty(FairMQProxy::OutputSocketType, outputSocketType, 0);
++i; ++i;
int outputSndBufSize; int outputSndBufSize;
std::stringstream(argv[i]) >> outputSndBufSize; stringstream(argv[i]) >> outputSndBufSize;
proxy.SetProperty(FairMQProxy::OutputSndBufSize, outputSndBufSize, 0); proxy.SetProperty(FairMQProxy::OutputSndBufSize, outputSndBufSize, 0);
++i; ++i;
proxy.SetProperty(FairMQProxy::OutputMethod, argv[i], 0); proxy.SetProperty(FairMQProxy::OutputMethod, argv[i], 0);
@ -108,7 +107,7 @@ int main(int argc, char** argv)
char ch; char ch;
std::cin.get(ch); cin.get(ch);
proxy.ChangeState(FairMQProxy::STOP); proxy.ChangeState(FairMQProxy::STOP);
proxy.ChangeState(FairMQProxy::END); proxy.ChangeState(FairMQProxy::END);

View File

@ -11,18 +11,24 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQSink.h" #include "FairMQSink.h"
#include "FairMQTransportFactoryZMQ.h" #include "FairMQTransportFactoryZMQ.h"
// #include "FairMQTransportFactoryNN.h"
using std::cout;
using std::cin;
using std::endl;
using std::stringstream;
FairMQSink sink; FairMQSink sink;
static void s_signal_handler (int signal) static void s_signal_handler (int signal)
{ {
std::cout << std::endl << "Caught signal " << signal << std::endl; cout << endl << "Caught signal " << signal << endl;
sink.ChangeState(FairMQSink::STOP); sink.ChangeState(FairMQSink::STOP);
sink.ChangeState(FairMQSink::END); sink.ChangeState(FairMQSink::END);
std::cout << "Shutdown complete. Bye!" << std::endl; cout << "Shutdown complete. Bye!" << endl;
exit(1); exit(1);
} }
@ -39,19 +45,20 @@ static void s_catch_signals (void)
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
if ( argc != 7 ) { if ( argc != 7 ) {
std::cout << "Usage: sink \tID numIoTreads\n" cout << "Usage: sink \tID numIoTreads\n"
<< "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n"
<< std::endl; << endl;
return 1; return 1;
} }
s_catch_signals(); s_catch_signals();
std::stringstream logmsg; stringstream logmsg;
logmsg << "PID: " << getpid(); logmsg << "PID: " << getpid();
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
// FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
sink.SetTransport(transportFactory); sink.SetTransport(transportFactory);
int i = 1; int i = 1;
@ -60,7 +67,7 @@ int main(int argc, char** argv)
++i; ++i;
int numIoThreads; int numIoThreads;
std::stringstream(argv[i]) >> numIoThreads; stringstream(argv[i]) >> numIoThreads;
sink.SetProperty(FairMQSink::NumIoThreads, numIoThreads); sink.SetProperty(FairMQSink::NumIoThreads, numIoThreads);
++i; ++i;
@ -71,14 +78,10 @@ int main(int argc, char** argv)
sink.ChangeState(FairMQSink::INIT); sink.ChangeState(FairMQSink::INIT);
int inputSocketType = ZMQ_SUB; sink.SetProperty(FairMQSink::InputSocketType, argv[i], 0);
if (strcmp(argv[i], "pull") == 0) {
inputSocketType = ZMQ_PULL;
}
sink.SetProperty(FairMQSink::InputSocketType, inputSocketType, 0);
++i; ++i;
int inputRcvBufSize; int inputRcvBufSize;
std::stringstream(argv[i]) >> inputRcvBufSize; stringstream(argv[i]) >> inputRcvBufSize;
sink.SetProperty(FairMQSink::InputRcvBufSize, inputRcvBufSize, 0); sink.SetProperty(FairMQSink::InputRcvBufSize, inputRcvBufSize, 0);
++i; ++i;
sink.SetProperty(FairMQSink::InputMethod, argv[i], 0); sink.SetProperty(FairMQSink::InputMethod, argv[i], 0);
@ -93,7 +96,7 @@ int main(int argc, char** argv)
char ch; char ch;
std::cin.get(ch); cin.get(ch);
sink.ChangeState(FairMQSink::STOP); sink.ChangeState(FairMQSink::STOP);
sink.ChangeState(FairMQSink::END); sink.ChangeState(FairMQSink::END);

View File

@ -11,18 +11,24 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQSplitter.h" #include "FairMQSplitter.h"
#include "FairMQTransportFactoryZMQ.h" #include "FairMQTransportFactoryZMQ.h"
// #include "FairMQTransportFactoryNN.h"
using std::cout;
using std::cin;
using std::endl;
using std::stringstream;
FairMQSplitter splitter; FairMQSplitter splitter;
static void s_signal_handler (int signal) static void s_signal_handler (int signal)
{ {
std::cout << std::endl << "Caught signal " << signal << std::endl; cout << endl << "Caught signal " << signal << endl;
splitter.ChangeState(FairMQSplitter::STOP); splitter.ChangeState(FairMQSplitter::STOP);
splitter.ChangeState(FairMQSplitter::END); splitter.ChangeState(FairMQSplitter::END);
std::cout << "Shutdown complete. Bye!" << std::endl; cout << "Shutdown complete. Bye!" << endl;
exit(1); exit(1);
} }
@ -39,20 +45,21 @@ static void s_catch_signals (void)
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
if ( argc != 15 ) { if ( argc != 15 ) {
std::cout << "Usage: splitter \tID numIoTreads\n" cout << "Usage: splitter \tID numIoTreads\n"
<< "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n"
<< "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n"
<< "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << std::endl; << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl;
return 1; return 1;
} }
s_catch_signals(); s_catch_signals();
std::stringstream logmsg; stringstream logmsg;
logmsg << "PID: " << getpid(); logmsg << "PID: " << getpid();
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
// FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
splitter.SetTransport(transportFactory); splitter.SetTransport(transportFactory);
int i = 1; int i = 1;
@ -61,7 +68,7 @@ int main(int argc, char** argv)
++i; ++i;
int numIoThreads; int numIoThreads;
std::stringstream(argv[i]) >> numIoThreads; stringstream(argv[i]) >> numIoThreads;
splitter.SetProperty(FairMQSplitter::NumIoThreads, numIoThreads); splitter.SetProperty(FairMQSplitter::NumIoThreads, numIoThreads);
++i; ++i;
@ -72,14 +79,10 @@ int main(int argc, char** argv)
splitter.ChangeState(FairMQSplitter::INIT); splitter.ChangeState(FairMQSplitter::INIT);
int inputSocketType = ZMQ_SUB; splitter.SetProperty(FairMQSplitter::InputSocketType, argv[i], 0);
if (strcmp(argv[i], "pull") == 0) {
inputSocketType = ZMQ_PULL;
}
splitter.SetProperty(FairMQSplitter::InputSocketType, inputSocketType, 0);
++i; ++i;
int inputRcvBufSize; int inputRcvBufSize;
std::stringstream(argv[i]) >> inputRcvBufSize; stringstream(argv[i]) >> inputRcvBufSize;
splitter.SetProperty(FairMQSplitter::InputRcvBufSize, inputRcvBufSize, 0); splitter.SetProperty(FairMQSplitter::InputRcvBufSize, inputRcvBufSize, 0);
++i; ++i;
splitter.SetProperty(FairMQSplitter::InputMethod, argv[i], 0); splitter.SetProperty(FairMQSplitter::InputMethod, argv[i], 0);
@ -87,14 +90,10 @@ int main(int argc, char** argv)
splitter.SetProperty(FairMQSplitter::InputAddress, argv[i], 0); splitter.SetProperty(FairMQSplitter::InputAddress, argv[i], 0);
++i; ++i;
int outputSocketType = ZMQ_PUB; splitter.SetProperty(FairMQSplitter::OutputSocketType, argv[i], 0);
if (strcmp(argv[i], "push") == 0) {
outputSocketType = ZMQ_PUSH;
}
splitter.SetProperty(FairMQSplitter::OutputSocketType, outputSocketType, 0);
++i; ++i;
int outputSndBufSize; int outputSndBufSize;
std::stringstream(argv[i]) >> outputSndBufSize; stringstream(argv[i]) >> outputSndBufSize;
splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, 0); splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, 0);
++i; ++i;
splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], 0); splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], 0);
@ -102,13 +101,9 @@ int main(int argc, char** argv)
splitter.SetProperty(FairMQSplitter::OutputAddress, argv[i], 0); splitter.SetProperty(FairMQSplitter::OutputAddress, argv[i], 0);
++i; ++i;
outputSocketType = ZMQ_PUB; splitter.SetProperty(FairMQSplitter::OutputSocketType, argv[i], 1);
if (strcmp(argv[i], "push") == 0) {
outputSocketType = ZMQ_PUSH;
}
splitter.SetProperty(FairMQSplitter::OutputSocketType, outputSocketType, 1);
++i; ++i;
std::stringstream(argv[i]) >> outputSndBufSize; stringstream(argv[i]) >> outputSndBufSize;
splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, 1); splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, 1);
++i; ++i;
splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], 1); splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], 1);
@ -123,7 +118,7 @@ int main(int argc, char** argv)
char ch; char ch;
std::cin.get(ch); cin.get(ch);
splitter.ChangeState(FairMQSplitter::STOP); splitter.ChangeState(FairMQSplitter::STOP);
splitter.ChangeState(FairMQSplitter::END); splitter.ChangeState(FairMQSplitter::END);