Fix CIDs 10587, 10813, 10911, 10912, 10402, 10403, 10577, 10578, 10579, 10848, 10861, 10865, 10868, 10910.

Move classes inheriting from device to a subdirectory.
Make sure only protobuf library installed by fairsoft is used.
Cleanup FairMQDevice and fix some initialization list warnings.
Loop to duplicate input files in Sampler.
Add some documentation to FairMQ.
This commit is contained in:
Alexey Rybalchenko 2014-07-21 15:15:40 +02:00
parent fe91e5af96
commit 281fcc459c
19 changed files with 183 additions and 159 deletions

View File

@ -7,6 +7,7 @@
################################################################################ ################################################################################
set(INCLUDE_DIRECTORIES set(INCLUDE_DIRECTORIES
${CMAKE_SOURCE_DIR}/fairmq ${CMAKE_SOURCE_DIR}/fairmq
${CMAKE_SOURCE_DIR}/fairmq/devices
${Boost_INCLUDE_DIR} ${Boost_INCLUDE_DIR}
) )
@ -48,12 +49,12 @@ set(SRCS
"FairMQMessage.cxx" "FairMQMessage.cxx"
"FairMQSocket.cxx" "FairMQSocket.cxx"
"FairMQDevice.cxx" "FairMQDevice.cxx"
"FairMQBenchmarkSampler.cxx" "devices/FairMQBenchmarkSampler.cxx"
"FairMQSink.cxx" "devices/FairMQSink.cxx"
"FairMQBuffer.cxx" "devices/FairMQBuffer.cxx"
"FairMQProxy.cxx" "devices/FairMQProxy.cxx"
"FairMQSplitter.cxx" "devices/FairMQSplitter.cxx"
"FairMQMerger.cxx" "devices/FairMQMerger.cxx"
"FairMQPoller.cxx" "FairMQPoller.cxx"
) )
@ -101,7 +102,6 @@ endif(NANOMSG_FOUND)
set(DEPENDENCIES set(DEPENDENCIES
${DEPENDENCIES} ${DEPENDENCIES}
${CMAKE_THREAD_LIBS_INIT}
boost_thread boost_timer boost_system boost_thread boost_timer boost_system
) )

View File

