Extend the FairMQ transport interface by allowing the user of the FairMQMessage class to define his own deallocation function.

This function will be called when the transport machanism no longer needs the data.

Use this extension with the Protobuf data format, to enable more efficient transport, avoiding memcpy.
This commit is contained in:
Alexey Rybalchenko 2014-06-12 12:47:49 +02:00
parent 865c0e010f
commit 699671a0f1
10 changed files with 38 additions and 18 deletions

View File

@ -17,12 +17,14 @@
#include <cstddef> // for size_t #include <cstddef> // for size_t
typedef void (fairmq_free_fn) (void *data, void *hint);
class FairMQMessage class FairMQMessage
{ {
public: public:
virtual void Rebuild() = 0; virtual void Rebuild() = 0;
virtual void Rebuild(size_t size) = 0; virtual void Rebuild(size_t size) = 0;
virtual void Rebuild(void* data, size_t size) = 0; virtual void Rebuild(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL) = 0;
virtual void* GetMessage() = 0; virtual void* GetMessage() = 0;
virtual void* GetData() = 0; virtual void* GetData() = 0;

View File

@ -29,7 +29,7 @@ class FairMQTransportFactory
public: public:
virtual FairMQMessage* CreateMessage() = 0; virtual FairMQMessage* CreateMessage() = 0;
virtual FairMQMessage* CreateMessage(size_t size) = 0; virtual FairMQMessage* CreateMessage(size_t size) = 0;
virtual FairMQMessage* CreateMessage(void* data, size_t size) = 0; virtual FairMQMessage* CreateMessage(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL) = 0;
virtual FairMQSocket* CreateSocket(const string& type, int num, int numIoThreads) = 0; virtual FairMQSocket* CreateSocket(const string& type, int num, int numIoThreads) = 0;
virtual FairMQPoller* CreatePoller(const vector<FairMQSocket*>& inputs) = 0; virtual FairMQPoller* CreatePoller(const vector<FairMQSocket*>& inputs) = 0;

View File

@ -37,7 +37,7 @@ FairMQMessageNN::FairMQMessageNN(size_t size)
fReceiving = false; fReceiving = false;
} }
FairMQMessageNN::FairMQMessageNN(void* data, size_t size) FairMQMessageNN::FairMQMessageNN(void* data, size_t size, fairmq_free_fn *ffn, void* hint)
{ {
fMessage = nn_allocmsg(size, 0); fMessage = nn_allocmsg(size, 0);
if (!fMessage) if (!fMessage)
@ -47,6 +47,15 @@ FairMQMessageNN::FairMQMessageNN(void* data, size_t size)
memcpy(fMessage, data, size); memcpy(fMessage, data, size);
fSize = size; fSize = size;
fReceiving = false; fReceiving = false;
if(ffn)
{
ffn(data, hint);
}
else
{
if(data) free(data);
}
} }
void FairMQMessageNN::Rebuild() void FairMQMessageNN::Rebuild()
@ -69,7 +78,7 @@ void FairMQMessageNN::Rebuild(size_t size)
fReceiving = false; fReceiving = false;
} }
void FairMQMessageNN::Rebuild(void* data, size_t size) void FairMQMessageNN::Rebuild(void* data, size_t size, fairmq_free_fn *ffn, void* hint)
{ {
Clear(); Clear();
fMessage = nn_allocmsg(size, 0); fMessage = nn_allocmsg(size, 0);
@ -80,6 +89,15 @@ void FairMQMessageNN::Rebuild(void* data, size_t size)
memcpy(fMessage, data, size); memcpy(fMessage, data, size);
fSize = size; fSize = size;
fReceiving = false; fReceiving = false;
if(ffn)
{
ffn(data, hint);
}
else
{
if(data) free(data);
}
} }
void* FairMQMessageNN::GetMessage() void* FairMQMessageNN::GetMessage()

View File

@ -24,11 +24,11 @@ class FairMQMessageNN : public FairMQMessage
public: public:
FairMQMessageNN(); FairMQMessageNN();
FairMQMessageNN(size_t size); FairMQMessageNN(size_t size);
FairMQMessageNN(void* data, size_t size); FairMQMessageNN(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL);
virtual void Rebuild(); virtual void Rebuild();
virtual void Rebuild(size_t size); virtual void Rebuild(size_t size);
virtual void Rebuild(void* data, size_t site); virtual void Rebuild(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL);
virtual void* GetMessage(); virtual void* GetMessage();
virtual void* GetData(); virtual void* GetData();

View File

