diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index 7476995a..1f11f78c 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -689,77 +689,52 @@ void FairMQChannel::ResetChannel() // TODO: implement channel resetting } -int FairMQChannel::Send(unique_ptr& msg) const -{ - CheckSendCompatibility(msg); - return fSocket->Send(msg); -} - -int FairMQChannel::Receive(unique_ptr& msg) const -{ - CheckReceiveCompatibility(msg); - return fSocket->Receive(msg); -} - int FairMQChannel::Send(unique_ptr& msg, int sndTimeoutInMs) const { + CheckSendCompatibility(msg); return fSocket->Send(msg, sndTimeoutInMs); } int FairMQChannel::Receive(unique_ptr& msg, int rcvTimeoutInMs) const { + CheckReceiveCompatibility(msg); return fSocket->Receive(msg, rcvTimeoutInMs); } int FairMQChannel::SendAsync(unique_ptr& msg) const { CheckSendCompatibility(msg); - return fSocket->TrySend(msg); + return fSocket->Send(msg, 0); } int FairMQChannel::ReceiveAsync(unique_ptr& msg) const { CheckReceiveCompatibility(msg); - return fSocket->TryReceive(msg); -} - -int64_t FairMQChannel::Send(vector>& msgVec) const -{ - CheckSendCompatibility(msgVec); - return fSocket->Send(msgVec); -} - -int64_t FairMQChannel::Receive(vector>& msgVec) const -{ - CheckReceiveCompatibility(msgVec); - return fSocket->Receive(msgVec); + return fSocket->Receive(msg, 0); } int64_t FairMQChannel::Send(vector>& msgVec, int sndTimeoutInMs) const { + CheckSendCompatibility(msgVec); return fSocket->Send(msgVec, sndTimeoutInMs); } int64_t FairMQChannel::Receive(vector>& msgVec, int rcvTimeoutInMs) const { + CheckReceiveCompatibility(msgVec); return fSocket->Receive(msgVec, rcvTimeoutInMs); } int64_t FairMQChannel::SendAsync(vector>& msgVec) const { CheckSendCompatibility(msgVec); - return fSocket->TrySend(msgVec); + return fSocket->Send(msgVec, 0); } -/// Receives a vector of messages in non-blocking mode. -/// -/// @param msgVec message vector reference -/// @return Number of bytes that have been received. If queue is empty, returns -2. -/// In case of errors, returns -1. int64_t FairMQChannel::ReceiveAsync(vector>& msgVec) const { CheckReceiveCompatibility(msgVec); - return fSocket->TryReceive(msgVec); + return fSocket->Receive(msgVec, 0); } FairMQChannel::~FairMQChannel() diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 207aff4b..9430d66d 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -174,106 +174,62 @@ class FairMQChannel /// Resets the channel (requires validation to be used again). void ResetChannel(); - int Send(FairMQMessagePtr& msg) const; - int Receive(FairMQMessagePtr& msg) const; - /// Sends a message to the socket queue. - /// @details Send method attempts to send a message by - /// putting it in the output queue. If the queue is full or queueing is not possible - /// for some other reason (e.g. no peers connected for a binding socket), the method blocks. - /// /// @param msg Constant reference of unique_ptr to a FairMQMessage - /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. - /// In case of errors, returns -1. - int Send(FairMQMessagePtr& msg, int sndTimeoutInMs) const; + /// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) + /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error. + int Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1) const; /// Receives a message from the socket queue. - /// @details Receive method attempts to receive a message from the input queue. - /// If the queue is empty the method blocks. - /// /// @param msg Constant reference of unique_ptr to a FairMQMessage - /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. - /// In case of errors, returns -1. - int Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs) const; + /// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) + /// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error. + int Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1) const; - /// Sends a message in non-blocking mode. - /// @details SendAsync method attempts to send a message without blocking by - /// putting it in the queue. - /// - /// @param msg Constant reference of unique_ptr to a FairMQMessage - /// @return Number of bytes that have been queued. If queueing failed due to - /// full queue or no connected peers (when binding), returns -2. - /// In case of errors, returns -1. - int SendAsync(FairMQMessagePtr& msg) const; - - /// Receives a message in non-blocking mode. - /// - /// @param msg Constant reference of unique_ptr to a FairMQMessage - /// @return Number of bytes that have been received. If queue is empty, returns -2. - /// In case of errors, returns -1. - int ReceiveAsync(FairMQMessagePtr& msg) const; - - int64_t Send(std::vector& msgVec) const; - int64_t Receive(std::vector& msgVec) const; + int SendAsync(FairMQMessagePtr& msg) const __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msg, timeout);"))); + int ReceiveAsync(FairMQMessagePtr& msg) const __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msg, timeout);"))); /// Send a vector of messages - /// /// @param msgVec message vector reference - /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. - /// In case of errors, returns -1. - int64_t Send(std::vector& msgVec, int sndTimeoutInMs) const; + /// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) + /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error. + int64_t Send(std::vector& msgVec, int sndTimeoutInMs = -1) const; /// Receive a vector of messages - /// /// @param msgVec message vector reference - /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. - /// In case of errors, returns -1. - int64_t Receive(std::vector& msgVec, int rcvTimeoutInMs) const; + /// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) + /// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error. + int64_t Receive(std::vector& msgVec, int rcvTimeoutInMs = -1) const; - /// Sends a vector of message in non-blocking mode. - /// @details SendAsync method attempts to send a vector of messages without blocking by - /// putting it them the queue. - /// - /// @param msgVec message vector reference - /// @return Number of bytes that have been queued. If queueing failed due to - /// full queue or no connected peers (when binding), returns -2. In case of errors, returns -1. - int64_t SendAsync(std::vector& msgVec) const; + int64_t SendAsync(std::vector& msgVec) const __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msgVec, timeout);"))); + int64_t ReceiveAsync(std::vector& msgVec) const __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msgVec, timeout);"))); - /// Receives a vector of messages in non-blocking mode. - /// - /// @param msgVec message vector reference - /// @return Number of bytes that have been received. If queue is empty, returns -2. - /// In case of errors, returns -1. - int64_t ReceiveAsync(std::vector& msgVec) const; - - int64_t Send(FairMQParts& parts) const - { - return Send(parts.fParts); - } - - int64_t Receive(FairMQParts& parts) const - { - return Receive(parts.fParts); - } - - int64_t Send(FairMQParts& parts, int sndTimeoutInMs) const + /// Send FairMQParts + /// @param parts FairMQParts reference + /// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) + /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error. + int64_t Send(FairMQParts& parts, int sndTimeoutInMs = -1) const { return Send(parts.fParts, sndTimeoutInMs); } - int64_t Receive(FairMQParts& parts, int rcvTimeoutInMs) const + /// Receive FairMQParts + /// @param parts FairMQParts reference + /// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) + /// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error. + int64_t Receive(FairMQParts& parts, int rcvTimeoutInMs = -1) const { return Receive(parts.fParts, rcvTimeoutInMs); } - int64_t SendAsync(FairMQParts& parts) const + int64_t SendAsync(FairMQParts& parts) const __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(parts, timeout);"))) { - return SendAsync(parts.fParts); + return Send(parts.fParts, 0); } - int64_t ReceiveAsync(FairMQParts& parts) const + int64_t ReceiveAsync(FairMQParts& parts) const __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(parts, timeout);"))) { - return ReceiveAsync(parts.fParts); + return Receive(parts.fParts, 0); } unsigned long GetBytesTx() const; diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 21a1d6eb..5ebdda2f 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -96,23 +96,13 @@ class FairMQDevice : public FairMQStateMachine Deserializer().Deserialize(msg, std::forward(data), std::forward(args)...); } - int Send(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) const - { - return fChannels.at(chan).at(i).Send(msg); - } - - int Receive(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) const - { - return fChannels.at(chan).at(i).Receive(msg); - } - /// Shorthand method to send `msg` on `chan` at index `i` /// @param msg message reference /// @param chan channel name /// @param i channel index - /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. - /// In case of errors, returns -1. - int Send(FairMQMessagePtr& msg, const std::string& chan, const int i, int sndTimeoutInMs) const + /// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) + /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error. + int Send(FairMQMessagePtr& msg, const std::string& chan, const int i = 0, int sndTimeoutInMs = -1) const { return fChannels.at(chan).at(i).Send(msg, sndTimeoutInMs); } @@ -121,52 +111,29 @@ class FairMQDevice : public FairMQStateMachine /// @param msg message reference /// @param chan channel name /// @param i channel index - /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. - /// In case of errors, returns -1. - int Receive(FairMQMessagePtr& msg, const std::string& chan, const int i, int rcvTimeoutInMs) const + /// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) + /// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error. + int Receive(FairMQMessagePtr& msg, const std::string& chan, const int i = 0, int rcvTimeoutInMs = -1) const { return fChannels.at(chan).at(i).Receive(msg, rcvTimeoutInMs); } - /// Shorthand method to send `msg` on `chan` at index `i` without blocking - /// @param msg message reference - /// @param chan channel name - /// @param i channel index - /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. - /// In case of errors, returns -1. - int SendAsync(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) const + int SendAsync(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) const __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msg, \"channelA\", subchannelIndex, timeout);"))) { - return fChannels.at(chan).at(i).SendAsync(msg); + return fChannels.at(chan).at(i).Send(msg, 0); } - - /// Shorthand method to receive `msg` on `chan` at index `i` without blocking - /// @param msg message reference - /// @param chan channel name - /// @param i channel index - /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. - /// In case of errors, returns -1. - int ReceiveAsync(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) const + int ReceiveAsync(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) const __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msg, \"channelA\", subchannelIndex, timeout);"))) { - return fChannels.at(chan).at(i).ReceiveAsync(msg); - } - - int64_t Send(FairMQParts& parts, const std::string& chan, const int i = 0) const - { - return fChannels.at(chan).at(i).Send(parts.fParts); - } - - int64_t Receive(FairMQParts& parts, const std::string& chan, const int i = 0) const - { - return fChannels.at(chan).at(i).Receive(parts.fParts); + return fChannels.at(chan).at(i).Receive(msg, 0); } /// Shorthand method to send FairMQParts on `chan` at index `i` /// @param parts parts reference /// @param chan channel name /// @param i channel index - /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. - /// In case of errors, returns -1. - int64_t Send(FairMQParts& parts, const std::string& chan, const int i, int sndTimeoutInMs) const + /// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) + /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error. + int64_t Send(FairMQParts& parts, const std::string& chan, const int i = 0, int sndTimeoutInMs = -1) const { return fChannels.at(chan).at(i).Send(parts.fParts, sndTimeoutInMs); } @@ -175,33 +142,20 @@ class FairMQDevice : public FairMQStateMachine /// @param parts parts reference /// @param chan channel name /// @param i channel index - /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. - /// In case of errors, returns -1. - int64_t Receive(FairMQParts& parts, const std::string& chan, const int i, int rcvTimeoutInMs) const + /// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) + /// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error. + int64_t Receive(FairMQParts& parts, const std::string& chan, const int i = 0, int rcvTimeoutInMs = -1) const { return fChannels.at(chan).at(i).Receive(parts.fParts, rcvTimeoutInMs); } - /// Shorthand method to send FairMQParts on `chan` at index `i` without blocking - /// @param parts parts reference - /// @param chan channel name - /// @param i channel index - /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. - /// In case of errors, returns -1. - int64_t SendAsync(FairMQParts& parts, const std::string& chan, const int i = 0) const + int64_t SendAsync(FairMQParts& parts, const std::string& chan, const int i = 0) const __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(parts, \"channelA\", subchannelIndex, timeout);"))) { - return fChannels.at(chan).at(i).SendAsync(parts.fParts); + return fChannels.at(chan).at(i).Send(parts.fParts, 0); } - - /// Shorthand method to receive FairMQParts on `chan` at index `i` without blocking - /// @param parts parts reference - /// @param chan channel name - /// @param i channel index - /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. - /// In case of errors, returns -1. - int64_t ReceiveAsync(FairMQParts& parts, const std::string& chan, const int i = 0) const + int64_t ReceiveAsync(FairMQParts& parts, const std::string& chan, const int i = 0) const __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(parts, \"channelA\", subchannelIndex, timeout);"))) { - return fChannels.at(chan).at(i).ReceiveAsync(parts.fParts); + return fChannels.at(chan).at(i).Receive(parts.fParts, 0); } /// @brief Getter for default transport factory diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h index 166c34cf..40f70843 100644 --- a/fairmq/FairMQSocket.h +++ b/fairmq/FairMQSocket.h @@ -25,15 +25,10 @@ class FairMQSocket virtual bool Bind(const std::string& address) = 0; virtual void Connect(const std::string& address) = 0; - virtual int Send(FairMQMessagePtr& msg, int timeout = 0) = 0; - virtual int Receive(FairMQMessagePtr& msg, int timeout = 0) = 0; - virtual int64_t Send(std::vector>& msgVec, int timeout = 0) = 0; - virtual int64_t Receive(std::vector>& msgVec, int timeout = 0) = 0; - - virtual int TrySend(FairMQMessagePtr& msg) = 0; - virtual int TryReceive(FairMQMessagePtr& msg) = 0; - virtual int64_t TrySend(std::vector>& msgVec) = 0; - virtual int64_t TryReceive(std::vector>& msgVec) = 0; + virtual int Send(FairMQMessagePtr& msg, int timeout = -1) = 0; + virtual int Receive(FairMQMessagePtr& msg, int timeout = -1) = 0; + virtual int64_t Send(std::vector>& msgVec, int timeout = -1) = 0; + virtual int64_t Receive(std::vector>& msgVec, int timeout = -1) = 0; virtual void Close() = 0; diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index 357b9946..dbbf28a7 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -119,18 +119,13 @@ void FairMQSocketNN::Connect(const string& address) } } -int FairMQSocketNN::Send(FairMQMessagePtr& msg, const int timeout) { return SendImpl(msg, 0, timeout); } -int FairMQSocketNN::Receive(FairMQMessagePtr& msg, const int timeout) { return ReceiveImpl(msg, 0, timeout); } -int64_t FairMQSocketNN::Send(vector>& msgVec, const int timeout) { return SendImpl(msgVec, 0, timeout); } -int64_t FairMQSocketNN::Receive(vector>& msgVec, const int timeout) { return ReceiveImpl(msgVec, 0, timeout); } - -int FairMQSocketNN::TrySend(FairMQMessagePtr& msg) { return SendImpl(msg, NN_DONTWAIT, 0); } -int FairMQSocketNN::TryReceive(FairMQMessagePtr& msg) { return ReceiveImpl(msg, NN_DONTWAIT, 0); } -int64_t FairMQSocketNN::TrySend(vector>& msgVec) { return SendImpl(msgVec, NN_DONTWAIT, 0); } -int64_t FairMQSocketNN::TryReceive(vector>& msgVec) { return ReceiveImpl(msgVec, NN_DONTWAIT, 0); } - -int FairMQSocketNN::SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout) +int FairMQSocketNN::Send(FairMQMessagePtr& msg, const int timeout) { + int flags = 0; + if (timeout == 0) + { + flags = NN_DONTWAIT; + } int nbytes = -1; int elapsed = 0; @@ -162,7 +157,7 @@ int FairMQSocketNN::SendImpl(FairMQMessagePtr& msg, const int flags, const int t { if (!fInterrupted && ((flags & NN_DONTWAIT) == 0)) { - if (timeout) + if (timeout > 0) { elapsed += fSndTimeout; if (elapsed >= timeout) @@ -194,8 +189,13 @@ int FairMQSocketNN::SendImpl(FairMQMessagePtr& msg, const int flags, const int t } } -int FairMQSocketNN::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout) +int FairMQSocketNN::Receive(FairMQMessagePtr& msg, const int timeout) { + int flags = 0; + if (timeout == 0) + { + flags = NN_DONTWAIT; + } int elapsed = 0; FairMQMessageNN* msgPtr = static_cast(msg.get()); @@ -216,7 +216,7 @@ int FairMQSocketNN::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const in { if (!fInterrupted && ((flags & NN_DONTWAIT) == 0)) { - if (timeout) + if (timeout > 0) { elapsed += fRcvTimeout; if (elapsed >= timeout) @@ -248,8 +248,13 @@ int FairMQSocketNN::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const in } } -int64_t FairMQSocketNN::SendImpl(vector& msgVec, const int flags, const int timeout) +int64_t FairMQSocketNN::Send(vector& msgVec, const int timeout) { + int flags = 0; + if (timeout == 0) + { + flags = NN_DONTWAIT; + } const unsigned int vecSize = msgVec.size(); int elapsed = 0; @@ -286,7 +291,7 @@ int64_t FairMQSocketNN::SendImpl(vector& msgVec, const int fla { if (!fInterrupted && ((flags & NN_DONTWAIT) == 0)) { - if (timeout) + if (timeout > 0) { elapsed += fSndTimeout; if (elapsed >= timeout) @@ -318,8 +323,13 @@ int64_t FairMQSocketNN::SendImpl(vector& msgVec, const int fla } } -int64_t FairMQSocketNN::ReceiveImpl(vector& msgVec, const int flags, const int timeout) +int64_t FairMQSocketNN::Receive(vector& msgVec, const int timeout) { + int flags = 0; + if (timeout == 0) + { + flags = NN_DONTWAIT; + } // Warn if the vector is filled before Receive() and empty it. // if (msgVec.size() > 0) // { @@ -369,7 +379,7 @@ int64_t FairMQSocketNN::ReceiveImpl(vector& msgVec, const int { if (!fInterrupted && ((flags & NN_DONTWAIT) == 0)) { - if (timeout) + if (timeout > 0) { elapsed += fRcvTimeout; if (elapsed >= timeout) diff --git a/fairmq/nanomsg/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h index c42fbfc8..e24df213 100644 --- a/fairmq/nanomsg/FairMQSocketNN.h +++ b/fairmq/nanomsg/FairMQSocketNN.h @@ -27,15 +27,10 @@ class FairMQSocketNN final : public FairMQSocket bool Bind(const std::string& address) override; void Connect(const std::string& address) override; - int Send(FairMQMessagePtr& msg, const int timeout = 0) override; - int Receive(FairMQMessagePtr& msg, const int timeout = 0) override; - int64_t Send(std::vector>& msgVec, const int timeout = 0) override; - int64_t Receive(std::vector>& msgVec, const int timeout = 0) override; - - int TrySend(FairMQMessagePtr& msg) override; - int TryReceive(FairMQMessagePtr& msg) override; - int64_t TrySend(std::vector>& msgVec) override; - int64_t TryReceive(std::vector>& msgVec) override; + int Send(FairMQMessagePtr& msg, const int timeout = -1) override; + int Receive(FairMQMessagePtr& msg, const int timeout = -1) override; + int64_t Send(std::vector>& msgVec, const int timeout = -1) override; + int64_t Receive(std::vector>& msgVec, const int timeout = -1) override; int GetSocket() const; @@ -80,11 +75,6 @@ class FairMQSocketNN final : public FairMQSocket int fSndTimeout; int fRcvTimeout; int fLinger; - - int SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout); - int ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout); - int64_t SendImpl(std::vector>& msgVec, const int flags, const int timeout); - int64_t ReceiveImpl(std::vector>& msgVec, const int flags, const int timeout); }; #endif /* FAIRMQSOCKETNN_H_ */ diff --git a/fairmq/shmem/FairMQSocketSHM.cxx b/fairmq/shmem/FairMQSocketSHM.cxx index 63133b16..97c26303 100644 --- a/fairmq/shmem/FairMQSocketSHM.cxx +++ b/fairmq/shmem/FairMQSocketSHM.cxx @@ -111,18 +111,13 @@ void FairMQSocketSHM::Connect(const string& address) } } -int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int timeout) { return SendImpl(msg, 0, timeout); } -int FairMQSocketSHM::Receive(FairMQMessagePtr& msg, const int timeout) { return ReceiveImpl(msg, 0, timeout); } -int64_t FairMQSocketSHM::Send(vector>& msgVec, const int timeout) { return SendImpl(msgVec, 0, timeout); } -int64_t FairMQSocketSHM::Receive(vector>& msgVec, const int timeout) { return ReceiveImpl(msgVec, 0, timeout); } - -int FairMQSocketSHM::TrySend(FairMQMessagePtr& msg) { return SendImpl(msg, ZMQ_DONTWAIT, 0); } -int FairMQSocketSHM::TryReceive(FairMQMessagePtr& msg) { return ReceiveImpl(msg, ZMQ_DONTWAIT, 0); } -int64_t FairMQSocketSHM::TrySend(vector>& msgVec) { return SendImpl(msgVec, ZMQ_DONTWAIT, 0); } -int64_t FairMQSocketSHM::TryReceive(vector>& msgVec) { return ReceiveImpl(msgVec, ZMQ_DONTWAIT, 0); } - -int FairMQSocketSHM::SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout) +int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int timeout) { + int flags = 0; + if (timeout == 0) + { + flags = ZMQ_DONTWAIT; + } int elapsed = 0; while (true && !fInterrupted) @@ -146,7 +141,7 @@ int FairMQSocketSHM::SendImpl(FairMQMessagePtr& msg, const int flags, const int { if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { - if (timeout) + if (timeout > 0) { elapsed += fSndTimeout; if (elapsed >= timeout) @@ -176,8 +171,13 @@ int FairMQSocketSHM::SendImpl(FairMQMessagePtr& msg, const int flags, const int return -1; } -int FairMQSocketSHM::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout) +int FairMQSocketSHM::Receive(FairMQMessagePtr& msg, const int timeout) { + int flags = 0; + if (timeout == 0) + { + flags = ZMQ_DONTWAIT; + } int elapsed = 0; zmq_msg_t* msgPtr = static_cast(msg.get())->GetMessage(); @@ -218,7 +218,7 @@ int FairMQSocketSHM::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const i { if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { - if (timeout) + if (timeout > 0) { elapsed += fRcvTimeout; if (elapsed >= timeout) @@ -246,13 +246,18 @@ int FairMQSocketSHM::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const i } } -int64_t FairMQSocketSHM::SendImpl(vector& msgVec, const int flags, const int timeout) +int64_t FairMQSocketSHM::Send(vector& msgVec, const int timeout) { + int flags = 0; + if (timeout == 0) + { + flags = ZMQ_DONTWAIT; + } const unsigned int vecSize = msgVec.size(); int elapsed = 0; if (vecSize == 1) { - return SendImpl(msgVec.back(), flags, timeout); + return Send(msgVec.back(), timeout); } // put it into zmq message @@ -299,7 +304,7 @@ int64_t FairMQSocketSHM::SendImpl(vector& msgVec, const int fl { if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { - if (timeout) + if (timeout > 0) { elapsed += fSndTimeout; if (elapsed >= timeout) @@ -334,8 +339,13 @@ int64_t FairMQSocketSHM::SendImpl(vector& msgVec, const int fl return -1; } -int64_t FairMQSocketSHM::ReceiveImpl(vector& msgVec, const int flags, const int timeout) +int64_t FairMQSocketSHM::Receive(vector& msgVec, const int timeout) { + int flags = 0; + if (timeout == 0) + { + flags = ZMQ_DONTWAIT; + } int elapsed = 0; zmq_msg_t zmqMsg; @@ -392,7 +402,7 @@ int64_t FairMQSocketSHM::ReceiveImpl(vector& msgVec, const int { if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { - if (timeout) + if (timeout > 0) { elapsed += fRcvTimeout; if (elapsed >= timeout) diff --git a/fairmq/shmem/FairMQSocketSHM.h b/fairmq/shmem/FairMQSocketSHM.h index 17efe7a3..0f581028 100644 --- a/fairmq/shmem/FairMQSocketSHM.h +++ b/fairmq/shmem/FairMQSocketSHM.h @@ -28,15 +28,10 @@ class FairMQSocketSHM final : public FairMQSocket bool Bind(const std::string& address) override; void Connect(const std::string& address) override; - int Send(FairMQMessagePtr& msg, const int timeout = 0) override; - int Receive(FairMQMessagePtr& msg, const int timeout = 0) override; - int64_t Send(std::vector>& msgVec, const int timeout = 0) override; - int64_t Receive(std::vector>& msgVec, const int timeout = 0) override; - - int TrySend(FairMQMessagePtr& msg) override; - int TryReceive(FairMQMessagePtr& msg) override; - int64_t TrySend(std::vector>& msgVec) override; - int64_t TryReceive(std::vector>& msgVec) override; + int Send(FairMQMessagePtr& msg, const int timeout = -1) override; + int Receive(FairMQMessagePtr& msg, const int timeout = -1) override; + int64_t Send(std::vector>& msgVec, const int timeout = -1) override; + int64_t Receive(std::vector>& msgVec, const int timeout = -1) override; void* GetSocket() const; @@ -81,12 +76,6 @@ class FairMQSocketSHM final : public FairMQSocket int fSndTimeout; int fRcvTimeout; - - int SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout); - int ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout); - - int64_t SendImpl(std::vector>& msgVec, const int flags, const int timeout); - int64_t ReceiveImpl(std::vector>& msgVec, const int flags, const int timeout); }; #endif /* FAIRMQSOCKETSHM_H_ */ diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index b430aa08..de54994a 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -106,18 +106,13 @@ void FairMQSocketZMQ::Connect(const string& address) } } -int FairMQSocketZMQ::Send(FairMQMessagePtr& msg, const int timeout) { return SendImpl(msg, 0, timeout); } -int FairMQSocketZMQ::Receive(FairMQMessagePtr& msg, const int timeout) { return ReceiveImpl(msg, 0, timeout); } -int64_t FairMQSocketZMQ::Send(vector>& msgVec, const int timeout) { return SendImpl(msgVec, 0, timeout); } -int64_t FairMQSocketZMQ::Receive(vector>& msgVec, const int timeout) { return ReceiveImpl(msgVec, 0, timeout); } - -int FairMQSocketZMQ::TrySend(FairMQMessagePtr& msg) { return SendImpl(msg, ZMQ_DONTWAIT, 0); } -int FairMQSocketZMQ::TryReceive(FairMQMessagePtr& msg) { return ReceiveImpl(msg, ZMQ_DONTWAIT, 0); } -int64_t FairMQSocketZMQ::TrySend(vector>& msgVec) { return SendImpl(msgVec, ZMQ_DONTWAIT, 0); } -int64_t FairMQSocketZMQ::TryReceive(vector>& msgVec) { return ReceiveImpl(msgVec, ZMQ_DONTWAIT, 0); } - -int FairMQSocketZMQ::SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout) +int FairMQSocketZMQ::Send(FairMQMessagePtr& msg, const int timeout) { + int flags = 0; + if (timeout == 0) + { + flags = ZMQ_DONTWAIT; + } int elapsed = 0; static_cast(msg.get())->ApplyUsedSize(); @@ -136,7 +131,7 @@ int FairMQSocketZMQ::SendImpl(FairMQMessagePtr& msg, const int flags, const int { if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { - if (timeout) + if (timeout > 0) { elapsed += fSndTimeout; if (elapsed >= timeout) @@ -164,8 +159,13 @@ int FairMQSocketZMQ::SendImpl(FairMQMessagePtr& msg, const int flags, const int } } -int FairMQSocketZMQ::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout) +int FairMQSocketZMQ::Receive(FairMQMessagePtr& msg, const int timeout) { + int flags = 0; + if (timeout == 0) + { + flags = ZMQ_DONTWAIT; + } int elapsed = 0; while (true) @@ -181,7 +181,7 @@ int FairMQSocketZMQ::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const i { if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { - if (timeout) + if (timeout > 0) { elapsed += fRcvTimeout; if (elapsed >= timeout) @@ -209,14 +209,21 @@ int FairMQSocketZMQ::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const i } } -int64_t FairMQSocketZMQ::SendImpl(vector& msgVec, const int flags, const int timeout) +int64_t FairMQSocketZMQ::Send(vector& msgVec, const int timeout) { + int flags = 0; + if (timeout == 0) + { + flags = ZMQ_DONTWAIT; + } + const unsigned int vecSize = msgVec.size(); // Sending vector typicaly handles more then one part if (vecSize > 1) { int elapsed = 0; + while (true) { int64_t totalSize = 0; @@ -240,7 +247,7 @@ int64_t FairMQSocketZMQ::SendImpl(vector& msgVec, const int fl { if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { - if (timeout) + if (timeout > 0) { elapsed += fSndTimeout; if (elapsed >= timeout) @@ -279,7 +286,7 @@ int64_t FairMQSocketZMQ::SendImpl(vector& msgVec, const int fl } // If there's only one part, send it as a regular message else if (vecSize == 1) { - return SendImpl(msgVec.back(), flags, timeout); + return Send(msgVec.back(), timeout); } else // if the vector is empty, something might be wrong { @@ -288,8 +295,13 @@ int64_t FairMQSocketZMQ::SendImpl(vector& msgVec, const int fl } } -int64_t FairMQSocketZMQ::ReceiveImpl(vector& msgVec, const int flags, const int timeout) +int64_t FairMQSocketZMQ::Receive(vector& msgVec, const int timeout) { + int flags = 0; + if (timeout == 0) + { + flags = ZMQ_DONTWAIT; + } int elapsed = 0; while (true) @@ -312,7 +324,7 @@ int64_t FairMQSocketZMQ::ReceiveImpl(vector& msgVec, const int { if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { - if (timeout) + if (timeout > 0) { elapsed += fRcvTimeout; if (elapsed >= timeout) diff --git a/fairmq/zeromq/FairMQSocketZMQ.h b/fairmq/zeromq/FairMQSocketZMQ.h index cfc43da4..6d6a2607 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.h +++ b/fairmq/zeromq/FairMQSocketZMQ.h @@ -28,15 +28,10 @@ class FairMQSocketZMQ final : public FairMQSocket bool Bind(const std::string& address) override; void Connect(const std::string& address) override; - int Send(FairMQMessagePtr& msg, const int timeout = 0) override; - int Receive(FairMQMessagePtr& msg, const int timeout = 0) override; - int64_t Send(std::vector>& msgVec, const int timeout = 0) override; - int64_t Receive(std::vector>& msgVec, const int timeout = 0) override; - - int TrySend(FairMQMessagePtr& msg) override; - int TryReceive(FairMQMessagePtr& msg) override; - int64_t TrySend(std::vector>& msgVec) override; - int64_t TryReceive(std::vector>& msgVec) override; + int Send(FairMQMessagePtr& msg, const int timeout = -1) override; + int Receive(FairMQMessagePtr& msg, const int timeout = -1) override; + int64_t Send(std::vector>& msgVec, const int timeout = -1) override; + int64_t Receive(std::vector>& msgVec, const int timeout = -1) override; void* GetSocket() const; @@ -80,12 +75,6 @@ class FairMQSocketZMQ final : public FairMQSocket int fSndTimeout; int fRcvTimeout; - - int SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout); - int ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout); - - int64_t SendImpl(std::vector>& msgVec, const int flags, const int timeout); - int64_t ReceiveImpl(std::vector>& msgVec, const int flags, const int timeout); }; #endif /* FAIRMQSOCKETZMQ_H_ */