@ -20,6 +20,8 @@
FairMQDevice::FairMQDevice() FairMQDevice::FairMQDevice()
: fNumIoThreads(1) : fNumIoThreads(1)
, fNumInputs(0)
, fNumOutputs(0)
, fPayloadInputs(new vector<FairMQSocket*>()) , fPayloadInputs(new vector<FairMQSocket*>())
, fPayloadOutputs(new vector<FairMQSocket*>()) , fPayloadOutputs(new vector<FairMQSocket*>())
, fLogIntervalInMs(1000) , fLogIntervalInMs(1000)
@ -32,32 +34,22 @@ void FairMQDevice::Init()
LOG(INFO) << ">>>>>>> Init <<<<<<<"; LOG(INFO) << ">>>>>>> Init <<<<<<<";
LOG(INFO) << "numIoThreads: " << fNumIoThreads; LOG(INFO) << "numIoThreads: " << fNumIoThreads;
fInputAddress = new vector<string>(fNumInputs);
fInputMethod = new vector<string>();
fInputSocketType = new vector<string>();
fInputSndBufSize = new vector<int>();
fInputRcvBufSize = new vector<int>();
for (int i = 0; i < fNumInputs; ++i) for (int i = 0; i < fNumInputs; ++i)
{ {
fInputMethod->push_back("connect"); // default value, can be overwritten in configuration fInputAddress.push_back("ipc://default"); // default value, can be overwritten in configuration
fInputSocketType->push_back("sub"); // default value, can be overwritten in configuration fInputMethod.push_back("connect"); // default value, can be overwritten in configuration
fInputSndBufSize->push_back(10000); // default value, can be overwritten in configuration fInputSocketType.push_back("sub"); // default value, can be overwritten in configuration
fInputRcvBufSize->push_back(10000); // default value, can be overwritten in configuration fInputSndBufSize.push_back(10000); // default value, can be overwritten in configuration
fInputRcvBufSize.push_back(10000); // default value, can be overwritten in configuration
} }
fOutputAddress = new vector<string>(fNumOutputs);
fOutputMethod = new vector<string>();
fOutputSocketType = new vector<string>();
fOutputSndBufSize = new vector<int>();
fOutputRcvBufSize = new vector<int>();
for (int i = 0; i < fNumOutputs; ++i) for (int i = 0; i < fNumOutputs; ++i)
{ {
fOutputMethod->push_back("bind"); // default value, can be overwritten in configuration fOutputAddress.push_back("ipc://default"); // default value, can be overwritten in configuration
fOutputSocketType->push_back("pub"); // default value, can be overwritten in configuration fOutputMethod.push_back("bind"); // default value, can be overwritten in configuration
fOutputSndBufSize->push_back(10000); // default value, can be overwritten in configuration fOutputSocketType.push_back("pub"); // default value, can be overwritten in configuration
fOutputRcvBufSize->push_back(10000); // default value, can be overwritten in configuration fOutputSndBufSize.push_back(10000); // default value, can be overwritten in configuration
fOutputRcvBufSize.push_back(10000); // default value, can be overwritten in configuration
} }
} }
@ -67,26 +59,27 @@ void FairMQDevice::InitInput()
for (int i = 0; i < fNumInputs; ++i) for (int i = 0; i < fNumInputs; ++i)
{ {
FairMQSocket* socket = fTransportFactory->CreateSocket(fInputSocketType->at(i), i, fNumIoThreads); FairMQSocket* socket = fTransportFactory->CreateSocket(fInputSocketType.at(i), i, fNumIoThreads);
socket->SetOption("snd-hwm", &fInputSndBufSize->at(i), sizeof(fInputSndBufSize->at(i))); socket->SetOption("snd-hwm", &fInputSndBufSize.at(i), sizeof(fInputSndBufSize.at(i)));
socket->SetOption("rcv-hwm", &fInputRcvBufSize->at(i), sizeof(fInputRcvBufSize->at(i))); socket->SetOption("rcv-hwm", &fInputRcvBufSize.at(i), sizeof(fInputRcvBufSize.at(i)));
fPayloadInputs->push_back(socket); fPayloadInputs->push_back(socket);
try try
{ {
if (fInputMethod->at(i) == "bind") if (fInputMethod.at(i) == "bind")
{ {
fPayloadInputs->at(i)->Bind(fInputAddress->at(i)); fPayloadInputs->at(i)->Bind(fInputAddress.at(i));
} }
else else
{ {
fPayloadInputs->at(i)->Connect(fInputAddress->at(i)); fPayloadInputs->at(i)->Connect(fInputAddress.at(i));
} }
} }
catch (std::out_of_range& e) catch (std::out_of_range& e)
{ {
LOG(ERROR) << e.what();
} }
} }
} }
@ -97,26 +90,27 @@ void FairMQDevice::InitOutput()
for (int i = 0; i < fNumOutputs; ++i) for (int i = 0; i < fNumOutputs; ++i)
{ {
FairMQSocket* socket = fTransportFactory->CreateSocket(fOutputSocketType->at(i), i, fNumIoThreads); FairMQSocket* socket = fTransportFactory->CreateSocket(fOutputSocketType.at(i), i, fNumIoThreads);
socket->SetOption("snd-hwm", &fOutputSndBufSize->at(i), sizeof(fOutputSndBufSize->at(i))); socket->SetOption("snd-hwm", &fOutputSndBufSize.at(i), sizeof(fOutputSndBufSize.at(i)));
socket->SetOption("rcv-hwm", &fOutputRcvBufSize->at(i), sizeof(fOutputRcvBufSize->at(i))); socket->SetOption("rcv-hwm", &fOutputRcvBufSize.at(i), sizeof(fOutputRcvBufSize.at(i)));
fPayloadOutputs->push_back(socket); fPayloadOutputs->push_back(socket);
try try
{ {
if (fOutputMethod->at(i) == "bind") if (fOutputMethod.at(i) == "bind")
{ {
fPayloadOutputs->at(i)->Bind(fOutputAddress->at(i)); fPayloadOutputs->at(i)->Bind(fOutputAddress.at(i));
} }
else else
{ {
fPayloadOutputs->at(i)->Connect(fOutputAddress->at(i)); fPayloadOutputs->at(i)->Connect(fOutputAddress.at(i));
} }
} }
catch (std::out_of_range& e) catch (std::out_of_range& e)
{ {
LOG(ERROR) << e.what();
} }
} }
} }
@ -138,28 +132,28 @@ void FairMQDevice::SetProperty(const int key, const string& value, const int slo
fId = value; fId = value;
break; break;
case InputAddress: case InputAddress:
fInputAddress->erase(fInputAddress->begin() + slot); fInputAddress.erase(fInputAddress.begin() + slot);
fInputAddress->insert(fInputAddress->begin() + slot, value); fInputAddress.insert(fInputAddress.begin() + slot, value);
break; break;
case OutputAddress: case OutputAddress:
fOutputAddress->erase(fOutputAddress->begin() + slot); fOutputAddress.erase(fOutputAddress.begin() + slot);
fOutputAddress->insert(fOutputAddress->begin() + slot, value); fOutputAddress.insert(fOutputAddress.begin() + slot, value);
break; break;
case InputMethod: case InputMethod:
fInputMethod->erase(fInputMethod->begin() + slot); fInputMethod.erase(fInputMethod.begin() + slot);
fInputMethod->insert(fInputMethod->begin() + slot, value); fInputMethod.insert(fInputMethod.begin() + slot, value);
break; break;
case OutputMethod: case OutputMethod:
fOutputMethod->erase(fOutputMethod->begin() + slot); fOutputMethod.erase(fOutputMethod.begin() + slot);
fOutputMethod->insert(fOutputMethod->begin() + slot, value); fOutputMethod.insert(fOutputMethod.begin() + slot, value);
break; break;
case InputSocketType: case InputSocketType:
fInputSocketType->erase(fInputSocketType->begin() + slot); fInputSocketType.erase(fInputSocketType.begin() + slot);
fInputSocketType->insert(fInputSocketType->begin() + slot, value); fInputSocketType.insert(fInputSocketType.begin() + slot, value);
break; break;
case OutputSocketType: case OutputSocketType:
fOutputSocketType->erase(fOutputSocketType->begin() + slot); fOutputSocketType.erase(fOutputSocketType.begin() + slot);
fOutputSocketType->insert(fOutputSocketType->begin() + slot, value); fOutputSocketType.insert(fOutputSocketType.begin() + slot, value);
break; break;
default: default:
FairMQConfigurable::SetProperty(key, value, slot); FairMQConfigurable::SetProperty(key, value, slot);
@ -185,20 +179,20 @@ void FairMQDevice::SetProperty(const int key, const int value, const int slot /*
fLogIntervalInMs = value; fLogIntervalInMs = value;
break; break;
case InputSndBufSize: case InputSndBufSize:
fInputSndBufSize->erase(fInputSndBufSize->begin() + slot); fInputSndBufSize.erase(fInputSndBufSize.begin() + slot);
fInputSndBufSize->insert(fInputSndBufSize->begin() + slot, value); fInputSndBufSize.insert(fInputSndBufSize.begin() + slot, value);
break; break;
case InputRcvBufSize: case InputRcvBufSize:
fInputRcvBufSize->erase(fInputRcvBufSize->begin() + slot); fInputRcvBufSize.erase(fInputRcvBufSize.begin() + slot);
fInputRcvBufSize->insert(fInputRcvBufSize->begin() + slot, value); fInputRcvBufSize.insert(fInputRcvBufSize.begin() + slot, value);
break; break;
case OutputSndBufSize: case OutputSndBufSize:
fOutputSndBufSize->erase(fOutputSndBufSize->begin() + slot); fOutputSndBufSize.erase(fOutputSndBufSize.begin() + slot);
fOutputSndBufSize->insert(fOutputSndBufSize->begin() + slot, value); fOutputSndBufSize.insert(fOutputSndBufSize.begin() + slot, value);
break; break;
case OutputRcvBufSize: case OutputRcvBufSize:
fOutputRcvBufSize->erase(fOutputRcvBufSize->begin() + slot); fOutputRcvBufSize.erase(fOutputRcvBufSize.begin() + slot);
fOutputRcvBufSize->insert(fOutputRcvBufSize->begin() + slot, value); fOutputRcvBufSize.insert(fOutputRcvBufSize.begin() + slot, value);
break; break;
default: default:
FairMQConfigurable::SetProperty(key, value, slot); FairMQConfigurable::SetProperty(key, value, slot);
@ -214,17 +208,17 @@ string FairMQDevice::GetProperty(const int key, const string& default_ /*= ""*/,
case Id: case Id:
return fId; return fId;
case InputAddress: case InputAddress:
return fInputAddress->at(slot); return fInputAddress.at(slot);
case OutputAddress: case OutputAddress:
return fOutputAddress->at(slot); return fOutputAddress.at(slot);
case InputMethod: case InputMethod:
return fInputMethod->at(slot); return fInputMethod.at(slot);
case OutputMethod: case OutputMethod:
return fOutputMethod->at(slot); return fOutputMethod.at(slot);
case InputSocketType: case InputSocketType:
return fInputSocketType->at(slot); return fInputSocketType.at(slot);
case OutputSocketType: case OutputSocketType:
return fOutputSocketType->at(slot); return fOutputSocketType.at(slot);
default: default:
return FairMQConfigurable::GetProperty(key, default_, slot); return FairMQConfigurable::GetProperty(key, default_, slot);
} }
@ -240,13 +234,13 @@ int FairMQDevice::GetProperty(const int key, const int default_ /*= 0*/, const i
case LogIntervalInMs: case LogIntervalInMs:
return fLogIntervalInMs; return fLogIntervalInMs;
case InputSndBufSize: case InputSndBufSize:
return fInputSndBufSize->at(slot); return fInputSndBufSize.at(slot);
case InputRcvBufSize: case InputRcvBufSize:
return fInputRcvBufSize->at(slot); return fInputRcvBufSize.at(slot);
case OutputSndBufSize: case OutputSndBufSize:
return fOutputSndBufSize->at(slot); return fOutputSndBufSize.at(slot);
case OutputRcvBufSize: case OutputRcvBufSize:
return fOutputRcvBufSize->at(slot); return fOutputRcvBufSize.at(slot);
default: default:
return FairMQConfigurable::GetProperty(key, default_, slot); return FairMQConfigurable::GetProperty(key, default_, slot);
} }
@ -264,20 +258,20 @@ void FairMQDevice::LogSocketRates()
timestamp_t timeSinceLastLog_ms; timestamp_t timeSinceLastLog_ms;
unsigned long* bytesInput = new unsigned long[fNumInputs]; vector<unsigned long> bytesInput(fNumInputs);
unsigned long* messagesInput = new unsigned long[fNumInputs]; vector<unsigned long> messagesInput(fNumInputs);
unsigned long* bytesOutput = new unsigned long[fNumOutputs]; vector<unsigned long> bytesOutput(fNumOutputs);
unsigned long* messagesOutput = new unsigned long[fNumOutputs]; vector<unsigned long> messagesOutput(fNumOutputs);
unsigned long* bytesInputNew = new unsigned long[fNumInputs]; vector<unsigned long> bytesInputNew(fNumInputs);
unsigned long* messagesInputNew = new unsigned long[fNumInputs]; vector<unsigned long> messagesInputNew(fNumInputs);
unsigned long* bytesOutputNew = new unsigned long[fNumOutputs]; vector<unsigned long> bytesOutputNew(fNumOutputs);
unsigned long* messagesOutputNew = new unsigned long[fNumOutputs]; vector<unsigned long> messagesOutputNew(fNumOutputs);
double* megabytesPerSecondInput = new double[fNumInputs]; vector<double> megabytesPerSecondInput(fNumInputs);
double* messagesPerSecondInput = new double[fNumInputs]; vector<double> messagesPerSecondInput(fNumInputs);
double* megabytesPerSecondOutput = new double[fNumOutputs]; vector<double> megabytesPerSecondOutput(fNumOutputs);
double* messagesPerSecondOutput = new double[fNumOutputs]; vector<double> messagesPerSecondOutput(fNumOutputs);
// Temp stuff for process termination // Temp stuff for process termination
// bool receivedSomething = false; // bool receivedSomething = false;
@ -289,16 +283,16 @@ void FairMQDevice::LogSocketRates()
int i = 0; int i = 0;
for (vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++) for (vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++)
{ {
bytesInput[i] = (*itr)->GetBytesRx(); bytesInput.at(i) = (*itr)->GetBytesRx();
messagesInput[i] = (*itr)->GetMessagesRx(); messagesInput.at(i) = (*itr)->GetMessagesRx();
++i; ++i;
} }
i = 0; i = 0;
for (vector<FairMQSocket*>::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++) for (vector<FairMQSocket*>::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++)
{ {
bytesOutput[i] = (*itr)->GetBytesTx(); bytesOutput.at(i) = (*itr)->GetBytesTx();
messagesOutput[i] = (*itr)->GetMessagesTx(); messagesOutput.at(i) = (*itr)->GetMessagesTx();
++i; ++i;
} }
@ -316,20 +310,20 @@ void FairMQDevice::LogSocketRates()
for (vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++) for (vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++)
{ {
bytesInputNew[i] = (*itr)->GetBytesRx(); bytesInputNew.at(i) = (*itr)->GetBytesRx();
megabytesPerSecondInput[i] = ((double)(bytesInputNew[i] - bytesInput[i]) / (1024. * 1024.)) / (double)timeSinceLastLog_ms * 1000.; megabytesPerSecondInput.at(i) = ((double)(bytesInputNew.at(i) - bytesInput.at(i)) / (1024. * 1024.)) / (double)timeSinceLastLog_ms * 1000.;
bytesInput[i] = bytesInputNew[i]; bytesInput.at(i) = bytesInputNew.at(i);
messagesInputNew[i] = (*itr)->GetMessagesRx(); messagesInputNew.at(i) = (*itr)->GetMessagesRx();
messagesPerSecondInput[i] = (double)(messagesInputNew[i] - messagesInput[i]) / (double)timeSinceLastLog_ms * 1000.; messagesPerSecondInput.at(i) = (double)(messagesInputNew.at(i) - messagesInput.at(i)) / (double)timeSinceLastLog_ms * 1000.;
messagesInput[i] = messagesInputNew[i]; messagesInput.at(i) = messagesInputNew.at(i);
LOG(DEBUG) << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondInput[i] << " msg/s, " << megabytesPerSecondInput[i] << " MB/s"; LOG(DEBUG) << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondInput.at(i) << " msg/s, " << megabytesPerSecondInput.at(i) << " MB/s";
// Temp stuff for process termination // Temp stuff for process termination
// if ( !receivedSomething && messagesPerSecondInput[i] > 0 ) { // if ( !receivedSomething && messagesPerSecondInput.at(i) > 0 ) {
// receivedSomething = true; // receivedSomething = true;
// } // }
// if ( receivedSomething && messagesPerSecondInput[i] == 0 ) { // if ( receivedSomething && messagesPerSecondInput.at(i) == 0 ) {
// cout << "Did not receive anything on socket " << i << " for " << didNotReceiveFor++ << " seconds." << endl; // cout << "Did not receive anything on socket " << i << " for " << didNotReceiveFor++ << " seconds." << endl;
// } else { // } else {
// didNotReceiveFor = 0; // didNotReceiveFor = 0;
@ -343,21 +337,21 @@ void FairMQDevice::LogSocketRates()
for (vector<FairMQSocket*>::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++) for (vector<FairMQSocket*>::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++)
{ {
bytesOutputNew[i] = (*itr)->GetBytesTx(); bytesOutputNew.at(i) = (*itr)->GetBytesTx();
megabytesPerSecondOutput[i] = ((double)(bytesOutputNew[i] - bytesOutput[i]) / (1024. * 1024.)) / (double)timeSinceLastLog_ms * 1000.; megabytesPerSecondOutput.at(i) = ((double)(bytesOutputNew.at(i) - bytesOutput.at(i)) / (1024. * 1024.)) / (double)timeSinceLastLog_ms * 1000.;
bytesOutput[i] = bytesOutputNew[i]; bytesOutput.at(i) = bytesOutputNew.at(i);
messagesOutputNew[i] = (*itr)->GetMessagesTx(); messagesOutputNew.at(i) = (*itr)->GetMessagesTx();
messagesPerSecondOutput[i] = (double)(messagesOutputNew[i] - messagesOutput[i]) / (double)timeSinceLastLog_ms * 1000.; messagesPerSecondOutput.at(i) = (double)(messagesOutputNew.at(i) - messagesOutput.at(i)) / (double)timeSinceLastLog_ms * 1000.;
messagesOutput[i] = messagesOutputNew[i]; messagesOutput.at(i) = messagesOutputNew.at(i);
LOG(DEBUG) << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondOutput[i] << " msg/s, " << megabytesPerSecondOutput[i] LOG(DEBUG) << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondOutput.at(i) << " msg/s, " << megabytesPerSecondOutput.at(i)
<< " MB/s"; << " MB/s";
// Temp stuff for process termination // Temp stuff for process termination
// if ( !sentSomething && messagesPerSecondOutput[i] > 0 ) { // if ( !sentSomething && messagesPerSecondOutput.at(i) > 0 ) {
// sentSomething = true; // sentSomething = true;
// } // }
// if ( sentSomething && messagesPerSecondOutput[i] == 0 ) { // if ( sentSomething && messagesPerSecondOutput.at(i) == 0 ) {
// cout << "Did not send anything on socket " << i << " for " << didNotSendFor++ << " seconds." << endl; // cout << "Did not send anything on socket " << i << " for " << didNotSendFor++ << " seconds." << endl;
// } else { // } else {
// didNotSendFor = 0; // didNotSendFor = 0;
@ -388,21 +382,6 @@ void FairMQDevice::LogSocketRates()
} }
} }
delete[] bytesInput;
delete[] messagesInput;
delete[] bytesOutput;
delete[] messagesOutput;
delete[] bytesInputNew;
delete[] messagesInputNew;
delete[] bytesOutputNew;
delete[] messagesOutputNew;
delete[] megabytesPerSecondInput;
delete[] messagesPerSecondInput;
delete[] megabytesPerSecondOutput;
delete[] messagesPerSecondOutput;
LOG(INFO) << ">>>>>>> stopping rateLogger <<<<<<<"; LOG(INFO) << ">>>>>>> stopping rateLogger <<<<<<<";
} }
@ -423,9 +402,6 @@ void FairMQDevice::Shutdown()
{ {
(*itr)->Close(); (*itr)->Close();
} }
// LOG(INFO) << ">>>>>>> closing context <<<<<<<";
// fPayloadContext->Close();
} }
FairMQDevice::~FairMQDevice() FairMQDevice::~FairMQDevice()
@ -440,8 +416,6 @@ FairMQDevice::~FairMQDevice()
delete (*itr); delete (*itr);
} }
delete fInputAddress;
delete fOutputAddress;
delete fPayloadInputs; delete fPayloadInputs;
delete fPayloadOutputs; delete fPayloadOutputs;
} }

