mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 09:31:45 +00:00
Add (optional) Google Protocol Buffers support (example in Tutorial 3).
To use protobuf, run cmake as follows: cmake -DUSE_PROTOBUF=1 .. For this, protobuf library has to be installed on the system. Further changes: Clean up splitter/merger: default are N-to-1-merger and 1-to-N-splitter. Fix bug in nanomsg message deallocation. Setup proper buffer sizes for nanomsg/zeromq via cmake/bash script. chmod +x for start scripts.
This commit is contained in:
@@ -5,34 +5,32 @@
|
||||
* @author D. Klein, A. Rybalchenko, N. Winckler
|
||||
*/
|
||||
|
||||
#include <cstring>
|
||||
#include <cstdlib>
|
||||
|
||||
#include "FairMQMessageZMQ.h"
|
||||
#include "FairMQLogger.h"
|
||||
|
||||
|
||||
FairMQMessageZMQ::FairMQMessageZMQ() :
|
||||
fMessage(new zmq_msg_t())
|
||||
FairMQMessageZMQ::FairMQMessageZMQ()
|
||||
{
|
||||
int rc = zmq_msg_init (fMessage);
|
||||
int rc = zmq_msg_init (&fMessage);
|
||||
if (rc != 0) {
|
||||
LOG(ERROR) << "failed initializing message, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
}
|
||||
|
||||
FairMQMessageZMQ::FairMQMessageZMQ(size_t size) :
|
||||
fMessage(new zmq_msg_t())
|
||||
FairMQMessageZMQ::FairMQMessageZMQ(size_t size)
|
||||
{
|
||||
int rc = zmq_msg_init_size (fMessage, size);
|
||||
int rc = zmq_msg_init_size (&fMessage, size);
|
||||
if (rc != 0) {
|
||||
LOG(ERROR) << "failed initializing message with size, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
}
|
||||
|
||||
FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size) :
|
||||
fMessage(new zmq_msg_t())
|
||||
FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size)
|
||||
{
|
||||
int rc = zmq_msg_init_data (fMessage, data, size, &CleanUp, NULL);
|
||||
int rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL);
|
||||
if (rc != 0) {
|
||||
LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
@@ -41,7 +39,7 @@ FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size) :
|
||||
void FairMQMessageZMQ::Rebuild()
|
||||
{
|
||||
CloseMessage();
|
||||
int rc = zmq_msg_init (fMessage);
|
||||
int rc = zmq_msg_init (&fMessage);
|
||||
if (rc != 0) {
|
||||
LOG(ERROR) << "failed initializing message, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
@@ -50,7 +48,7 @@ void FairMQMessageZMQ::Rebuild()
|
||||
void FairMQMessageZMQ::Rebuild(size_t size)
|
||||
{
|
||||
CloseMessage();
|
||||
int rc = zmq_msg_init_size (fMessage, size);
|
||||
int rc = zmq_msg_init_size (&fMessage, size);
|
||||
if (rc != 0) {
|
||||
LOG(ERROR) << "failed initializing message with size, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
@@ -59,7 +57,7 @@ void FairMQMessageZMQ::Rebuild(size_t size)
|
||||
void FairMQMessageZMQ::Rebuild(void* data, size_t size)
|
||||
{
|
||||
CloseMessage();
|
||||
int rc = zmq_msg_init_data (fMessage, data, size, &CleanUp, NULL);
|
||||
int rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL);
|
||||
if (rc != 0) {
|
||||
LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
@@ -67,17 +65,17 @@ void FairMQMessageZMQ::Rebuild(void* data, size_t size)
|
||||
|
||||
void* FairMQMessageZMQ::GetMessage()
|
||||
{
|
||||
return fMessage;
|
||||
return &fMessage;
|
||||
}
|
||||
|
||||
void* FairMQMessageZMQ::GetData()
|
||||
{
|
||||
return zmq_msg_data (fMessage);
|
||||
return zmq_msg_data (&fMessage);
|
||||
}
|
||||
|
||||
size_t FairMQMessageZMQ::GetSize()
|
||||
{
|
||||
return zmq_msg_size (fMessage);
|
||||
return zmq_msg_size (&fMessage);
|
||||
}
|
||||
|
||||
void FairMQMessageZMQ::SetMessage(void* data, size_t size)
|
||||
@@ -85,21 +83,22 @@ void FairMQMessageZMQ::SetMessage(void* data, size_t size)
|
||||
// dummy method to comply with the interface. functionality not allowed in zeromq.
|
||||
}
|
||||
|
||||
void FairMQMessageZMQ::Copy(FairMQMessage* msg)
|
||||
{
|
||||
CloseMessage();
|
||||
size_t size = msg->GetSize();
|
||||
zmq_msg_init_size(&fMessage, size);
|
||||
std::memcpy(zmq_msg_data(&fMessage), msg->GetData(), size);
|
||||
}
|
||||
|
||||
inline void FairMQMessageZMQ::CloseMessage()
|
||||
{
|
||||
int rc = zmq_msg_close (fMessage);
|
||||
int rc = zmq_msg_close (&fMessage);
|
||||
if (rc != 0) {
|
||||
LOG(ERROR) << "failed closing message, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
}
|
||||
|
||||
void FairMQMessageZMQ::Copy(FairMQMessage* msg)
|
||||
{
|
||||
int rc = zmq_msg_copy (fMessage, (static_cast<FairMQMessageZMQ*>(msg)->fMessage));
|
||||
if (rc != 0) {
|
||||
LOG(ERROR) << "failed copying message, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
}
|
||||
|
||||
void FairMQMessageZMQ::CleanUp(void* data, void* hint)
|
||||
{
|
||||
@@ -108,7 +107,7 @@ void FairMQMessageZMQ::CleanUp(void* data, void* hint)
|
||||
|
||||
FairMQMessageZMQ::~FairMQMessageZMQ()
|
||||
{
|
||||
int rc = zmq_msg_close (fMessage);
|
||||
int rc = zmq_msg_close (&fMessage);
|
||||
if (rc != 0) {
|
||||
LOG(ERROR) << "failed closing message with data, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
|
Reference in New Issue
Block a user