From a892a5a744f22b512309e27aca521f34ab2bb0ca Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Mon, 4 Apr 2016 09:43:48 +0200 Subject: [PATCH] Update FairMQParts with doxygen comments and non-blocking send --- fairmq/FairMQChannel.h | 70 +++++++++++++++-------- fairmq/FairMQDevice.h | 94 ++++++++++++++++++++----------- fairmq/FairMQParts.h | 1 - fairmq/FairMQSocket.h | 4 +- fairmq/nanomsg/FairMQSocketNN.cxx | 8 +-- fairmq/nanomsg/FairMQSocketNN.h | 4 +- fairmq/zeromq/FairMQSocketZMQ.cxx | 38 ++++++++++--- fairmq/zeromq/FairMQSocketZMQ.h | 4 +- 8 files changed, 147 insertions(+), 76 deletions(-) diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 31955159..265024db 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -119,17 +119,18 @@ class FairMQChannel /// for some other reason (e.g. no peers connected for a binding socket), the method blocks. /// /// @param msg Constant reference of unique_ptr to a FairMQMessage - /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. In case of errors, returns -1. + /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. + /// In case of errors, returns -1. int Send(const std::unique_ptr& msg) const; /// Sends a message in non-blocking mode. /// @details SendAsync method attempts to send a message without blocking by - /// putting it in the queue. If the queue is full or queueing is not possible - /// for some other reason (e.g. no peers connected for a binding socket), the method returns 0. - /// + /// putting it in the queue. + /// /// @param msg Constant reference of unique_ptr to a FairMQMessage /// @return Number of bytes that have been queued. If queueing failed due to - /// full queue or no connected peers (when binding), returns -2. In case of errors, returns -1. + /// full queue or no connected peers (when binding), returns -2. + /// In case of errors, returns -1. inline int SendAsync(const std::unique_ptr& msg) const { return fSocket->Send(msg.get(), fNoBlockFlag); @@ -138,9 +139,10 @@ class FairMQChannel /// Queues the current message as a part of a multi-part message /// @details SendPart method queues the provided message as a part of a multi-part message. /// The actual transfer over the network is initiated once final part has been queued with the Send() or SendAsync() methods. - /// + /// /// @param msg Constant reference of unique_ptr to a FairMQMessage - /// @return Number of bytes that have been queued. -2 If queueing was not possible. In case of errors, returns -1. + /// @return Number of bytes that have been queued. -2 If queueing was not possible. + /// In case of errors, returns -1. inline int SendPart(const std::unique_ptr& msg) const { return fSocket->Send(msg.get(), fSndMoreFlag); @@ -149,25 +151,44 @@ class FairMQChannel /// Queues the current message as a part of a multi-part message without blocking /// @details SendPart method queues the provided message as a part of a multi-part message without blocking. /// The actual transfer over the network is initiated once final part has been queued with the Send() or SendAsync() methods. - /// + /// /// @param msg Constant reference of unique_ptr to a FairMQMessage - /// @return Number of bytes that have been queued. -2 If queueing was not possible. In case of errors, returns -1. + /// @return Number of bytes that have been queued. -2 If queueing was not possible. + /// In case of errors, returns -1. inline int SendPartAsync(const std::unique_ptr& msg) const { return fSocket->Send(msg.get(), fSndMoreFlag|fNoBlockFlag); } + /// Send a vector of messages + /// + /// @param msgVec message vector reference + /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. + /// In case of errors, returns -1. + int64_t Send(const std::vector>& msgVec) const; + + /// Sends a vector of message in non-blocking mode. + /// @details SendAsync method attempts to send a vector of messages without blocking by + /// putting it them the queue. + /// + /// @param msgVec message vector reference + /// @return Number of bytes that have been queued. If queueing failed due to + /// full queue or no connected peers (when binding), returns -2. In case of errors, returns -1. + inline int64_t SendAsync(const std::vector>& msgVec) const + { + return fSocket->Send(msgVec, fNoBlockFlag); + } + /// Receives a message from the socket queue. /// @details Receive method attempts to receive a message from the input queue. /// If the queue is empty the method blocks. /// /// @param msg Constant reference of unique_ptr to a FairMQMessage - /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. In case of errors, returns -1. + /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. + /// In case of errors, returns -1. int Receive(const std::unique_ptr& msg) const; /// Receives a message in non-blocking mode. - /// @details ReceiveAsync method attempts to receive a message without blocking from the input queue. - /// If the queue is empty the method returns 0. /// /// @param msg Constant reference of unique_ptr to a FairMQMessage /// @return Number of bytes that have been received. If queue is empty, returns -2. @@ -177,20 +198,23 @@ class FairMQChannel return fSocket->Receive(msg.get(), fNoBlockFlag); } - /// Shorthand method to send a vector of messages on `chan` at index `i` + /// Receive a vector of messages + /// /// @param msgVec message vector reference - /// @param chan channel name - /// @param i channel index - /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. In case of errors, returns -1. - int64_t Send(const std::vector>& msgVec) const; - - /// Shorthand method to receive a vector of messages on `chan` at index `i` - /// @param msgVec message vector reference - /// @param chan channel name - /// @param i channel index - /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. In case of errors, returns -1. + /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. + /// In case of errors, returns -1. int64_t Receive(std::vector>& msgVec) const; + /// Receives a vector of messages in non-blocking mode. + /// + /// @param msgVec message vector reference + /// @return Number of bytes that have been received. If queue is empty, returns -2. + /// In case of errors, returns -1. + inline int64_t ReceiveAsync(std::vector>& msgVec) const + { + return fSocket->Receive(msgVec, fNoBlockFlag); + } + // DEPRECATED socket method wrappers with raw pointers and flag checks int Send(FairMQMessage* msg, const std::string& flag = "") const; int Send(FairMQMessage* msg, const int flags) const; diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index d1585d10..6b429771 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -69,78 +69,106 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// @param name Name of the channel void PrintChannel(const std::string& name); - /// Shorthand method to send `msg` on `chan` at index `i` - /// @param msg message reference - /// @param chan channel name - /// @param i channel index - /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. In case of errors, returns -1. - inline int Send(const std::unique_ptr& msg, const std::string& chan, const int i = 0) const - { - return fChannels.at(chan).at(i).Send(msg); - } - template void Serialize(FairMQMessage& msg, DataType&& data, Args&&... args) const { - Serializer().Serialize(msg,std::forward(data),std::forward(args)...); + Serializer().Serialize(msg, std::forward(data), std::forward(args)...); } template void Deserialize(FairMQMessage& msg, DataType&& data, Args&&... args) const { - Deserializer().Deserialize(msg,std::forward(data),std::forward(args)...); + Deserializer().Deserialize(msg, std::forward(data), std::forward(args)...); } - /// Shorthand method to receive `msg` on `chan` at index `i` + /// Shorthand method to send `msg` on `chan` at index `i` /// @param msg message reference /// @param chan channel name /// @param i channel index - /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. In case of errors, returns -1. - inline int Receive(const std::unique_ptr& msg, const std::string& chan, const int i = 0) const + /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. + /// In case of errors, returns -1. + inline int Send(const std::unique_ptr& msg, const std::string& chan, const int i = 0) const { - return fChannels.at(chan).at(i).Receive(msg); + return fChannels.at(chan).at(i).Send(msg); } - /// Shorthand method to send a vector of messages on `chan` at index `i` - /// @param msgVec message vector reference + /// Shorthand method to send `msg` on `chan` at index `i` without blocking + /// @param msg message reference /// @param chan channel name /// @param i channel index - /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. In case of errors, returns -1. - inline int64_t Send(const std::vector>& msgVec, const std::string& chan, const int i = 0) const + /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. + /// In case of errors, returns -1. + inline int SendAsync(const std::unique_ptr& msg, const std::string& chan, const int i = 0) const { - return fChannels.at(chan).at(i).Send(msgVec); - } - - /// Shorthand method to receive a vector of messages on `chan` at index `i` - /// @param msgVec message vector reference - /// @param chan channel name - /// @param i channel index - /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. In case of errors, returns -1. - inline int64_t Receive(std::vector>& msgVec, const std::string& chan, const int i = 0) const - { - return fChannels.at(chan).at(i).Receive(msgVec); + return fChannels.at(chan).at(i).SendAsync(msg); } /// Shorthand method to send FairMQParts on `chan` at index `i` /// @param parts parts reference /// @param chan channel name /// @param i channel index - /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. In case of errors, returns -1. + /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. + /// In case of errors, returns -1. inline int64_t Send(const FairMQParts& parts, const std::string& chan, const int i = 0) const { return fChannels.at(chan).at(i).Send(parts.fParts); } + /// Shorthand method to send FairMQParts on `chan` at index `i` without blocking + /// @param parts parts reference + /// @param chan channel name + /// @param i channel index + /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. + /// In case of errors, returns -1. + inline int64_t SendAsync(const FairMQParts& parts, const std::string& chan, const int i = 0) const + { + return fChannels.at(chan).at(i).SendAsync(parts.fParts); + } + + /// Shorthand method to receive `msg` on `chan` at index `i` + /// @param msg message reference + /// @param chan channel name + /// @param i channel index + /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. + /// In case of errors, returns -1. + inline int Receive(const std::unique_ptr& msg, const std::string& chan, const int i = 0) const + { + return fChannels.at(chan).at(i).Receive(msg); + } + + /// Shorthand method to receive `msg` on `chan` at index `i` without blocking + /// @param msg message reference + /// @param chan channel name + /// @param i channel index + /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. + /// In case of errors, returns -1. + inline int ReceiveAsync(const std::unique_ptr& msg, const std::string& chan, const int i = 0) const + { + return fChannels.at(chan).at(i).ReceiveAsync(msg); + } + /// Shorthand method to receive FairMQParts on `chan` at index `i` /// @param parts parts reference /// @param chan channel name /// @param i channel index - /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. In case of errors, returns -1. + /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. + /// In case of errors, returns -1. inline int64_t Receive(FairMQParts& parts, const std::string& chan, const int i = 0) const { return fChannels.at(chan).at(i).Receive(parts.fParts); } + /// Shorthand method to receive FairMQParts on `chan` at index `i` without blocking + /// @param parts parts reference + /// @param chan channel name + /// @param i channel index + /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. + /// In case of errors, returns -1. + inline int64_t ReceiveAsync(FairMQParts& parts, const std::string& chan, const int i = 0) const + { + return fChannels.at(chan).at(i).ReceiveAsync(parts.fParts); + } + /// @brief Create empty FairMQMessage /// @return pointer to FairMQMessage inline FairMQMessage* NewMessage() const diff --git a/fairmq/FairMQParts.h b/fairmq/FairMQParts.h index 94b379a6..2309d91b 100644 --- a/fairmq/FairMQParts.h +++ b/fairmq/FairMQParts.h @@ -37,7 +37,6 @@ class FairMQParts { fParts.push_back(std::unique_ptr(msg)); } - /// Adds part (std::unique_ptr&) to the container (move) /// @param msg unique pointer to FairMQMessage diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h index e590cbfa..6d89a4a6 100644 --- a/fairmq/FairMQSocket.h +++ b/fairmq/FairMQSocket.h @@ -40,11 +40,11 @@ class FairMQSocket virtual int Send(FairMQMessage* msg, const std::string& flag = "") = 0; virtual int Send(FairMQMessage* msg, const int flags = 0) = 0; - virtual int64_t Send(const std::vector>& msgVec) = 0; + virtual int64_t Send(const std::vector>& msgVec, const int flags = 0) = 0; virtual int Receive(FairMQMessage* msg, const std::string& flag = "") = 0; virtual int Receive(FairMQMessage* msg, const int flags = 0) = 0; - virtual int64_t Receive(std::vector>& msgVec) = 0; + virtual int64_t Receive(std::vector>& msgVec, const int flags = 0) = 0; virtual void* GetSocket() const = 0; virtual int GetSocket(int nothing) const = 0; diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index 3116ed6b..1eaf4d89 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -145,7 +145,7 @@ int FairMQSocketNN::Send(FairMQMessage* msg, const int flags) return nbytes; } -int64_t FairMQSocketNN::Send(const vector>& msgVec) +int64_t FairMQSocketNN::Send(const vector>& msgVec, const int flags) { #ifdef MSGPACK_FOUND // create msgpack simple buffer @@ -161,7 +161,7 @@ int64_t FairMQSocketNN::Send(const vector>& msgVec) packer.pack_bin_body(static_cast(msgVec[i]->GetData()), msgVec[i]->GetSize()); } - int64_t nbytes = nn_send(fSocket, sbuf.data(), sbuf.size(), 0); + int64_t nbytes = nn_send(fSocket, sbuf.data(), sbuf.size(), flags); if (nbytes >= 0) { fBytesTx += nbytes; @@ -236,7 +236,7 @@ int FairMQSocketNN::Receive(FairMQMessage* msg, const int flags) return nbytes; } -int64_t FairMQSocketNN::Receive(vector>& msgVec) +int64_t FairMQSocketNN::Receive(vector>& msgVec, const int flags) { #ifdef MSGPACK_FOUND // Warn if the vector is filled before Receive() and empty it. @@ -249,7 +249,7 @@ int64_t FairMQSocketNN::Receive(vector>& msgVec) // pointer to point to received message buffer char* ptr = NULL; // receive the message into a buffer allocated by nanomsg and let ptr point to it - int nbytes = nn_recv(fSocket, &ptr, NN_MSG, 0); + int nbytes = nn_recv(fSocket, &ptr, NN_MSG, flags); if (nbytes >= 0) // if no errors or non-blocking timeouts { // store statistics on how many bytes received diff --git a/fairmq/nanomsg/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h index 3e22dbb3..8cc13575 100644 --- a/fairmq/nanomsg/FairMQSocketNN.h +++ b/fairmq/nanomsg/FairMQSocketNN.h @@ -39,11 +39,11 @@ class FairMQSocketNN : public FairMQSocket virtual int Send(FairMQMessage* msg, const std::string& flag = ""); virtual int Send(FairMQMessage* msg, const int flags = 0); - virtual int64_t Send(const std::vector>& msgVec); + virtual int64_t Send(const std::vector>& msgVec, const int flags = 0); virtual int Receive(FairMQMessage* msg, const std::string& flag = ""); virtual int Receive(FairMQMessage* msg, const int flags = 0); - virtual int64_t Receive(std::vector>& msgVec); + virtual int64_t Receive(std::vector>& msgVec, const int flags = 0); virtual void* GetSocket() const; virtual int GetSocket(int nothing) const; diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index e82db9dd..a3c6f9e8 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -150,7 +150,7 @@ int FairMQSocketZMQ::Send(FairMQMessage* msg, const int flags) return nbytes; } -int64_t FairMQSocketZMQ::Send(const vector>& msgVec) +int64_t FairMQSocketZMQ::Send(const vector>& msgVec, const int flags) { // Sending vector typicaly handles more then one part if (msgVec.size() > 1) @@ -159,7 +159,7 @@ int64_t FairMQSocketZMQ::Send(const vector>& msgVec) for (unsigned int i = 0; i < msgVec.size() - 1; ++i) { - int nbytes = zmq_msg_send(static_cast(msgVec[i]->GetMessage()), fSocket, ZMQ_SNDMORE); + int nbytes = zmq_msg_send(static_cast(msgVec[i]->GetMessage()), fSocket, ZMQ_SNDMORE|flags); if (nbytes >= 0) { totalSize += nbytes; @@ -167,18 +167,38 @@ int64_t FairMQSocketZMQ::Send(const vector>& msgVec) } else { + if (zmq_errno() == EAGAIN) + { + return -2; + } + if (zmq_errno() == ETERM) + { + LOG(INFO) << "terminating socket " << fId; + return -1; + } + LOG(ERROR) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); return nbytes; } } - int n = zmq_msg_send(static_cast(msgVec.back()->GetMessage()), fSocket, 0); - if (n >= 0) + int nbytes = zmq_msg_send(static_cast(msgVec.back()->GetMessage()), fSocket, flags); + if (nbytes >= 0) { - totalSize += n; + totalSize += nbytes; } else { - return n; + if (zmq_errno() == EAGAIN) + { + return -2; + } + if (zmq_errno() == ETERM) + { + LOG(INFO) << "terminating socket " << fId; + return -1; + } + LOG(ERROR) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); + return nbytes; } // store statistics on how many messages have been sent (handle all parts as a single message) @@ -187,7 +207,7 @@ int64_t FairMQSocketZMQ::Send(const vector>& msgVec) } // If there's only one part, send it as a regular message else if (msgVec.size() == 1) { - return zmq_msg_send(static_cast(msgVec.back()->GetMessage()), fSocket, 0); + return Send(msgVec.back().get(), flags); } else // if the vector is empty, something might be wrong { @@ -240,7 +260,7 @@ int FairMQSocketZMQ::Receive(FairMQMessage* msg, const int flags) return nbytes; } -int64_t FairMQSocketZMQ::Receive(vector>& msgVec) +int64_t FairMQSocketZMQ::Receive(vector>& msgVec, const int flags) { // Warn if the vector is filled before Receive() and empty it. if (msgVec.size() > 0) @@ -256,7 +276,7 @@ int64_t FairMQSocketZMQ::Receive(vector>& msgVec) { unique_ptr part(new FairMQMessageZMQ()); - int nbytes = zmq_msg_recv(static_cast(part->GetMessage()), fSocket, 0); + int nbytes = zmq_msg_recv(static_cast(part->GetMessage()), fSocket, flags); if (nbytes >= 0) { msgVec.push_back(move(part)); diff --git a/fairmq/zeromq/FairMQSocketZMQ.h b/fairmq/zeromq/FairMQSocketZMQ.h index 042d08d4..986c87ae 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.h +++ b/fairmq/zeromq/FairMQSocketZMQ.h @@ -34,11 +34,11 @@ class FairMQSocketZMQ : public FairMQSocket virtual int Send(FairMQMessage* msg, const std::string& flag = ""); virtual int Send(FairMQMessage* msg, const int flags = 0); - virtual int64_t Send(const std::vector>& msgVec); + virtual int64_t Send(const std::vector>& msgVec, const int flags = 0); virtual int Receive(FairMQMessage* msg, const std::string& flag = ""); virtual int Receive(FairMQMessage* msg, const int flags = 0); - virtual int64_t Receive(std::vector>& msgVec); + virtual int64_t Receive(std::vector>& msgVec, const int flags = 0); virtual void* GetSocket() const; virtual int GetSocket(int nothing) const;