View File

@ -69,28 +69,29 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
protected: protected:
string fId; string fId;
int fNumIoThreads; int fNumIoThreads;
FairMQTransportFactory* fTransportFactory;
int fNumInputs; int fNumInputs;
int fNumOutputs; int fNumOutputs;
vector<string>* fInputAddress; vector<string> fInputAddress;
vector<string>* fInputMethod; vector<string> fInputMethod;
vector<string>* fInputSocketType; vector<string> fInputSocketType;
vector<int>* fInputSndBufSize; vector<int> fInputSndBufSize;
vector<int>* fInputRcvBufSize; vector<int> fInputRcvBufSize;
vector<string>* fOutputAddress; vector<string> fOutputAddress;
vector<string>* fOutputMethod; vector<string> fOutputMethod;
vector<string>* fOutputSocketType; vector<string> fOutputSocketType;
vector<int>* fOutputSndBufSize; vector<int> fOutputSndBufSize;
vector<int>* fOutputRcvBufSize; vector<int> fOutputRcvBufSize;
vector<FairMQSocket*>* fPayloadInputs; vector<FairMQSocket*>* fPayloadInputs;
vector<FairMQSocket*>* fPayloadOutputs; vector<FairMQSocket*>* fPayloadOutputs;
int fLogIntervalInMs; int fLogIntervalInMs;
FairMQTransportFactory* fTransportFactory;
virtual void Init(); virtual void Init();
virtual void Run(); virtual void Run();
virtual void Pause(); virtual void Pause();

