diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index cc1ac47d..71543ef4 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -61,6 +61,15 @@ class FairMQChannel // Wrappers for the socket methods to simplify the usage of channels int Send(const std::unique_ptr& msg) const; + + /// \brief Sends a message in non-blocking mode. + /// \details SendAsync method attempts to send the message without blocking by + /// putting it in the queue. If the queue is full or queueing is not possible + /// for some other reason (e.g. no peers connected for a binding socket), the method returns 0. + /// + /// \param msg Constant reference of unique_ptr to a FairMQMessage + /// \return Returns the number of bytes that have been queued. If queueing failed due to + /// full queue or no connected peers (when binding), returns 0. In case of errors, returns -1. int SendAsync(const std::unique_ptr& msg) const; int SendPart(const std::unique_ptr& msg) const; @@ -73,7 +82,7 @@ class FairMQChannel int Receive(FairMQMessage* msg, const std::string& flag = "") const; int Receive(FairMQMessage* msg, const int flags) const; - /// Checks if the socket is expecting to receive another part of a multipart message. + /// \brief Checks if the socket is expecting to receive another part of a multipart message. /// \return Return true if the socket expects another part of a multipart message and false otherwise. bool ExpectsAnotherPart() const; diff --git a/fairmq/nanomsg/FairMQMessageNN.cxx b/fairmq/nanomsg/FairMQMessageNN.cxx index fe02b92c..34f6f30c 100644 --- a/fairmq/nanomsg/FairMQMessageNN.cxx +++ b/fairmq/nanomsg/FairMQMessageNN.cxx @@ -27,6 +27,11 @@ FairMQMessageNN::FairMQMessageNN() , fSize(0) , fReceiving(false) { + fMessage = nn_allocmsg(0, 0); + if (!fMessage) + { + LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno); + } } FairMQMessageNN::FairMQMessageNN(size_t size) @@ -40,7 +45,6 @@ FairMQMessageNN::FairMQMessageNN(size_t size) LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno); } fSize = size; - fReceiving = false; } @@ -61,7 +65,6 @@ FairMQMessageNN::FairMQMessageNN(void* data, size_t size, fairmq_free_fn *ffn, v } memcpy(fMessage, data, size); fSize = size; - fReceiving = false; if (ffn) { @@ -76,8 +79,6 @@ FairMQMessageNN::FairMQMessageNN(void* data, size_t size, fairmq_free_fn *ffn, v void FairMQMessageNN::Rebuild() { Clear(); - fSize = 0; - fMessage = NULL; fReceiving = false; } diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index fc5880cb..74539aab 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -143,6 +143,7 @@ int FairMQSocketNN::Receive(FairMQMessage* msg, const string& flag) { fBytesRx += rc; ++fMessagesRx; + msg->Rebuild(); msg->SetMessage(ptr, rc); static_cast(msg)->fReceiving = true; }