@ -29,9 +29,9 @@ FairMQMessage* FairMQTransportFactoryNN::CreateMessage(size_t size)
return new FairMQMessageNN(size); return new FairMQMessageNN(size);
} }
FairMQMessage* FairMQTransportFactoryNN::CreateMessage(void* data, size_t size) FairMQMessage* FairMQTransportFactoryNN::CreateMessage(void* data, size_t size, fairmq_free_fn *ffn, void* hint)
{ {
return new FairMQMessageNN(data, size); return new FairMQMessageNN(data, size, ffn, hint);
} }
FairMQSocket* FairMQTransportFactoryNN::CreateSocket(const string& type, int num, int numIoThreads) FairMQSocket* FairMQTransportFactoryNN::CreateSocket(const string& type, int num, int numIoThreads)

View File

@ -29,7 +29,7 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory
virtual FairMQMessage* CreateMessage(); virtual FairMQMessage* CreateMessage();
virtual FairMQMessage* CreateMessage(size_t size); virtual FairMQMessage* CreateMessage(size_t size);
virtual FairMQMessage* CreateMessage(void* data, size_t size); virtual FairMQMessage* CreateMessage(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL);
virtual FairMQSocket* CreateSocket(const string& type, int num, int numIoThreads); virtual FairMQSocket* CreateSocket(const string& type, int num, int numIoThreads);
virtual FairMQPoller* CreatePoller(const vector<FairMQSocket*>& inputs); virtual FairMQPoller* CreatePoller(const vector<FairMQSocket*>& inputs);

View File

@ -36,9 +36,9 @@ FairMQMessageZMQ::FairMQMessageZMQ(size_t size)
} }
} }
FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size) FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size, fairmq_free_fn *ffn, void* hint)
{ {
int rc = zmq_msg_init_data(&fMessage, data, size, &CleanUp, NULL); // TODO: expose the cleanup function part in the interface? int rc = zmq_msg_init_data(&fMessage, data, size, ffn ? ffn : &CleanUp, hint);
if (rc != 0) if (rc != 0)
{ {
LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno); LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno);
@ -65,10 +65,10 @@ void FairMQMessageZMQ::Rebuild(size_t size)
} }
} }
void FairMQMessageZMQ::Rebuild(void* data, size_t size) void FairMQMessageZMQ::Rebuild(void* data, size_t size, fairmq_free_fn *ffn, void* hint)
{ {
CloseMessage(); CloseMessage();
int rc = zmq_msg_init_data(&fMessage, data, size, &CleanUp, NULL); // TODO: expose the cleanup function part in the interface? int rc = zmq_msg_init_data(&fMessage, data, size, ffn ? ffn : &CleanUp, hint);
if (rc != 0) if (rc != 0)
{ {
LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno); LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno);

View File

@ -26,11 +26,11 @@ class FairMQMessageZMQ : public FairMQMessage
public: public:
FairMQMessageZMQ(); FairMQMessageZMQ();
FairMQMessageZMQ(size_t size); FairMQMessageZMQ(size_t size);
FairMQMessageZMQ(void* data, size_t size); FairMQMessageZMQ(void* data, size_t size, fairmq_free_fn *ffn = &CleanUp, void* hint = NULL);
virtual void Rebuild(); virtual void Rebuild();
virtual void Rebuild(size_t size); virtual void Rebuild(size_t size);
virtual void Rebuild(void* data, size_t size); virtual void Rebuild(void* data, size_t size, fairmq_free_fn *ffn = &CleanUp, void* hint = NULL);
virtual void* GetMessage(); virtual void* GetMessage();
virtual void* GetData(); virtual void* GetData();

View File

@ -33,9 +33,9 @@ FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(size_t size)
return new FairMQMessageZMQ(size); return new FairMQMessageZMQ(size);
} }
FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(void* data, size_t size) FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(void* data, size_t size, fairmq_free_fn *ffn, void* hint)
{ {
return new FairMQMessageZMQ(data, size); return new FairMQMessageZMQ(data, size, ffn, hint);
} }
FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(const string& type, int num, int numIoThreads) FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(const string& type, int num, int numIoThreads)

View File

@ -30,7 +30,7 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory
virtual FairMQMessage* CreateMessage(); virtual FairMQMessage* CreateMessage();
virtual FairMQMessage* CreateMessage(size_t size); virtual FairMQMessage* CreateMessage(size_t size);
virtual FairMQMessage* CreateMessage(void* data, size_t size); virtual FairMQMessage* CreateMessage(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL);
virtual FairMQSocket* CreateSocket(const string& type, int num, int numIoThreads); virtual FairMQSocket* CreateSocket(const string& type, int num, int numIoThreads);
virtual FairMQPoller* CreatePoller(const vector<FairMQSocket*>& inputs); virtual FairMQPoller* CreatePoller(const vector<FairMQSocket*>& inputs);