View File

@ -36,7 +36,7 @@ std::ostringstream& FairMQLogger::Log(int type)
timestamp_t ms = tm / 1000.0L; timestamp_t ms = tm / 1000.0L;
timestamp_t s = ms / 1000.0L; timestamp_t s = ms / 1000.0L;
std::time_t t = s; std::time_t t = s;
std::size_t fractional_seconds = ms % 1000; // std::size_t fractional_seconds = ms % 1000;
char mbstr[100]; char mbstr[100];
std::strftime(mbstr, 100, "%H:%M:%S", std::localtime(&t)); std::strftime(mbstr, 100, "%H:%M:%S", std::localtime(&t));

View File

@ -1,9 +1,35 @@
fairmq # FairMQ
========
The standard FairRoot is running all the different analysis tasks within one process. The FairMQ ([Message Queue](http://en.wikipedia.org/wiki/Message_queue)) allows starting tasks on different processes and provides the communication layer between these processes. The standard FairRoot is running all the different analysis tasks within one process. The FairMQ ([Message Queue](http://en.wikipedia.org/wiki/Message_queue)) allows starting tasks on different processes and provides the communication layer between these processes.
The underlying communication layer in the FairMQ is now provided by: ## Devices
The components encapsulating the tasks are called **devices** and derive from the common base class `FairMQDevice`. FairMQ provides ready to use devices to organize the dataflow between the components (without touching the contents of a message), providing functionality like merging and splitting of the data stream (see subdirectory `devices`).
A number of devices to handle the data from the Tutorial3 detector of FairRoot are provided as an example and can be found in `FairRoot/base/MQ` directory. The implementation of the tasks run by these devices can be found `FairRoot/example/Tutorial3`. The implementation includes sending raw binary data as well as serializing the data with either [Boost Serialization](www.boost.org/doc/libs/release/libs/serialization/), [Google Protocol Buffers](https://developers.google.com/protocol-buffers/) or [Root TMessage](http://root.cern.ch/root/html/TMessage.html). Following the examples you can implement your own devices to transport arbitrary data.
## Topology
Devices are arranged into **topologies** where each device has a defined number of data inputs and outputs.
Example of a simple FairMQ topology:
![example of FairMQ topology](../docs/images/fairmq-example-topology.png?raw=true "Example of possible FairMQ topology")
Topology configuration is currently happening via setup scripts. This is very rudimentary and a much more flexible system is now in development. For now, example setup scripts can be found in directory `FairRoot/example/Tutorial3/` along with some additional documentation.
## Messages
Devices transport data between each other in form of `FairMQMessage`s. These can be filled with arbitrary content and transport either raw data or serialized data as described above. Message can be initialized in three different ways:
- **with no parameters**: This is usefull for receiving a message, since neither size nor contents are yet known.
- **given message size**: Initialize message body with a size and fill the contents later, either with `memcpy` or by writing directly into message memory.
- **given message size and buffer**: initialize the message given an existing buffer. This is a zero-copy operation.
After sending the message, the queueing system takes over control over the message body and will free it with `free()` after it is no longer used. A callback can be given to the message object, to be called instead of the destruction with `free()`.
## Transport Interface
The communication layer is available through an interface. Two interface implementations are currently available. Main implementation uses the [ZeroMQ](http://zeromq.org) library. Alternative implementation relies on the [nanomsg](http://nanomsg.org) library. Here is an overview to give an idea how interface is implemented:
![FairMQ transport interface](../docs/images/fairmq-transport-interface.png?raw=true "FairMQ transport interface")
- [ZeroMQ](http://zeromq.org);
- [NanoMSG](http://nanomsg.org).

View File

@ -66,11 +66,14 @@ void FairMQBenchmarkSampler::Run()
delete base_msg; delete base_msg;
try {
rateLogger.interrupt(); rateLogger.interrupt();
resetEventCounter.interrupt(); resetEventCounter.interrupt();
rateLogger.join(); rateLogger.join();
resetEventCounter.join(); resetEventCounter.join();
} catch(boost::thread_resource_error& e) {
LOG(ERROR) << e.what();
}
} }
void FairMQBenchmarkSampler::ResetEventCounter() void FairMQBenchmarkSampler::ResetEventCounter()

View File

@ -46,8 +46,12 @@ void FairMQBuffer::Run()
delete msg; delete msg;
} }
try {
rateLogger.interrupt(); rateLogger.interrupt();
rateLogger.join(); rateLogger.join();
} catch(boost::thread_resource_error& e) {
LOG(ERROR) << e.what();
}
} }
FairMQBuffer::~FairMQBuffer() FairMQBuffer::~FairMQBuffer()

