From 699671a0f16bd6a34ec1777406f20787e89f6dfb Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Thu, 12 Jun 2014 12:47:49 +0200 Subject: [PATCH] Extend the FairMQ transport interface by allowing the user of the FairMQMessage class to define his own deallocation function. This function will be called when the transport machanism no longer needs the data. Use this extension with the Protobuf data format, to enable more efficient transport, avoiding memcpy. --- fairmq/FairMQMessage.h | 4 +++- fairmq/FairMQTransportFactory.h | 2 +- fairmq/nanomsg/FairMQMessageNN.cxx | 22 +++++++++++++++++++-- fairmq/nanomsg/FairMQMessageNN.h | 4 ++-- fairmq/nanomsg/FairMQTransportFactoryNN.cxx | 4 ++-- fairmq/nanomsg/FairMQTransportFactoryNN.h | 2 +- fairmq/zeromq/FairMQMessageZMQ.cxx | 8 ++++---- fairmq/zeromq/FairMQMessageZMQ.h | 4 ++-- fairmq/zeromq/FairMQTransportFactoryZMQ.cxx | 4 ++-- fairmq/zeromq/FairMQTransportFactoryZMQ.h | 2 +- 10 files changed, 38 insertions(+), 18 deletions(-) diff --git a/fairmq/FairMQMessage.h b/fairmq/FairMQMessage.h index 132e9a9e..9811eeb6 100644 --- a/fairmq/FairMQMessage.h +++ b/fairmq/FairMQMessage.h @@ -17,12 +17,14 @@ #include // for size_t +typedef void (fairmq_free_fn) (void *data, void *hint); + class FairMQMessage { public: virtual void Rebuild() = 0; virtual void Rebuild(size_t size) = 0; - virtual void Rebuild(void* data, size_t size) = 0; + virtual void Rebuild(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL) = 0; virtual void* GetMessage() = 0; virtual void* GetData() = 0; diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 3bbcd9de..daeed2d5 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -29,7 +29,7 @@ class FairMQTransportFactory public: virtual FairMQMessage* CreateMessage() = 0; virtual FairMQMessage* CreateMessage(size_t size) = 0; - virtual FairMQMessage* CreateMessage(void* data, size_t size) = 0; + virtual FairMQMessage* CreateMessage(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL) = 0; virtual FairMQSocket* CreateSocket(const string& type, int num, int numIoThreads) = 0; virtual FairMQPoller* CreatePoller(const vector& inputs) = 0; diff --git a/fairmq/nanomsg/FairMQMessageNN.cxx b/fairmq/nanomsg/FairMQMessageNN.cxx index e244bf79..93983111 100644 --- a/fairmq/nanomsg/FairMQMessageNN.cxx +++ b/fairmq/nanomsg/FairMQMessageNN.cxx @@ -37,7 +37,7 @@ FairMQMessageNN::FairMQMessageNN(size_t size) fReceiving = false; } -FairMQMessageNN::FairMQMessageNN(void* data, size_t size) +FairMQMessageNN::FairMQMessageNN(void* data, size_t size, fairmq_free_fn *ffn, void* hint) { fMessage = nn_allocmsg(size, 0); if (!fMessage) @@ -47,6 +47,15 @@ FairMQMessageNN::FairMQMessageNN(void* data, size_t size) memcpy(fMessage, data, size); fSize = size; fReceiving = false; + + if(ffn) + { + ffn(data, hint); + } + else + { + if(data) free(data); + } } void FairMQMessageNN::Rebuild() @@ -69,7 +78,7 @@ void FairMQMessageNN::Rebuild(size_t size) fReceiving = false; } -void FairMQMessageNN::Rebuild(void* data, size_t size) +void FairMQMessageNN::Rebuild(void* data, size_t size, fairmq_free_fn *ffn, void* hint) { Clear(); fMessage = nn_allocmsg(size, 0); @@ -80,6 +89,15 @@ void FairMQMessageNN::Rebuild(void* data, size_t size) memcpy(fMessage, data, size); fSize = size; fReceiving = false; + + if(ffn) + { + ffn(data, hint); + } + else + { + if(data) free(data); + } } void* FairMQMessageNN::GetMessage() diff --git a/fairmq/nanomsg/FairMQMessageNN.h b/fairmq/nanomsg/FairMQMessageNN.h index b99f191b..b59b01b1 100644 --- a/fairmq/nanomsg/FairMQMessageNN.h +++ b/fairmq/nanomsg/FairMQMessageNN.h @@ -24,11 +24,11 @@ class FairMQMessageNN : public FairMQMessage public: FairMQMessageNN(); FairMQMessageNN(size_t size); - FairMQMessageNN(void* data, size_t size); + FairMQMessageNN(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL); virtual void Rebuild(); virtual void Rebuild(size_t size); - virtual void Rebuild(void* data, size_t site); + virtual void Rebuild(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL); virtual void* GetMessage(); virtual void* GetData(); diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx index 53cee8bf..3e539f9e 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx @@ -29,9 +29,9 @@ FairMQMessage* FairMQTransportFactoryNN::CreateMessage(size_t size) return new FairMQMessageNN(size); } -FairMQMessage* FairMQTransportFactoryNN::CreateMessage(void* data, size_t size) +FairMQMessage* FairMQTransportFactoryNN::CreateMessage(void* data, size_t size, fairmq_free_fn *ffn, void* hint) { - return new FairMQMessageNN(data, size); + return new FairMQMessageNN(data, size, ffn, hint); } FairMQSocket* FairMQTransportFactoryNN::CreateSocket(const string& type, int num, int numIoThreads) diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.h b/fairmq/nanomsg/FairMQTransportFactoryNN.h index e1721a35..eed2eb5b 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.h +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.h @@ -29,7 +29,7 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory virtual FairMQMessage* CreateMessage(); virtual FairMQMessage* CreateMessage(size_t size); - virtual FairMQMessage* CreateMessage(void* data, size_t size); + virtual FairMQMessage* CreateMessage(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL); virtual FairMQSocket* CreateSocket(const string& type, int num, int numIoThreads); virtual FairMQPoller* CreatePoller(const vector& inputs); diff --git a/fairmq/zeromq/FairMQMessageZMQ.cxx b/fairmq/zeromq/FairMQMessageZMQ.cxx index 8f00a50b..147926ba 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.cxx +++ b/fairmq/zeromq/FairMQMessageZMQ.cxx @@ -36,9 +36,9 @@ FairMQMessageZMQ::FairMQMessageZMQ(size_t size) } } -FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size) +FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size, fairmq_free_fn *ffn, void* hint) { - int rc = zmq_msg_init_data(&fMessage, data, size, &CleanUp, NULL); // TODO: expose the cleanup function part in the interface? + int rc = zmq_msg_init_data(&fMessage, data, size, ffn ? ffn : &CleanUp, hint); if (rc != 0) { LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno); @@ -65,10 +65,10 @@ void FairMQMessageZMQ::Rebuild(size_t size) } } -void FairMQMessageZMQ::Rebuild(void* data, size_t size) +void FairMQMessageZMQ::Rebuild(void* data, size_t size, fairmq_free_fn *ffn, void* hint) { CloseMessage(); - int rc = zmq_msg_init_data(&fMessage, data, size, &CleanUp, NULL); // TODO: expose the cleanup function part in the interface? + int rc = zmq_msg_init_data(&fMessage, data, size, ffn ? ffn : &CleanUp, hint); if (rc != 0) { LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno); diff --git a/fairmq/zeromq/FairMQMessageZMQ.h b/fairmq/zeromq/FairMQMessageZMQ.h index d9848f84..5749349d 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.h +++ b/fairmq/zeromq/FairMQMessageZMQ.h @@ -26,11 +26,11 @@ class FairMQMessageZMQ : public FairMQMessage public: FairMQMessageZMQ(); FairMQMessageZMQ(size_t size); - FairMQMessageZMQ(void* data, size_t size); + FairMQMessageZMQ(void* data, size_t size, fairmq_free_fn *ffn = &CleanUp, void* hint = NULL); virtual void Rebuild(); virtual void Rebuild(size_t size); - virtual void Rebuild(void* data, size_t size); + virtual void Rebuild(void* data, size_t size, fairmq_free_fn *ffn = &CleanUp, void* hint = NULL); virtual void* GetMessage(); virtual void* GetData(); diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx index 82da4eae..0458394f 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx @@ -33,9 +33,9 @@ FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(size_t size) return new FairMQMessageZMQ(size); } -FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(void* data, size_t size) +FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(void* data, size_t size, fairmq_free_fn *ffn, void* hint) { - return new FairMQMessageZMQ(data, size); + return new FairMQMessageZMQ(data, size, ffn, hint); } FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(const string& type, int num, int numIoThreads) diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index a7744805..052bcb56 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -30,7 +30,7 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory virtual FairMQMessage* CreateMessage(); virtual FairMQMessage* CreateMessage(size_t size); - virtual FairMQMessage* CreateMessage(void* data, size_t size); + virtual FairMQMessage* CreateMessage(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL); virtual FairMQSocket* CreateSocket(const string& type, int num, int numIoThreads); virtual FairMQPoller* CreatePoller(const vector& inputs);