use factory for sockets

This commit is contained in:
Alexey Rybalchenko 2014-01-21 15:57:59 +01:00
parent a383434c45
commit 88fee245b8
38 changed files with 653 additions and 493 deletions

View File

@ -7,6 +7,13 @@ include_directories(
${ROOT_INCLUDE_DIR}
)
Set(LINK_DIRECTORIES
${ROOT_LIBRARY_DIR}
${Boost_LIBRARY_DIRS}
)
link_directories(${LINK_DIRECTORIES})
Set(SRCS
"FairMQSampler.cxx"
"FairMQBenchmarkSampler.cxx"
@ -17,6 +24,8 @@ Set(SRCS
"FairMQLogger.cxx"
"FairMQContext.cxx"
"FairMQMessage.cxx"
"FairMQTransportFactory.cxx"
"FairMQTransportFactoryZMQ.cxx"
"FairMQMessageZMQ.cxx"
"FairMQMessageNN.cxx"
"FairMQSocket.cxx"
@ -31,15 +40,8 @@ Set(SRCS
"FairMQProxy.cxx"
)
Set(LINK_DIRECTORIES
${ROOT_LIBRARY_DIR}
${Boost_LIBRARY_DIRS}
)
link_directories(${LINK_DIRECTORIES})
Set(LIBRARY_NAME FairMQ)
Set(LINKDEF)
Set(DEPENDENCIES
${CMAKE_THREAD_LIBS_INIT}
${ZMQ_LIBRARY_SHARED}

View File

@ -39,19 +39,21 @@ void FairMQBenchmarkSampler::Run()
boost::thread resetEventCounter(boost::bind(&FairMQBenchmarkSampler::ResetEventCounter, this));
void* buffer = operator new[](fEventSize);
FairMQMessage* base_event = new FairMQMessage(buffer, fEventSize);
FairMQMessage* base_event = new FairMQMessageZMQ(buffer, fEventSize);
while ( fState == RUNNING ) {
FairMQMessage event;
event.Copy(base_event);
FairMQMessage* event = new FairMQMessageZMQ();
event->Copy(base_event);
fPayloadOutputs->at(0)->Send(&event);
fPayloadOutputs->at(0)->Send(event);
--fEventCounter;
while (fEventCounter == 0) {
boost::this_thread::sleep(boost::posix_time::milliseconds(1));
}
delete event;
}
delete base_event;
@ -75,16 +77,16 @@ void FairMQBenchmarkSampler::ResetEventCounter()
}
}
void FairMQBenchmarkSampler::Log(Int_t intervalInMs)
void FairMQBenchmarkSampler::Log(int intervalInMs)
{
timestamp_t t0;
timestamp_t t1;
ULong_t bytes = fPayloadOutputs->at(0)->GetBytesTx();
ULong_t messages = fPayloadOutputs->at(0)->GetMessagesTx();
ULong_t bytesNew;
ULong_t messagesNew;
Double_t megabytesPerSecond = (bytesNew - bytes) / (1024 * 1024);
Double_t messagesPerSecond = (messagesNew - messages);
unsigned long bytes = fPayloadOutputs->at(0)->GetBytesTx();
unsigned long messages = fPayloadOutputs->at(0)->GetMessagesTx();
unsigned long bytesNew = 0;
unsigned long messagesNew = 0;
double megabytesPerSecond = 0;
double messagesPerSecond = 0;
t0 = get_timestamp();
@ -98,8 +100,8 @@ void FairMQBenchmarkSampler::Log(Int_t intervalInMs)
timestamp_t timeSinceLastLog_ms = (t1 - t0) / 1000.0L;
megabytesPerSecond = ((Double_t) (bytesNew - bytes) / (1024. * 1024.)) / (Double_t) timeSinceLastLog_ms * 1000.;
messagesPerSecond = (Double_t) (messagesNew - messages) / (Double_t) timeSinceLastLog_ms * 1000.;
megabytesPerSecond = ((double) (bytesNew - bytes) / (1024. * 1024.)) / (double) timeSinceLastLog_ms * 1000.;
messagesPerSecond = (double) (messagesNew - messages) / (double) timeSinceLastLog_ms * 1000.;
std::stringstream logmsg;
logmsg << "send " << messagesPerSecond << " msg/s, " << megabytesPerSecond << " MB/s";
@ -111,7 +113,7 @@ void FairMQBenchmarkSampler::Log(Int_t intervalInMs)
}
}
void FairMQBenchmarkSampler::SetProperty(const Int_t& key, const TString& value, const Int_t& slot/*= 0*/)
void FairMQBenchmarkSampler::SetProperty(const int& key, const std::string& value, const int& slot/*= 0*/)
{
switch (key) {
default:
@ -120,7 +122,7 @@ void FairMQBenchmarkSampler::SetProperty(const Int_t& key, const TString& value,
}
}
TString FairMQBenchmarkSampler::GetProperty(const Int_t& key, const TString& default_/*= ""*/, const Int_t& slot/*= 0*/)
std::string FairMQBenchmarkSampler::GetProperty(const int& key, const std::string& default_/*= ""*/, const int& slot/*= 0*/)
{
switch (key) {
default:
@ -128,7 +130,7 @@ TString FairMQBenchmarkSampler::GetProperty(const Int_t& key, const TString& def
}
}
void FairMQBenchmarkSampler::SetProperty(const Int_t& key, const Int_t& value, const Int_t& slot/*= 0*/)
void FairMQBenchmarkSampler::SetProperty(const int& key, const int& value, const int& slot/*= 0*/)
{
switch (key) {
case EventSize:
@ -143,7 +145,7 @@ void FairMQBenchmarkSampler::SetProperty(const Int_t& key, const Int_t& value, c
}
}
Int_t FairMQBenchmarkSampler::GetProperty(const Int_t& key, const Int_t& default_/*= 0*/, const Int_t& slot/*= 0*/)
int FairMQBenchmarkSampler::GetProperty(const int& key, const int& default_/*= 0*/, const int& slot/*= 0*/)
{
switch (key) {
case EventSize:

View File

@ -11,7 +11,6 @@
#include <string>
#include "FairMQDevice.h"
#include "TString.h"
/**
* Sampler to generate traffic for benchmarking.
@ -28,16 +27,16 @@ class FairMQBenchmarkSampler: public FairMQDevice
};
FairMQBenchmarkSampler();
virtual ~FairMQBenchmarkSampler();
void Log(Int_t intervalInMs);
void Log(int intervalInMs);
void ResetEventCounter();
virtual void SetProperty(const Int_t& key, const TString& value, const Int_t& slot = 0);
virtual TString GetProperty(const Int_t& key, const TString& default_ = "", const Int_t& slot = 0);
virtual void SetProperty(const Int_t& key, const Int_t& value, const Int_t& slot = 0);
virtual Int_t GetProperty(const Int_t& key, const Int_t& default_ = 0, const Int_t& slot = 0);
virtual void SetProperty(const int& key, const std::string& value, const int& slot = 0);
virtual std::string GetProperty(const int& key, const std::string& default_ = "", const int& slot = 0);
virtual void SetProperty(const int& key, const int& value, const int& slot = 0);
virtual int GetProperty(const int& key, const int& default_ = 0, const int& slot = 0);
protected:
Int_t fEventSize;
Int_t fEventRate;
Int_t fEventCounter;
int fEventSize;
int fEventRate;
int fEventCounter;
virtual void Init();
virtual void Run();
};

View File

@ -26,14 +26,16 @@ void FairMQBuffer::Run()
bool received = false;
while ( fState == RUNNING ) {
FairMQMessage msg;
FairMQMessage* msg = new FairMQMessageZMQ();
received = fPayloadInputs->at(0)->Receive(&msg);
received = fPayloadInputs->at(0)->Receive(msg);
if (received) {
fPayloadOutputs->at(0)->Send(&msg);
fPayloadOutputs->at(0)->Send(msg);
received = false;
}
delete msg;
}
rateLogger.interrupt();

View File

@ -12,20 +12,20 @@ FairMQConfigurable::FairMQConfigurable()
{
}
void FairMQConfigurable::SetProperty(const Int_t& key, const TString& value, const Int_t& slot/*= 0*/)
void FairMQConfigurable::SetProperty(const int& key, const std::string& value, const int& slot/*= 0*/)
{
}
TString FairMQConfigurable::GetProperty(const Int_t& key, const TString& default_/*= ""*/, const Int_t& slot/*= 0*/)
std::string FairMQConfigurable::GetProperty(const int& key, const std::string& default_/*= ""*/, const int& slot/*= 0*/)
{
return default_;
}
void FairMQConfigurable::SetProperty(const Int_t& key, const Int_t& value, const Int_t& slot/*= 0*/)
void FairMQConfigurable::SetProperty(const int& key, const int& value, const int& slot/*= 0*/)
{
}
Int_t FairMQConfigurable::GetProperty(const Int_t& key, const Int_t& default_/*= 0*/, const Int_t& slot/*= 0*/)
int FairMQConfigurable::GetProperty(const int& key, const int& default_/*= 0*/, const int& slot/*= 0*/)
{
return default_;
}

View File

@ -8,8 +8,7 @@
#ifndef FAIRMQCONFIGURABLE_H_
#define FAIRMQCONFIGURABLE_H_
#include "Rtypes.h"
#include "TString.h"
#include <string>
class FairMQConfigurable
@ -19,10 +18,10 @@ class FairMQConfigurable
Last = 1
};
FairMQConfigurable();
virtual void SetProperty(const Int_t& key, const TString& value, const Int_t& slot = 0);
virtual TString GetProperty(const Int_t& key, const TString& default_ = "", const Int_t& slot = 0);
virtual void SetProperty(const Int_t& key, const Int_t& value, const Int_t& slot = 0);
virtual Int_t GetProperty(const Int_t& key, const Int_t& default_ = 0, const Int_t& slot = 0);
virtual void SetProperty(const int& key, const std::string& value, const int& slot = 0);
virtual std::string GetProperty(const int& key, const std::string& default_ = "", const int& slot = 0);
virtual void SetProperty(const int& key, const int& value, const int& slot = 0);
virtual int GetProperty(const int& key, const int& default_ = 0, const int& slot = 0);
virtual ~FairMQConfigurable();
};

View File

@ -9,7 +9,7 @@
#include <boost/thread.hpp>
#include "FairMQSocket.h"
#include "FairMQSocketZMQ.h"
#include "FairMQDevice.h"
#include "FairMQLogger.h"
@ -32,26 +32,26 @@ void FairMQDevice::Init()
fPayloadContext = new FairMQContext(fNumIoThreads);
fInputAddress = new std::vector<TString>(fNumInputs);
fInputMethod = new std::vector<TString>();
fInputSocketType = new std::vector<Int_t>();
fInputSndBufSize = new std::vector<Int_t>();
fInputRcvBufSize = new std::vector<Int_t>();
fInputAddress = new std::vector<std::string>(fNumInputs);
fInputMethod = new std::vector<std::string>();
fInputSocketType = new std::vector<int>();
fInputSndBufSize = new std::vector<int>();
fInputRcvBufSize = new std::vector<int>();
for (Int_t i = 0; i < fNumInputs; ++i) {
for (int i = 0; i < fNumInputs; ++i) {
fInputMethod->push_back("connect"); // default value, can be overwritten in configuration
fInputSocketType->push_back(ZMQ_SUB); // default value, can be overwritten in configuration
fInputSndBufSize->push_back(10000); // default value, can be overwritten in configuration
fInputRcvBufSize->push_back(10000); // default value, can be overwritten in configuration
}
fOutputAddress = new std::vector<TString>(fNumOutputs);
fOutputMethod = new std::vector<TString>();
fOutputSocketType = new std::vector<Int_t>();
fOutputSndBufSize = new std::vector<Int_t>();
fOutputRcvBufSize = new std::vector<Int_t>();
fOutputAddress = new std::vector<std::string>(fNumOutputs);
fOutputMethod = new std::vector<std::string>();
fOutputSocketType = new std::vector<int>();
fOutputSndBufSize = new std::vector<int>();
fOutputRcvBufSize = new std::vector<int>();
for (Int_t i = 0; i < fNumOutputs; ++i) {
for (int i = 0; i < fNumOutputs; ++i) {
fOutputMethod->push_back("bind"); // default value, can be overwritten in configuration
fOutputSocketType->push_back(ZMQ_PUB); // default value, can be overwritten in configuration
fOutputSndBufSize->push_back(10000); // default value, can be overwritten in configuration
@ -63,8 +63,9 @@ void FairMQDevice::InitInput()
{
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> InitInput <<<<<<<");
for (Int_t i = 0; i < fNumInputs; ++i) {
FairMQSocket* socket = new FairMQSocket(fPayloadContext, fInputSocketType->at(i), i);
for (int i = 0; i < fNumInputs; ++i) {
//FairMQSocket* socket = new FairMQSocketZMQ(fPayloadContext, fInputSocketType->at(i), i);
FairMQSocket* socket = fTransportFactory->CreateSocket(fPayloadContext, fInputSocketType->at(i), i);
socket->SetOption(ZMQ_SNDHWM, &fInputSndBufSize->at(i), sizeof(fInputSndBufSize->at(i)));
socket->SetOption(ZMQ_RCVHWM, &fInputRcvBufSize->at(i), sizeof(fInputRcvBufSize->at(i)));
@ -86,11 +87,14 @@ void FairMQDevice::InitOutput()
{
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> InitOutput <<<<<<<");
for (Int_t i = 0; i < fNumOutputs; ++i) {
FairMQSocket* socket = new FairMQSocket(fPayloadContext, fOutputSocketType->at(i), i);
for (int i = 0; i < fNumOutputs; ++i) {
FairMQSocket* socket = fTransportFactory->CreateSocket(fPayloadContext, fOutputSocketType->at(i), i);
socket->SetOption(ZMQ_SNDHWM, &fOutputSndBufSize->at(i), sizeof(fOutputSndBufSize->at(i)));
socket->SetOption(ZMQ_RCVHWM, &fOutputRcvBufSize->at(i), sizeof(fOutputRcvBufSize->at(i)));
fPayloadOutputs->push_back(socket);
try {
if (fOutputMethod->at(i) == "bind") {
fPayloadOutputs->at(i)->Bind(fOutputAddress->at(i));
@ -111,7 +115,7 @@ void FairMQDevice::Pause()
}
// Method for setting properties represented as a string.
void FairMQDevice::SetProperty(const Int_t& key, const TString& value, const Int_t& slot/*= 0*/)
void FairMQDevice::SetProperty(const int& key, const std::string& value, const int& slot/*= 0*/)
{
switch (key) {
case Id:
@ -140,7 +144,7 @@ void FairMQDevice::SetProperty(const Int_t& key, const TString& value, const Int
}
// Method for setting properties represented as an integer.
void FairMQDevice::SetProperty(const Int_t& key, const Int_t& value, const Int_t& slot/*= 0*/)
void FairMQDevice::SetProperty(const int& key, const int& value, const int& slot/*= 0*/)
{
switch (key) {
case NumIoThreads:
@ -186,7 +190,7 @@ void FairMQDevice::SetProperty(const Int_t& key, const Int_t& value, const Int_t
}
// Method for getting properties represented as an string.
TString FairMQDevice::GetProperty(const Int_t& key, const TString& default_/*= ""*/, const Int_t& slot/*= 0*/)
std::string FairMQDevice::GetProperty(const int& key, const std::string& default_/*= ""*/, const int& slot/*= 0*/)
{
switch (key) {
case Id:
@ -205,7 +209,7 @@ TString FairMQDevice::GetProperty(const Int_t& key, const TString& default_/*= "
}
// Method for getting properties represented as an integer.
Int_t FairMQDevice::GetProperty(const Int_t& key, const Int_t& default_/*= 0*/, const Int_t& slot/*= 0*/)
int FairMQDevice::GetProperty(const int& key, const int& default_/*= 0*/, const int& slot/*= 0*/)
{
switch (key) {
case NumIoThreads:
@ -229,6 +233,11 @@ Int_t FairMQDevice::GetProperty(const Int_t& key, const Int_t& default_/*= 0*/,
}
}
void FairMQDevice::SetTransport(FairMQTransportFactory* factory)
{
fTransportFactory = factory;
}
void FairMQDevice::LogSocketRates()
{
timestamp_t t0;
@ -236,20 +245,20 @@ void FairMQDevice::LogSocketRates()
timestamp_t timeSinceLastLog_ms;
ULong_t* bytesInput = new ULong_t[fNumInputs];
ULong_t* messagesInput = new ULong_t[fNumInputs];
ULong_t* bytesOutput = new ULong_t[fNumOutputs];
ULong_t* messagesOutput = new ULong_t[fNumOutputs];
unsigned long* bytesInput = new unsigned long[fNumInputs];
unsigned long* messagesInput = new unsigned long[fNumInputs];
unsigned long* bytesOutput = new unsigned long[fNumOutputs];
unsigned long* messagesOutput = new unsigned long[fNumOutputs];
ULong_t* bytesInputNew = new ULong_t[fNumInputs];
ULong_t* messagesInputNew = new ULong_t[fNumInputs];
ULong_t* bytesOutputNew = new ULong_t[fNumOutputs];
ULong_t* messagesOutputNew = new ULong_t[fNumOutputs];
unsigned long* bytesInputNew = new unsigned long[fNumInputs];
unsigned long* messagesInputNew = new unsigned long[fNumInputs];
unsigned long* bytesOutputNew = new unsigned long[fNumOutputs];
unsigned long* messagesOutputNew = new unsigned long[fNumOutputs];
Double_t* megabytesPerSecondInput = new Double_t[fNumInputs];
Double_t* messagesPerSecondInput = new Double_t[fNumInputs];
Double_t* megabytesPerSecondOutput = new Double_t[fNumOutputs];
Double_t* messagesPerSecondOutput = new Double_t[fNumOutputs];
double* megabytesPerSecondInput = new double[fNumInputs];
double* messagesPerSecondInput = new double[fNumInputs];
double* megabytesPerSecondOutput = new double[fNumOutputs];
double* messagesPerSecondOutput = new double[fNumOutputs];
// Temp stuff for process termination
bool receivedSomething = false;
@ -258,7 +267,7 @@ void FairMQDevice::LogSocketRates()
int didNotSendFor = 0;
// End of temp stuff
Int_t i = 0;
int i = 0;
for ( std::vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) {
bytesInput[i] = (*itr)->GetBytesRx();
messagesInput[i] = (*itr)->GetMessagesRx();
@ -286,10 +295,10 @@ void FairMQDevice::LogSocketRates()
for ( std::vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) {
bytesInputNew[i] = (*itr)->GetBytesRx();
megabytesPerSecondInput[i] = ((Double_t) (bytesInputNew[i] - bytesInput[i]) / (1024. * 1024.)) / (Double_t) timeSinceLastLog_ms * 1000.;
megabytesPerSecondInput[i] = ((double) (bytesInputNew[i] - bytesInput[i]) / (1024. * 1024.)) / (double) timeSinceLastLog_ms * 1000.;
bytesInput[i] = bytesInputNew[i];
messagesInputNew[i] = (*itr)->GetMessagesRx();
messagesPerSecondInput[i] = (Double_t) (messagesInputNew[i] - messagesInput[i]) / (Double_t) timeSinceLastLog_ms * 1000.;
messagesPerSecondInput[i] = (double) (messagesInputNew[i] - messagesInput[i]) / (double) timeSinceLastLog_ms * 1000.;
messagesInput[i] = messagesInputNew[i];
std::stringstream logmsg;
@ -314,10 +323,10 @@ void FairMQDevice::LogSocketRates()
for ( std::vector<FairMQSocket*>::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) {
bytesOutputNew[i] = (*itr)->GetBytesTx();
megabytesPerSecondOutput[i] = ((Double_t) (bytesOutputNew[i] - bytesOutput[i]) / (1024. * 1024.)) / (Double_t) timeSinceLastLog_ms * 1000.;
megabytesPerSecondOutput[i] = ((double) (bytesOutputNew[i] - bytesOutput[i]) / (1024. * 1024.)) / (double) timeSinceLastLog_ms * 1000.;
bytesOutput[i] = bytesOutputNew[i];
messagesOutputNew[i] = (*itr)->GetMessagesTx();
messagesPerSecondOutput[i] = (Double_t) (messagesOutputNew[i] - messagesOutput[i]) / (Double_t) timeSinceLastLog_ms * 1000.;
messagesPerSecondOutput[i] = (double) (messagesOutputNew[i] - messagesOutput[i]) / (double) timeSinceLastLog_ms * 1000.;
messagesOutput[i] = messagesOutputNew[i];
std::stringstream logmsg;

View File

@ -8,13 +8,14 @@
#ifndef FAIRMQDEVICE_H_
#define FAIRMQDEVICE_H_
#include <vector>
#include <string>
#include "FairMQConfigurable.h"
#include "FairMQStateMachine.h"
#include <vector>
#include "FairMQTransportFactory.h"
#include "FairMQContext.h"
#include "FairMQSocket.h"
#include "Rtypes.h"
#include "TString.h"
class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
@ -44,37 +45,40 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
virtual void LogSocketRates();
virtual void ListenToCommands();
virtual void SetProperty(const Int_t& key, const TString& value, const Int_t& slot = 0);
virtual TString GetProperty(const Int_t& key, const TString& default_ = "", const Int_t& slot = 0);
virtual void SetProperty(const Int_t& key, const Int_t& value, const Int_t& slot = 0);
virtual Int_t GetProperty(const Int_t& key, const Int_t& default_ = 0, const Int_t& slot = 0);
virtual void SetProperty(const int& key, const std::string& value, const int& slot = 0);
virtual std::string GetProperty(const int& key, const std::string& default_ = "", const int& slot = 0);
virtual void SetProperty(const int& key, const int& value, const int& slot = 0);
virtual int GetProperty(const int& key, const int& default_ = 0, const int& slot = 0);
virtual void SetTransport(FairMQTransportFactory* factory);
virtual ~FairMQDevice();
protected:
TString fId;
Int_t fNumIoThreads;
std::string fId;
int fNumIoThreads;
FairMQContext* fPayloadContext;
FairMQTransportFactory* fTransportFactory;
Int_t fNumInputs;
Int_t fNumOutputs;
int fNumInputs;
int fNumOutputs;
std::vector<TString> *fInputAddress;
std::vector<TString> *fInputMethod;
std::vector<Int_t> *fInputSocketType;
std::vector<Int_t> *fInputSndBufSize;
std::vector<Int_t> *fInputRcvBufSize;
std::vector<std::string> *fInputAddress;
std::vector<std::string> *fInputMethod;
std::vector<int> *fInputSocketType;
std::vector<int> *fInputSndBufSize;
std::vector<int> *fInputRcvBufSize;
std::vector<TString> *fOutputAddress;
std::vector<TString> *fOutputMethod;
std::vector<Int_t> *fOutputSocketType;
std::vector<Int_t> *fOutputSndBufSize;
std::vector<Int_t> *fOutputRcvBufSize;
std::vector<std::string> *fOutputAddress;
std::vector<std::string> *fOutputMethod;
std::vector<int> *fOutputSocketType;
std::vector<int> *fOutputSndBufSize;
std::vector<int> *fOutputRcvBufSize;
std::vector<FairMQSocket*> *fPayloadInputs;
std::vector<FairMQSocket*> *fPayloadOutputs;
Int_t fLogIntervalInMs;
int fLogIntervalInMs;
virtual void Init();
virtual void Run();

View File

@ -5,10 +5,11 @@
* @author D. Klein, A. Rybalchenko
*/
#include "FairMQLogger.h"
#include <iostream>
#include <ctime>
#include <iomanip>
#include <ctime>
#include "FairMQLogger.h"
FairMQLogger* FairMQLogger::instance = NULL;
@ -21,7 +22,7 @@ FairMQLogger* FairMQLogger::GetInstance()
return instance;
}
FairMQLogger* FairMQLogger::InitInstance(TString bindAddress)
FairMQLogger* FairMQLogger::InitInstance(std::string bindAddress)
{
instance = new FairMQLogger(bindAddress);
return instance;
@ -32,7 +33,7 @@ FairMQLogger::FairMQLogger() :
{
}
FairMQLogger::FairMQLogger(TString bindAddress) :
FairMQLogger::FairMQLogger(std::string bindAddress) :
fBindAddress(bindAddress)
{
}
@ -41,17 +42,17 @@ FairMQLogger::~FairMQLogger()
{
}
void FairMQLogger::Log(Int_t type, TString logmsg)
void FairMQLogger::Log(int type, std::string logmsg)
{
timestamp_t tm = get_timestamp();
timestamp_t ms = tm / 1000.0L;
timestamp_t s = ms / 1000.0L;
std::time_t t = s;
std::size_t fractional_seconds = ms % 1000;
Text_t mbstr[100];
char mbstr[100];
std::strftime(mbstr, 100, "%H:%M:%S:", std::localtime(&t));
TString type_str;
std::string type_str;
switch (type) {
case DEBUG:
type_str = "\033[01;34mDEBUG\033[0m";

View File

@ -7,28 +7,27 @@
#ifndef FAIRMQLOGGER_H_
#define FAIRMQLOGGER_H_
#include <string>
#include <sstream>
#include <sys/time.h>
#include "Rtypes.h"
#include "TString.h"
class FairMQLogger
{
private:
static FairMQLogger* instance;
TString fBindAddress;
std::string fBindAddress;
public:
enum {
DEBUG, INFO, ERROR, STATE
};
FairMQLogger();
FairMQLogger(TString bindAdress);
FairMQLogger(std::string bindAdress);
virtual ~FairMQLogger();
void Log(Int_t type, TString logmsg);
void Log(int type, std::string logmsg);
static FairMQLogger* GetInstance();
static FairMQLogger* InitInstance(TString bindAddress);
static FairMQLogger* InitInstance(std::string bindAddress);
};
typedef unsigned long long timestamp_t;

View File

@ -35,22 +35,24 @@ void FairMQMerger::Run()
items[i].revents = 0;
}
Bool_t received = false;
bool received = false;
while ( fState == RUNNING ) {
FairMQMessage msg;
FairMQMessage* msg = new FairMQMessageZMQ();
zmq_poll(items, fNumInputs, 100);
for(int i = 0; i < fNumInputs; i++) {
if (items[i].revents & ZMQ_POLLIN) {
received = fPayloadInputs->at(i)->Receive(&msg);
received = fPayloadInputs->at(i)->Receive(msg);
}
if (received) {
fPayloadOutputs->at(0)->Send(&msg);
fPayloadOutputs->at(0)->Send(msg);
received = false;
}
}
delete msg;
}
rateLogger.interrupt();

View File

@ -4,96 +4,3 @@
* @since 2012-12-05
* @author: D. Klein, A. Rybalchenko
*/
#include <cstdlib>
#include "FairMQMessage.h"
#include "FairMQLogger.h"
FairMQMessage::FairMQMessage()
{
int rc = zmq_msg_init (&fMessage);
if (rc != 0){
std::stringstream logmsg;
logmsg << "failed initializing message, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
}
FairMQMessage::FairMQMessage(size_t size)
{
int rc = zmq_msg_init_size (&fMessage, size);
if (rc != 0){
std::stringstream logmsg;
logmsg << "failed initializing message with size, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
}
FairMQMessage::FairMQMessage(void* data, size_t size)
{
int rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL);
if (rc != 0){
std::stringstream logmsg;
logmsg << "failed initializing message with data, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
}
FairMQMessage::~FairMQMessage()
{
int rc = zmq_msg_close (&fMessage);
if (rc != 0){
std::stringstream logmsg;
logmsg << "failed closing message with data, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
}
void FairMQMessage::Rebuild(void* data, size_t size)
{
int rc = zmq_msg_close (&fMessage);
if (rc != 0) {
std::stringstream logmsg;
logmsg << "failed closing message, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL);
if (rc != 0) {
std::stringstream logmsg2;
logmsg2 << "failed initializing message with data, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
}
}
zmq_msg_t* FairMQMessage::GetMessage()
{
return &fMessage;
}
void* FairMQMessage::GetData()
{
return zmq_msg_data (&fMessage);
}
size_t FairMQMessage::GetSize()
{
return zmq_msg_size (&fMessage);
}
void FairMQMessage::Copy(FairMQMessage* msg)
{
int rc = zmq_msg_copy (&fMessage, &(msg->fMessage));
if (rc != 0) {
std::stringstream logmsg;
logmsg << "failed copying message, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
}
void FairMQMessage::CleanUp(void* data, void* hint)
{
free (data);
}

View File

@ -8,34 +8,23 @@
#ifndef FAIRMQMESSAGE_H_
#define FAIRMQMESSAGE_H_
#include <cstddef>
#include <zmq.h>
#include <cstddef> // for size_t
class FairMQMessage
{
public:
FairMQMessage();
FairMQMessage(size_t size);
FairMQMessage(void* data, size_t size);
virtual void Rebuild() = 0;
virtual void Rebuild(size_t size) = 0;
virtual void Rebuild(void* data, size_t site) = 0;
void Rebuild();
void Rebuild(size_t size);
void Rebuild(void* data, size_t site);
virtual void* GetMessage() = 0;
virtual void* GetData() = 0;
virtual size_t GetSize() = 0;
zmq_msg_t* GetMessage();
void* GetData();
size_t GetSize();
virtual void Copy(FairMQMessage* msg) = 0;
void Copy(FairMQMessage* msg);
static void CleanUp(void* data, void* hint);
virtual ~FairMQMessage();
private:
zmq_msg_t fMessage;
virtual ~FairMQMessage() {};
};
#endif /* FAIRMQMESSAGE_H_ */

View File

@ -0,0 +1,111 @@
/**
* FairMQMessageZMQ.cxx
*
* @since 2012-12-05
* @author: D. Klein, A. Rybalchenko
*/
#include <cstdlib>
#include "FairMQMessageZMQ.h"
#include "FairMQLogger.h"
FairMQMessageZMQ::FairMQMessageZMQ()
{
int rc = zmq_msg_init (&fMessage);
if (rc != 0){
std::stringstream logmsg;
logmsg << "failed initializing message, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
}
FairMQMessageZMQ::FairMQMessageZMQ(size_t size)
{
int rc = zmq_msg_init_size (&fMessage, size);
if (rc != 0){
std::stringstream logmsg;
logmsg << "failed initializing message with size, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
}
FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size)
{
int rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL);
if (rc != 0){
std::stringstream logmsg;
logmsg << "failed initializing message with data, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
}
void FairMQMessageZMQ::Rebuild()
{
//TODO
}
void FairMQMessageZMQ::Rebuild(size_t size)
{
//TODO
}
void FairMQMessageZMQ::Rebuild(void* data, size_t size)
{
int rc = zmq_msg_close (&fMessage);
if (rc != 0) {
std::stringstream logmsg;
logmsg << "failed closing message, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL);
if (rc != 0) {
std::stringstream logmsg2;
logmsg2 << "failed initializing message with data, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
}
}
void* FairMQMessageZMQ::GetMessage()
{
return &fMessage;
}
void* FairMQMessageZMQ::GetData()
{
return zmq_msg_data (&fMessage);
}
size_t FairMQMessageZMQ::GetSize()
{
return zmq_msg_size (&fMessage);
}
void FairMQMessageZMQ::Copy(FairMQMessage* msg)
{
int rc = zmq_msg_copy (&fMessage, &(static_cast<FairMQMessageZMQ*>(msg)->fMessage));
if (rc != 0) {
std::stringstream logmsg;
logmsg << "failed copying message, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
}
void FairMQMessageZMQ::CleanUp(void* data, void* hint)
{
free (data);
}
FairMQMessageZMQ::~FairMQMessageZMQ()
{
int rc = zmq_msg_close (&fMessage);
if (rc != 0){
std::stringstream logmsg;
logmsg << "failed closing message with data, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
}

View File

@ -0,0 +1,43 @@
/**
* FairMQMessageZMQ.h
*
* @since 2014-01-17
* @author: A. Rybalchenko
*/
#ifndef FAIRMQMESSAGEZMQ_H_
#define FAIRMQMESSAGEZMQ_H_
#include <cstddef>
#include <zmq.h>
#include "FairMQMessage.h"
class FairMQMessageZMQ: public FairMQMessage
{
public:
FairMQMessageZMQ();
FairMQMessageZMQ(size_t size);
FairMQMessageZMQ(void* data, size_t size);
virtual void Rebuild();
virtual void Rebuild(size_t size);
virtual void Rebuild(void* data, size_t site);
virtual void* GetMessage();
virtual void* GetData();
virtual size_t GetSize();
virtual void Copy(FairMQMessage* msg);
static void CleanUp(void* data, void* hint);
virtual ~FairMQMessageZMQ();
private:
zmq_msg_t fMessage;
};
#endif /* FAIRMQMESSAGEZMQ_H_ */

View File

@ -42,21 +42,23 @@ void FairMQProcessor::Run()
int receivedMsgs = 0;
int sentMsgs = 0;
Bool_t received = false;
bool received = false;
while ( fState == RUNNING ) {
FairMQMessage msg;
FairMQMessage* msg = new FairMQMessageZMQ();
received = fPayloadInputs->at(0)->Receive(&msg);
received = fPayloadInputs->at(0)->Receive(msg);
receivedMsgs++;
if (received) {
fTask->Exec(&msg, NULL);
fTask->Exec(msg, NULL);
fPayloadOutputs->at(0)->Send(&msg);
fPayloadOutputs->at(0)->Send(msg);
sentMsgs++;
received = false;
}
delete msg;
}
std::cout << "I've received " << receivedMsgs << " and sent " << sentMsgs << " messages!" << std::endl;

View File

@ -10,7 +10,6 @@
#include "FairMQDevice.h"
#include "FairMQProcessorTask.h"
#include "Rtypes.h"
class FairMQProcessor: public FairMQDevice

View File

@ -9,6 +9,7 @@
#define FAIRMQPROCESSORTASK_H_
#include <vector>
#include "FairMQMessage.h"
#include "FairTask.h"

View File

@ -25,13 +25,15 @@ void FairMQProxy::Run()
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
FairMQMessage msg;
FairMQMessage* msg = new FairMQMessageZMQ();
while ( fState == RUNNING ) {
fPayloadInputs->at(0)->Receive(&msg);
fPayloadOutputs->at(0)->Send(&msg);
fPayloadInputs->at(0)->Receive(msg);
fPayloadOutputs->at(0)->Send(msg);
}
delete msg;
rateLogger.interrupt();
rateLogger.join();
}

View File

@ -9,8 +9,6 @@
#define FAIRMQPROXY_H_
#include "FairMQDevice.h"
#include "Rtypes.h"
#include "TString.h"
class FairMQProxy: public FairMQDevice

View File

@ -62,7 +62,7 @@ void FairMQSampler::Init()
fFairRunAna->Init();
//fFairRunAna->Run(0, 0);
FairRootManager* ioman = FairRootManager::Instance();
fNumEvents = Int_t((ioman->GetInChain())->GetEntries());
fNumEvents = int((ioman->GetInChain())->GetEntries());
}
void FairMQSampler::Run()
@ -139,9 +139,9 @@ void FairMQSampler::ListenToCommands()
while ( true ) {
try {
FairMQMessage msg;
FairMQMessage* msg = new FairMQMessageZMQ();
received = fPayloadInputs->at(0)->Receive(&msg);
received = fPayloadInputs->at(0)->Receive(msg);
if (received) {
//command handling goes here.
@ -149,6 +149,8 @@ void FairMQSampler::ListenToCommands()
received = false;
}
delete msg;
boost::this_thread::interruption_point();
} catch (boost::thread_interrupted&) {
std::cout << "commandListener interrupted" << std::endl;
@ -158,7 +160,7 @@ void FairMQSampler::ListenToCommands()
FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, ">>>>>>> stopping commandListener <<<<<<<");
}
void FairMQSampler::SetProperty(const Int_t& key, const TString& value, const Int_t& slot/*= 0*/)
void FairMQSampler::SetProperty(const int& key, const std::string& value, const int& slot/*= 0*/)
{
switch (key) {
case InputFile:
@ -176,7 +178,7 @@ void FairMQSampler::SetProperty(const Int_t& key, const TString& value, const In
}
}
TString FairMQSampler::GetProperty(const Int_t& key, const TString& default_/*= ""*/, const Int_t& slot/*= 0*/)
std::string FairMQSampler::GetProperty(const int& key, const std::string& default_/*= ""*/, const int& slot/*= 0*/)
{
switch (key) {
case InputFile:
@ -190,7 +192,7 @@ TString FairMQSampler::GetProperty(const Int_t& key, const TString& default_/*=
}
}
void FairMQSampler::SetProperty(const Int_t& key, const Int_t& value, const Int_t& slot/*= 0*/)
void FairMQSampler::SetProperty(const int& key, const int& value, const int& slot/*= 0*/)
{
switch (key) {
case EventRate:
@ -202,7 +204,7 @@ void FairMQSampler::SetProperty(const Int_t& key, const Int_t& value, const Int_
}
}
Int_t FairMQSampler::GetProperty(const Int_t& key, const Int_t& default_/*= 0*/, const Int_t& slot/*= 0*/)
int FairMQSampler::GetProperty(const int& key, const int& default_/*= 0*/, const int& slot/*= 0*/)
{
switch (key) {
case EventRate:

View File

@ -13,7 +13,6 @@
#include "FairTask.h"
#include "FairMQDevice.h"
#include "FairMQSamplerTask.h"
#include "TString.h"
/**
@ -41,19 +40,19 @@ class FairMQSampler: public FairMQDevice
void ResetEventCounter();
virtual void ListenToCommands();
virtual void SetProperty(const Int_t& key, const TString& value, const Int_t& slot = 0);
virtual TString GetProperty(const Int_t& key, const TString& default_ = "", const Int_t& slot = 0);
virtual void SetProperty(const Int_t& key, const Int_t& value, const Int_t& slot = 0);
virtual Int_t GetProperty(const Int_t& key, const Int_t& default_ = 0, const Int_t& slot = 0);
virtual void SetProperty(const int& key, const std::string& value, const int& slot = 0);
virtual std::string GetProperty(const int& key, const std::string& default_ = "", const int& slot = 0);
virtual void SetProperty(const int& key, const int& value, const int& slot = 0);
virtual int GetProperty(const int& key, const int& default_ = 0, const int& slot = 0);
protected:
FairRunAna* fFairRunAna;
Int_t fNumEvents;
int fNumEvents;
FairMQSamplerTask* fSamplerTask;
TString fInputFile; // Filename of a root file containing the simulated digis.
TString fParFile;
TString fBranch; // The name of the sub-detector branch to stream the digis from.
Int_t fEventRate;
Int_t fEventCounter;
std::string fInputFile; // Filename of a root file containing the simulated digis.
std::string fParFile;
std::string fBranch; // The name of the sub-detector branch to stream the digis from.
int fEventRate;
int fEventCounter;
virtual void Init();
virtual void Run();

View File

@ -8,11 +8,11 @@
#include "FairMQSamplerTask.h"
FairMQSamplerTask::FairMQSamplerTask(const Text_t* name, Int_t iVerbose) :
FairMQSamplerTask::FairMQSamplerTask(const Text_t* name, int iVerbose) :
FairTask(name, iVerbose),
fInput(NULL),
fBranch(""),
fOutput(new FairMQMessage)
fOutput(new FairMQMessageZMQ)
{
}
@ -20,7 +20,7 @@ FairMQSamplerTask::FairMQSamplerTask() :
FairTask( "Abstract base task used for loading a branch from a root file into memory"),
fInput(NULL),
fBranch(""),
fOutput(new FairMQMessage)
fOutput(new FairMQMessageZMQ)
{
}

View File

@ -13,6 +13,7 @@
#include "TClonesArray.h"
#include <string>
#include "FairMQMessage.h"
#include "FairMQMessageZMQ.h"
#include "TString.h"
@ -20,7 +21,7 @@ class FairMQSamplerTask: public FairTask
{
public:
FairMQSamplerTask();
FairMQSamplerTask(const Text_t* name, Int_t iVerbose=1);
FairMQSamplerTask(const Text_t* name, int iVerbose=1);
virtual ~FairMQSamplerTask();
virtual InitStatus Init();
virtual void Exec(Option_t* opt) = 0;

View File

@ -17,15 +17,16 @@ FairMQSink::FairMQSink()
void FairMQSink::Run()
{
void* status; //necessary for pthread_join
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<");
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
while ( fState == RUNNING ) {
FairMQMessage msg;
FairMQMessage* msg = new FairMQMessageZMQ();
fPayloadInputs->at(0)->Receive(&msg);
fPayloadInputs->at(0)->Receive(msg);
delete msg;
}
rateLogger.interrupt();

View File

@ -4,178 +4,3 @@
* @since 2012-12-05
* @author D. Klein, A. Rybalchenko
*/
#include "FairMQSocket.h"
#include <sstream>
#include "FairMQLogger.h"
FairMQSocket::FairMQSocket(FairMQContext* context, int type, int num) :
fBytesTx(0),
fBytesRx(0),
fMessagesTx(0),
fMessagesRx(0)
{
std::stringstream id;
id << GetTypeString(type) << "." << num;
fId = id.str();
fSocket = zmq_socket(context->GetContext(), type);
int rc = zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.Length());
if (rc != 0) {
std::stringstream logmsg;
logmsg << "failed setting socket option, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
if (type == ZMQ_SUB) {
rc = zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, NULL, 0);
if (rc != 0) {
std::stringstream logmsg2;
logmsg2 << "failed setting socket option, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
}
}
std::stringstream logmsg3;
logmsg3 << "created socket #" << fId;
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg3.str());
}
FairMQSocket::~FairMQSocket()
{
}
TString FairMQSocket::GetId()
{
return fId;
}
TString FairMQSocket::GetTypeString(int type)
{
switch (type) {
case ZMQ_SUB:
return "sub";
case ZMQ_PUB:
return "pub";
case ZMQ_PUSH:
return "push";
case ZMQ_PULL:
return "pull";
default:
return "";
}
}
void FairMQSocket::Bind(TString address)
{
std::stringstream logmsg;
logmsg << "bind socket #" << fId << " on " << address;
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
int rc = zmq_bind (fSocket, address);
if (rc != 0) {
std::stringstream logmsg2;
logmsg2 << "failed binding socket #" << fId << ", reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
}
}
void FairMQSocket::Connect(TString address)
{
std::stringstream logmsg;
logmsg << "connect socket #" << fId << " on " << address;
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
int rc = zmq_connect (fSocket, address);
if (rc != 0) {
std::stringstream logmsg2;
logmsg2 << "failed connecting socket #" << fId << ", reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
}
}
size_t FairMQSocket::Send(FairMQMessage* msg)
{
int nbytes = zmq_msg_send (msg->GetMessage(), fSocket, 0);
if (nbytes >= 0){
fBytesTx += nbytes;
++fMessagesTx;
return nbytes;
}
if (zmq_errno() == EAGAIN){
return false;
}
std::stringstream logmsg;
logmsg << "failed sending on socket #" << fId << ", reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
return nbytes;
}
size_t FairMQSocket::Receive(FairMQMessage* msg)
{
int nbytes = zmq_msg_recv (msg->GetMessage(), fSocket, 0);
if (nbytes >= 0){
fBytesRx += nbytes;
++fMessagesRx;
return nbytes;
}
if (zmq_errno() == EAGAIN){
return false;
}
std::stringstream logmsg;
logmsg << "failed receiving on socket #" << fId << ", reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
return nbytes;
}
void FairMQSocket::SetOption(int option, const void* value, size_t valueSize)
{
int rc = zmq_setsockopt(fSocket, option, value, valueSize);
if (rc < 0) {
std::stringstream logmsg;
logmsg << "failed setting socket option, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
}
void FairMQSocket::Close()
{
if (fSocket == NULL){
return;
}
int rc = zmq_close (fSocket);
if (rc != 0) {
std::stringstream logmsg;
logmsg << "failed closing socket, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
fSocket = NULL;
}
void* FairMQSocket::GetSocket()
{
return fSocket;
}
ULong_t FairMQSocket::GetBytesTx()
{
return fBytesTx;
}
ULong_t FairMQSocket::GetBytesRx()
{
return fBytesRx;
}
ULong_t FairMQSocket::GetMessagesTx()
{
return fMessagesTx;
}
ULong_t FairMQSocket::GetMessagesRx()
{
return fMessagesRx;
}

View File

@ -8,42 +8,34 @@
#ifndef FAIRMQSOCKET_H_
#define FAIRMQSOCKET_H_
#include <zmq.h>
#include <string>
#include "FairMQContext.h"
#include "FairMQMessage.h"
#include "Rtypes.h"
#include "TString.h"
#include "FairMQMessageZMQ.h"
class FairMQSocket
{
public:
FairMQSocket(FairMQContext* context, int type, int num);
virtual ~FairMQSocket();
TString GetId();
static TString GetTypeString(int type);
size_t Send(FairMQMessage* msg);
size_t Receive(FairMQMessage* msg);
void Close();
void Bind(TString address);
void Connect(TString address);
void* GetSocket();
virtual std::string GetId() = 0;
void SetOption(int option, const void* value, size_t valueSize);
virtual void Bind(std::string address) = 0;
virtual void Connect(std::string address) = 0;
ULong_t GetBytesTx();
ULong_t GetBytesRx();
ULong_t GetMessagesTx();
ULong_t GetMessagesRx();
virtual size_t Send(FairMQMessage* msg) = 0;
virtual size_t Receive(FairMQMessage* msg) = 0;
private:
void* fSocket;
TString fId;
ULong_t fBytesTx;
ULong_t fBytesRx;
ULong_t fMessagesTx;
ULong_t fMessagesRx;
virtual void Close() = 0;
virtual void* GetSocket() = 0;
virtual void SetOption(int option, const void* value, size_t valueSize) = 0;
virtual unsigned long GetBytesTx() = 0;
virtual unsigned long GetBytesRx() = 0;
virtual unsigned long GetMessagesTx() = 0;
virtual unsigned long GetMessagesRx() = 0;
virtual ~FairMQSocket() {};
};
#endif /* FAIRMQSOCKET_H_ */

View File

@ -0,0 +1,182 @@
/**
* FairMQSocketZMQ.cxx
*
* @since 2012-12-05
* @author D. Klein, A. Rybalchenko
*/
#include <sstream>
#include "FairMQSocketZMQ.h"
#include "FairMQLogger.h"
FairMQSocketZMQ::FairMQSocketZMQ(FairMQContext* context, int type, int num) :
fBytesTx(0),
fBytesRx(0),
fMessagesTx(0),
fMessagesRx(0)
{
std::stringstream id;
id << GetTypeString(type) << "." << num;
fId = id.str();
fSocket = zmq_socket(context->GetContext(), type);
int rc = zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.length());
if (rc != 0) {
std::stringstream logmsg;
logmsg << "failed setting socket option, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
if (type == ZMQ_SUB) {
rc = zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, NULL, 0);
if (rc != 0) {
std::stringstream logmsg2;
logmsg2 << "failed setting socket option, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
}
}
std::stringstream logmsg3;
logmsg3 << "created socket #" << fId;
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg3.str());
}
std::string FairMQSocketZMQ::GetId()
{
return fId;
}
std::string FairMQSocketZMQ::GetTypeString(int type)
{
switch (type) {
case ZMQ_SUB:
return "sub";
case ZMQ_PUB:
return "pub";
case ZMQ_PUSH:
return "push";
case ZMQ_PULL:
return "pull";
default:
return "";
}
}
void FairMQSocketZMQ::Bind(std::string address)
{
std::stringstream logmsg;
logmsg << "bind socket #" << fId << " on " << address;
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
int rc = zmq_bind (fSocket, address.c_str());
if (rc != 0) {
std::stringstream logmsg2;
logmsg2 << "failed binding socket #" << fId << ", reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
}
}
void FairMQSocketZMQ::Connect(std::string address)
{
std::stringstream logmsg;
logmsg << "connect socket #" << fId << " on " << address;
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
int rc = zmq_connect (fSocket, address.c_str());
if (rc != 0) {
std::stringstream logmsg2;
logmsg2 << "failed connecting socket #" << fId << ", reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
}
}
size_t FairMQSocketZMQ::Send(FairMQMessage* msg)
{
int nbytes = zmq_msg_send (static_cast<zmq_msg_t*>(msg->GetMessage()), fSocket, 0);
if (nbytes >= 0){
fBytesTx += nbytes;
++fMessagesTx;
return nbytes;
}
if (zmq_errno() == EAGAIN){
return false;
}
std::stringstream logmsg;
logmsg << "failed sending on socket #" << fId << ", reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
return nbytes;
}
size_t FairMQSocketZMQ::Receive(FairMQMessage* msg)
{
int nbytes = zmq_msg_recv (static_cast<zmq_msg_t*>(msg->GetMessage()), fSocket, 0);
if (nbytes >= 0){
fBytesRx += nbytes;
++fMessagesRx;
return nbytes;
}
if (zmq_errno() == EAGAIN){
return false;
}
std::stringstream logmsg;
logmsg << "failed receiving on socket #" << fId << ", reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
return nbytes;
}
void FairMQSocketZMQ::SetOption(int option, const void* value, size_t valueSize)
{
int rc = zmq_setsockopt(fSocket, option, value, valueSize);
if (rc < 0) {
std::stringstream logmsg;
logmsg << "failed setting socket option, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
}
void FairMQSocketZMQ::Close()
{
if (fSocket == NULL){
return;
}
int rc = zmq_close (fSocket);
if (rc != 0) {
std::stringstream logmsg;
logmsg << "failed closing socket, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
fSocket = NULL;
}
void* FairMQSocketZMQ::GetSocket()
{
return fSocket;
}
unsigned long FairMQSocketZMQ::GetBytesTx()
{
return fBytesTx;
}
unsigned long FairMQSocketZMQ::GetBytesRx()
{
return fBytesRx;
}
unsigned long FairMQSocketZMQ::GetMessagesTx()
{
return fMessagesTx;
}
unsigned long FairMQSocketZMQ::GetMessagesRx()
{
return fMessagesRx;
}
FairMQSocketZMQ::~FairMQSocketZMQ()
{
}

View File

@ -0,0 +1,53 @@
/**
* FairMQSocketZMQ.h
*
* @since 2012-12-05
* @author D. Klein, A. Rybalchenko
*/
#ifndef FAIRMQSOCKETZMQ_H_
#define FAIRMQSOCKETZMQ_H_
#include <zmq.h>
#include "FairMQSocket.h"
#include "FairMQContext.h"
#include "FairMQMessageZMQ.h"
class FairMQSocketZMQ : public FairMQSocket
{
public:
FairMQSocketZMQ(FairMQContext* context, int type, int num);
virtual std::string GetId();
virtual void Bind(std::string address);
virtual void Connect(std::string address);
virtual size_t Send(FairMQMessage* msg);
virtual size_t Receive(FairMQMessage* msg);
virtual void Close();
virtual void* GetSocket();
virtual void SetOption(int option, const void* value, size_t valueSize);
virtual unsigned long GetBytesTx();
virtual unsigned long GetBytesRx();
virtual unsigned long GetMessagesTx();
virtual unsigned long GetMessagesRx();
static std::string GetTypeString(int type);
virtual ~FairMQSocketZMQ();
private:
void* fSocket;
std::string fId;
unsigned long fBytesTx;
unsigned long fBytesRx;
unsigned long fMessagesTx;
unsigned long fMessagesRx;
};
#endif /* FAIRMQSOCKETZMQ_H_ */

View File

@ -30,18 +30,20 @@ void FairMQSplitter::Run()
int direction = 0;
while ( fState == RUNNING ) {
FairMQMessage msg;
FairMQMessage* msg = new FairMQMessageZMQ();
received = fPayloadInputs->at(0)->Receive(&msg);
received = fPayloadInputs->at(0)->Receive(msg);
if (received) {
fPayloadOutputs->at(direction)->Send(&msg);
fPayloadOutputs->at(direction)->Send(msg);
direction++;
if (direction >= fNumOutputs) {
direction = 0;
}
received = false;
}
delete msg;
}
rateLogger.interrupt();

View File

@ -10,6 +10,7 @@
#include "FairMQLogger.h"
#include "FairMQBenchmarkSampler.h"
#include "FairMQTransportFactoryZMQ.h"
FairMQBenchmarkSampler sampler;
@ -50,6 +51,9 @@ int main(int argc, char** argv)
logmsg << "PID: " << getpid();
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
sampler.SetTransport(transportFactory);
int i = 1;
sampler.SetProperty(FairMQBenchmarkSampler::Id, argv[i]);

View File

@ -10,6 +10,7 @@
#include "FairMQLogger.h"
#include "FairMQBuffer.h"
#include "FairMQTransportFactoryZMQ.h"
FairMQBuffer buffer;
@ -50,6 +51,9 @@ int main(int argc, char** argv)
logmsg << "PID: " << getpid();
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
buffer.SetTransport(transportFactory);
int i = 1;
buffer.SetProperty(FairMQBuffer::Id, argv[i]);

View File

@ -10,6 +10,7 @@
#include "FairMQLogger.h"
#include "FairMQMerger.h"
#include "FairMQTransportFactoryZMQ.h"
FairMQMerger merger;
@ -51,6 +52,9 @@ int main(int argc, char** argv)
logmsg << "PID: " << getpid();
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
merger.SetTransport(transportFactory);
int i = 1;
merger.SetProperty(FairMQMerger::Id, argv[i]);

View File

@ -10,6 +10,7 @@
#include "FairMQLogger.h"
#include "FairMQMerger.h"
#include "FairMQTransportFactoryZMQ.h"
FairMQMerger merger;
@ -53,6 +54,9 @@ int main(int argc, char** argv)
logmsg << "PID: " << getpid();
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
merger.SetTransport(transportFactory);
int i = 1;
merger.SetProperty(FairMQMerger::Id, argv[i]);

View File

@ -10,6 +10,7 @@
#include "FairMQLogger.h"
#include "FairMQSplitter.h"
#include "FairMQTransportFactoryZMQ.h"
FairMQSplitter splitter;
@ -52,6 +53,9 @@ int main(int argc, char** argv)
logmsg << "PID: " << getpid();
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
splitter.SetTransport(transportFactory);
int i = 1;
splitter.SetProperty(FairMQSplitter::Id, argv[i]);

View File

@ -10,6 +10,7 @@
#include "FairMQLogger.h"
#include "FairMQProxy.h"
#include "FairMQTransportFactoryZMQ.h"
FairMQProxy proxy;
@ -50,6 +51,9 @@ int main(int argc, char** argv)
logmsg << "PID: " << getpid();
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
proxy.SetTransport(transportFactory);
int i = 1;
proxy.SetProperty(FairMQProxy::Id, argv[i]);

View File

@ -10,6 +10,7 @@
#include "FairMQLogger.h"
#include "FairMQSink.h"
#include "FairMQTransportFactoryZMQ.h"
FairMQSink sink;
@ -50,6 +51,9 @@ int main(int argc, char** argv)
logmsg << "PID: " << getpid();
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
sink.SetTransport(transportFactory);
int i = 1;
sink.SetProperty(FairMQSink::Id, argv[i]);

View File

@ -10,6 +10,7 @@
#include "FairMQLogger.h"
#include "FairMQSplitter.h"
#include "FairMQTransportFactoryZMQ.h"
FairMQSplitter splitter;
@ -51,6 +52,9 @@ int main(int argc, char** argv)
logmsg << "PID: " << getpid();
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
splitter.SetTransport(transportFactory);
int i = 1;
splitter.SetProperty(FairMQSplitter::Id, argv[i]);