View File

@ -61,6 +61,10 @@ void FairMQMerger::Run()
delete poller; delete poller;
try {
rateLogger.interrupt(); rateLogger.interrupt();
rateLogger.join(); rateLogger.join();
} catch(boost::thread_resource_error& e) {
LOG(ERROR) << e.what();
}
} }

View File

@ -48,6 +48,10 @@ void FairMQProxy::Run()
delete msg; delete msg;
try {
rateLogger.interrupt(); rateLogger.interrupt();
rateLogger.join(); rateLogger.join();
} catch(boost::thread_resource_error& e) {
LOG(ERROR) << e.what();
}
} }

View File

@ -39,8 +39,12 @@ void FairMQSink::Run()
delete msg; delete msg;
} }
try {
rateLogger.interrupt(); rateLogger.interrupt();
rateLogger.join(); rateLogger.join();
} catch(boost::thread_resource_error& e) {
LOG(ERROR) << e.what();
}
} }
FairMQSink::~FairMQSink() FairMQSink::~FairMQSink()

View File

@ -55,6 +55,10 @@ void FairMQSplitter::Run()
delete msg; delete msg;
} }
try {
rateLogger.interrupt(); rateLogger.interrupt();
rateLogger.join(); rateLogger.join();
} catch(boost::thread_resource_error& e) {
LOG(ERROR) << e.what();
}
} }

View File

@ -16,7 +16,7 @@
FairMQTransportFactoryNN::FairMQTransportFactoryNN() FairMQTransportFactoryNN::FairMQTransportFactoryNN()
{ {
LOG(INFO) << "Using nanonsg library"; LOG(INFO) << "Using nanomsg library";
} }
FairMQMessage* FairMQTransportFactoryNN::CreateMessage() FairMQMessage* FairMQTransportFactoryNN::CreateMessage()

View File

@ -35,8 +35,8 @@ void FairMQBinSink::Run()
fPayloadInputs->at(0)->Receive(msg); fPayloadInputs->at(0)->Receive(msg);
int inputSize = msg->GetSize(); int inputSize = msg->GetSize();
int numInput = inputSize / sizeof(Content); // int numInput = inputSize / sizeof(Content);
Content* input = reinterpret_cast<Content*>(msg->GetData()); // Content* input = reinterpret_cast<Content*>(msg->GetData());
// for (int i = 0; i < numInput; ++i) { // for (int i = 0; i < numInput; ++i) {
// LOG(INFO) << (&input[i])->x << " " << (&input[i])->y << " " << (&input[i])->z << " " << (&input[i])->a << " " << (&input[i])->b; // LOG(INFO) << (&input[i])->x << " " << (&input[i])->y << " " << (&input[i])->z << " " << (&input[i])->a << " " << (&input[i])->b;