diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index b604bf7b..c39a91e6 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -23,8 +23,6 @@ using namespace std; mutex FairMQChannel::fChannelMutex; -atomic FairMQChannel::fInterrupted(false); - FairMQChannel::FairMQChannel() : fSocket(nullptr) , fType("unspecified") @@ -38,8 +36,6 @@ FairMQChannel::FairMQChannel() , fRateLogging(1) , fName("") , fIsValid(false) - , fPoller(nullptr) - , fChannelCmdSocket(nullptr) , fTransportType(FairMQ::Transport::DEFAULT) , fTransportFactory(nullptr) , fMultipart(false) @@ -61,8 +57,6 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str , fRateLogging(1) , fName("") , fIsValid(false) - , fPoller(nullptr) - , fChannelCmdSocket(nullptr) , fTransportType(FairMQ::Transport::DEFAULT) , fTransportFactory(nullptr) , fMultipart(false) @@ -84,8 +78,6 @@ FairMQChannel::FairMQChannel(const string& name, const string& type, std::shared , fRateLogging(1) , fName(name) , fIsValid(false) - , fPoller(nullptr) - , fChannelCmdSocket(nullptr) , fTransportType(factory->GetType()) , fTransportFactory(factory) , fMultipart(false) @@ -107,8 +99,6 @@ FairMQChannel::FairMQChannel(const FairMQChannel& chan) , fRateLogging(chan.fRateLogging) , fName(chan.fName) , fIsValid(false) - , fPoller(nullptr) - , fChannelCmdSocket(nullptr) , fTransportType(FairMQ::Transport::DEFAULT) , fTransportFactory(nullptr) , fMultipart(chan.fMultipart) @@ -130,8 +120,6 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan) fSocket = nullptr; fName = chan.fName; fIsValid = false; - fPoller = nullptr; - fChannelCmdSocket = nullptr; fTransportType = FairMQ::Transport::DEFAULT; fTransportFactory = nullptr; @@ -660,23 +648,6 @@ void FairMQChannel::InitTransport(shared_ptr factory) fTransportType = factory->GetType(); } -bool FairMQChannel::InitCommandInterface(FairMQSocketPtr channelCmdSocket) -{ - fChannelCmdSocket = std::move(channelCmdSocket); - if (fChannelCmdSocket) - { - fChannelCmdSocket->Connect("inproc://commands"); - - fPoller = fTransportFactory->CreatePoller(*fChannelCmdSocket, *fSocket); - - return true; - } - else - { - return false; - } -} - void FairMQChannel::ResetChannel() { unique_lock lock(fChannelMutex); @@ -698,56 +669,24 @@ int FairMQChannel::Receive(unique_ptr& msg) const int FairMQChannel::Send(unique_ptr& msg, int sndTimeoutInMs) const { - fPoller->Poll(sndTimeoutInMs); - - if (fPoller->CheckInput(0)) - { - HandleUnblock(); - if (fInterrupted) - { - return -2; - } - } - - if (fPoller->CheckOutput(1)) - { - return Send(msg); - } - - return -2; + return fSocket->Send(msg, sndTimeoutInMs); } int FairMQChannel::Receive(unique_ptr& msg, int rcvTimeoutInMs) const { - fPoller->Poll(rcvTimeoutInMs); - - if (fPoller->CheckInput(0)) - { - HandleUnblock(); - if (fInterrupted) - { - return -2; - } - } - - if (fPoller->CheckInput(1)) - { - return Receive(msg); - } - - return -2; + return fSocket->Receive(msg, rcvTimeoutInMs); } int FairMQChannel::SendAsync(unique_ptr& msg) const { CheckCompatibility(msg); - return fSocket->SendAsync(msg); + return fSocket->TrySend(msg); } int FairMQChannel::ReceiveAsync(unique_ptr& msg) const { CheckCompatibility(msg); - return fSocket->ReceiveAsync(msg); + return fSocket->TryReceive(msg); } int64_t FairMQChannel::Send(vector>& msgVec) const @@ -764,50 +703,18 @@ int64_t FairMQChannel::Receive(vector>& msgVec) const int64_t FairMQChannel::Send(vector>& msgVec, int sndTimeoutInMs) const { - fPoller->Poll(sndTimeoutInMs); - - if (fPoller->CheckInput(0)) - { - HandleUnblock(); - if (fInterrupted) - { - return -2; - } - } - - if (fPoller->CheckOutput(1)) - { - return Send(msgVec); - } - - return -2; + return fSocket->Send(msgVec, sndTimeoutInMs); } int64_t FairMQChannel::Receive(vector>& msgVec, int rcvTimeoutInMs) const { - fPoller->Poll(rcvTimeoutInMs); - - if (fPoller->CheckInput(0)) - { - HandleUnblock(); - if (fInterrupted) - { - return -2; - } - } - - if (fPoller->CheckInput(1)) - { - return Receive(msgVec); - } - - return -2; + return fSocket->Receive(msgVec, rcvTimeoutInMs); } int64_t FairMQChannel::SendAsync(vector>& msgVec) const { CheckCompatibility(msgVec); - return fSocket->SendAsync(msgVec); + return fSocket->TrySend(msgVec); } /// Receives a vector of messages in non-blocking mode. @@ -818,17 +725,7 @@ int64_t FairMQChannel::SendAsync(vector>& msgVec) cons int64_t FairMQChannel::ReceiveAsync(vector>& msgVec) const { CheckCompatibility(msgVec); - return fSocket->ReceiveAsync(msgVec); -} - -inline bool FairMQChannel::HandleUnblock() const -{ - FairMQMessagePtr cmd(fTransportFactory->CreateMessage()); - if (fChannelCmdSocket->Receive(cmd) >= 0) - { - // LOG(debug) << "unblocked"; - } - return true; + return fSocket->TryReceive(msgVec); } FairMQChannel::~FairMQChannel() diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index a522b0fb..fb517d93 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -17,7 +17,6 @@ #include #include -#include #include #include #include @@ -312,9 +311,6 @@ class FairMQChannel std::string fName; std::atomic fIsValid; - FairMQPollerPtr fPoller; - FairMQSocketPtr fChannelCmdSocket; - FairMQ::Transport fTransportType; std::shared_ptr fTransportFactory; @@ -322,9 +318,6 @@ class FairMQChannel bool CheckCompatibility(std::vector>& msgVec) const; void InitTransport(std::shared_ptr factory); - bool InitCommandInterface(FairMQSocketPtr channelCmdSocket); - - bool HandleUnblock() const; // use static mutex to make the class easily copyable // implication: same mutex is used for all instances of the class @@ -332,7 +325,6 @@ class FairMQChannel // possible TODO: improve this static std::mutex fChannelMutex; - static std::atomic fInterrupted; bool fMultipart; bool fModified; auto SetModified(const bool modified) -> void; diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index f006f172..f0201ee8 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -48,7 +48,6 @@ FairMQDevice::FairMQDevice() , fDefaultTransport() , fInitializationTimeoutInS(120) , fDataCallbacks(false) - , fDeviceCmdSocket(nullptr) , fMsgInputs() , fMultipartInputs() , fMultitransportInputs() @@ -79,7 +78,6 @@ FairMQDevice::FairMQDevice(const fair::mq::tools::Version version) , fDefaultTransport() , fInitializationTimeoutInS(120) , fDataCallbacks(false) - , fDeviceCmdSocket(nullptr) , fMsgInputs() , fMultipartInputs() , fMultitransportInputs() @@ -116,8 +114,6 @@ void FairMQDevice::InitWrapper() // if (vi->fReset) // { // vi->fSocket.reset(); - // vi->fPoller.reset(); - // vi->fChannelCmdSocket.reset(); // } // set channel name: name + vector index vi->fName = fair::mq::tools::ToString(mi.first, "[", vi - (mi.second).begin(), "]"); @@ -233,7 +229,6 @@ void FairMQDevice::AttachChannels(vector& chans) { if (AttachChannel(**itr)) { - (*itr)->InitCommandInterface(Transport()->CreateSocket("sub", "device-commands")); (*itr)->SetModified(false); itr = chans.erase(itr); } @@ -459,9 +454,11 @@ void FairMQDevice::RunWrapper() // start the rate logger thread thread rateLogger(&FairMQDevice::LogSocketRates, this); - // notify channels to resume transfers - FairMQChannel::fInterrupted = false; - fDeviceCmdSocket->Resume(); + // notify transports to resume transfers + for (auto& t : fTransports) + { + t.second->Resume(); + } try { @@ -762,12 +759,6 @@ shared_ptr FairMQDevice::AddTransport(const string& tran pair> trPair(FairMQ::TransportTypes.at(transport), tr); fTransports.insert(trPair); - if (!fDeviceCmdSocket) { - fDeviceCmdSocket = Transport()->CreateSocket("pub", "device-commands"); - if(!fDeviceCmdSocket->Bind("inproc://commands")) { - exit(EXIT_FAILURE); - } - } return tr; } else @@ -949,10 +940,10 @@ void FairMQDevice::LogSocketRates() void FairMQDevice::Unblock() { - FairMQChannel::fInterrupted = true; - fDeviceCmdSocket->Interrupt(); - FairMQMessagePtr cmd(Transport()->CreateMessage()); - fDeviceCmdSocket->Send(cmd); + for (auto& t : fTransports) + { + t.second->Interrupt(); + } } void FairMQDevice::ResetTaskWrapper() @@ -987,8 +978,6 @@ void FairMQDevice::Reset() { // vi.fReset = true; vi.fSocket.reset(); - vi.fPoller.reset(); - vi.fChannelCmdSocket.reset(); } } } diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 856efe75..825e10ea 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -517,7 +517,6 @@ class FairMQDevice : public FairMQStateMachine void CreateOwnConfig(); bool fDataCallbacks; - FairMQSocketPtr fDeviceCmdSocket; ///< Socket used for the internal unblocking mechanism std::unordered_map fMsgInputs; std::unordered_map fMultipartInputs; std::unordered_map> fMultitransportInputs; diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h index 8ae16201..e198a8fa 100644 --- a/fairmq/FairMQSocket.h +++ b/fairmq/FairMQSocket.h @@ -25,24 +25,21 @@ class FairMQSocket virtual bool Bind(const std::string& address) = 0; virtual void Connect(const std::string& address) = 0; - virtual int Send(FairMQMessagePtr& msg) = 0; - virtual int SendAsync(FairMQMessagePtr& msg) = 0; - virtual int Receive(FairMQMessagePtr& msg) = 0; - virtual int ReceiveAsync(FairMQMessagePtr& msg) = 0; + virtual int Send(FairMQMessagePtr& msg, int timeout = 0) = 0; + virtual int Receive(FairMQMessagePtr& msg, int timeout = 0) = 0; + virtual int64_t Send(std::vector>& msgVec, int timeout = 0) = 0; + virtual int64_t Receive(std::vector>& msgVec, int timeout = 0) = 0; - virtual int64_t Send(std::vector>& msgVec) = 0; - virtual int64_t SendAsync(std::vector>& msgVec) = 0; - virtual int64_t Receive(std::vector>& msgVec) = 0; - virtual int64_t ReceiveAsync(std::vector>& msgVec) = 0; + virtual int TrySend(FairMQMessagePtr& msg) = 0; + virtual int TryReceive(FairMQMessagePtr& msg) = 0; + virtual int64_t TrySend(std::vector>& msgVec) = 0; + virtual int64_t TryReceive(std::vector>& msgVec) = 0; virtual void* GetSocket() const = 0; virtual int GetSocket(int nothing) const = 0; virtual void Close() = 0; - virtual void Interrupt() = 0; - virtual void Resume() = 0; - virtual void SetOption(const std::string& option, const void* value, size_t valueSize) = 0; virtual void GetOption(const std::string& option, void* value, size_t* valueSize) = 0; diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 84e4486e..54677d8d 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -71,6 +71,9 @@ class FairMQTransportFactory /// Get transport type virtual FairMQ::Transport GetType() const = 0; + virtual void Interrupt() = 0; + virtual void Resume() = 0; + virtual ~FairMQTransportFactory() noexcept(false) {}; static auto CreateTransportFactory(const std::string& type, const std::string& id = "", const FairMQProgOptions* config = nullptr) -> std::shared_ptr; diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index 829e3c53..c50c02d0 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -39,6 +39,8 @@ FairMQSocketNN::FairMQSocketNN(const string& type, const string& name, const str , fBytesRx(0) , fMessagesTx(0) , fMessagesRx(0) + , fSndTimeout(100) + , fRcvTimeout(100) { if (type == "router" || type == "dealer") { @@ -66,14 +68,12 @@ FairMQSocketNN::FairMQSocketNN(const string& type, const string& name, const str } } - int sndTimeout = 700; - if (nn_setsockopt(fSocket, NN_SOL_SOCKET, NN_SNDTIMEO, &sndTimeout, sizeof(sndTimeout)) != 0) + if (nn_setsockopt(fSocket, NN_SOL_SOCKET, NN_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) { LOG(error) << "Failed setting NN_SNDTIMEO socket option, reason: " << nn_strerror(errno); } - int rcvTimeout = 700; - if (nn_setsockopt(fSocket, NN_SOL_SOCKET, NN_RCVTIMEO, &rcvTimeout, sizeof(rcvTimeout)) != 0) + if (nn_setsockopt(fSocket, NN_SOL_SOCKET, NN_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) { LOG(error) << "Failed setting NN_RCVTIMEO socket option, reason: " << nn_strerror(errno); } @@ -118,19 +118,20 @@ void FairMQSocketNN::Connect(const string& address) } } -int FairMQSocketNN::Send(FairMQMessagePtr& msg) { return Send(msg, 0); } -int FairMQSocketNN::SendAsync(FairMQMessagePtr& msg) { return Send(msg, NN_DONTWAIT); } -int FairMQSocketNN::Receive(FairMQMessagePtr& msg) { return Receive(msg, 0); } -int FairMQSocketNN::ReceiveAsync(FairMQMessagePtr& msg) { return Receive(msg, NN_DONTWAIT); } +int FairMQSocketNN::Send(FairMQMessagePtr& msg, const int timeout) { return SendImpl(msg, 0, timeout); } +int FairMQSocketNN::Receive(FairMQMessagePtr& msg, const int timeout) { return ReceiveImpl(msg, 0, timeout); } +int64_t FairMQSocketNN::Send(vector>& msgVec, const int timeout) { return SendImpl(msgVec, 0, timeout); } +int64_t FairMQSocketNN::Receive(vector>& msgVec, const int timeout) { return ReceiveImpl(msgVec, 0, timeout); } -int64_t FairMQSocketNN::Send(std::vector>& msgVec) { return Send(msgVec, 0); } -int64_t FairMQSocketNN::SendAsync(std::vector>& msgVec) { return Send(msgVec, NN_DONTWAIT); } -int64_t FairMQSocketNN::Receive(std::vector>& msgVec) { return Receive(msgVec, 0); } -int64_t FairMQSocketNN::ReceiveAsync(std::vector>& msgVec) { return Receive(msgVec, NN_DONTWAIT); } +int FairMQSocketNN::TrySend(FairMQMessagePtr& msg) { return SendImpl(msg, NN_DONTWAIT, 0); } +int FairMQSocketNN::TryReceive(FairMQMessagePtr& msg) { return ReceiveImpl(msg, NN_DONTWAIT, 0); } +int64_t FairMQSocketNN::TrySend(vector>& msgVec) { return SendImpl(msgVec, NN_DONTWAIT, 0); } +int64_t FairMQSocketNN::TryReceive(vector>& msgVec) { return ReceiveImpl(msgVec, NN_DONTWAIT, 0); } -int FairMQSocketNN::Send(FairMQMessagePtr& msg, const int flags) +int FairMQSocketNN::SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout) { int nbytes = -1; + int elapsed = 0; FairMQMessageNN* msgPtr = static_cast(msg.get()); void* bufPtr = msgPtr->GetMessage(); @@ -164,6 +165,14 @@ int FairMQSocketNN::Send(FairMQMessagePtr& msg, const int flags) { if (!fInterrupted && ((flags & NN_DONTWAIT) == 0)) { + if (timeout) + { + elapsed += fSndTimeout; + if (elapsed >= timeout) + { + return -2; + } + } continue; } else @@ -188,9 +197,10 @@ int FairMQSocketNN::Send(FairMQMessagePtr& msg, const int flags) } } -int FairMQSocketNN::Receive(FairMQMessagePtr& msg, const int flags) +int FairMQSocketNN::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout) { int nbytes = -1; + int elapsed = 0; FairMQMessageNN* msgPtr = static_cast(msg.get()); @@ -214,6 +224,14 @@ int FairMQSocketNN::Receive(FairMQMessagePtr& msg, const int flags) { if (!fInterrupted && ((flags & NN_DONTWAIT) == 0)) { + if (timeout) + { + elapsed += fRcvTimeout; + if (elapsed >= timeout) + { + return -2; + } + } continue; } else @@ -238,10 +256,11 @@ int FairMQSocketNN::Receive(FairMQMessagePtr& msg, const int flags) } } -int64_t FairMQSocketNN::Send(vector& msgVec, const int flags) +int64_t FairMQSocketNN::SendImpl(vector& msgVec, const int flags, const int timeout) { const unsigned int vecSize = msgVec.size(); #ifdef MSGPACK_FOUND + int elapsed = 0; // create msgpack simple buffer msgpack::sbuffer sbuf; @@ -282,6 +301,14 @@ int64_t FairMQSocketNN::Send(vector& msgVec, const int flags) { if (!fInterrupted && ((flags & NN_DONTWAIT) == 0)) { + if (timeout) + { + elapsed += fSndTimeout; + if (elapsed >= timeout) + { + return -2; + } + } continue; } else @@ -310,7 +337,7 @@ int64_t FairMQSocketNN::Send(vector& msgVec, const int flags) #endif /*MSGPACK_FOUND*/ } -int64_t FairMQSocketNN::Receive(vector& msgVec, const int flags) +int64_t FairMQSocketNN::ReceiveImpl(vector& msgVec, const int flags, const int timeout) { #ifdef MSGPACK_FOUND // Warn if the vector is filled before Receive() and empty it. @@ -320,6 +347,8 @@ int64_t FairMQSocketNN::Receive(vector& msgVec, const int flag // msgVec.clear(); // } + int elapsed = 0; + while (true) { // pointer to point to received message buffer @@ -338,7 +367,7 @@ int64_t FairMQSocketNN::Receive(vector& msgVec, const int flag while (offset != static_cast(nbytes)) // continue until all parts have been read { // vector of chars to hold blob (unlike char*/void* this type can be converted to by msgpack) - std::vector buf; + vector buf; // unpack and convert chunk msgpack::unpacked result; @@ -364,6 +393,14 @@ int64_t FairMQSocketNN::Receive(vector& msgVec, const int flag { if (!fInterrupted && ((flags & NN_DONTWAIT) == 0)) { + if (timeout) + { + elapsed += fRcvTimeout; + if (elapsed >= timeout) + { + return -2; + } + } continue; } else @@ -472,7 +509,8 @@ unsigned long FairMQSocketNN::GetMessagesRx() const bool FairMQSocketNN::SetSendTimeout(const int timeout, const string& /*address*/, const string& /*method*/) { - if (nn_setsockopt(fSocket, NN_SOL_SOCKET, NN_SNDTIMEO, &timeout, sizeof(int)) != 0) + fSndTimeout = timeout; + if (nn_setsockopt(fSocket, NN_SOL_SOCKET, NN_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) { LOG(error) << "Failed setting option 'send timeout' on socket " << fId << ", reason: " << nn_strerror(errno); return false; @@ -483,20 +521,13 @@ bool FairMQSocketNN::SetSendTimeout(const int timeout, const string& /*address*/ int FairMQSocketNN::GetSendTimeout() const { - int timeout = -1; - size_t size = sizeof(timeout); - - if (nn_getsockopt(fSocket, NN_SOL_SOCKET, NN_SNDTIMEO, &timeout, &size) != 0) - { - LOG(error) << "Failed getting option 'send timeout' on socket " << fId << ", reason: " << nn_strerror(errno); - } - - return timeout; + return fSndTimeout; } bool FairMQSocketNN::SetReceiveTimeout(const int timeout, const string& /*address*/, const string& /*method*/) { - if (nn_setsockopt(fSocket, NN_SOL_SOCKET, NN_RCVTIMEO, &timeout, sizeof(int)) != 0) + fRcvTimeout = timeout; + if (nn_setsockopt(fSocket, NN_SOL_SOCKET, NN_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) { LOG(error) << "Failed setting option 'receive timeout' on socket " << fId << ", reason: " << nn_strerror(errno); return false; @@ -507,15 +538,7 @@ bool FairMQSocketNN::SetReceiveTimeout(const int timeout, const string& /*addres int FairMQSocketNN::GetReceiveTimeout() const { - int timeout = -1; - size_t size = sizeof(timeout); - - if (nn_getsockopt(fSocket, NN_SOL_SOCKET, NN_RCVTIMEO, &timeout, &size) != 0) - { - LOG(error) << "Failed getting option 'receive timeout' on socket " << fId << ", reason: " << nn_strerror(errno); - } - - return timeout; + return fRcvTimeout; } int FairMQSocketNN::GetConstant(const string& constant) diff --git a/fairmq/nanomsg/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h index c256209e..632cf460 100644 --- a/fairmq/nanomsg/FairMQSocketNN.h +++ b/fairmq/nanomsg/FairMQSocketNN.h @@ -22,44 +22,45 @@ class FairMQSocketNN : public FairMQSocket FairMQSocketNN(const FairMQSocketNN&) = delete; FairMQSocketNN operator=(const FairMQSocketNN&) = delete; - virtual std::string GetId(); + std::string GetId() override; - virtual bool Bind(const std::string& address); - virtual void Connect(const std::string& address); + bool Bind(const std::string& address) override; + void Connect(const std::string& address) override; - virtual int Send(FairMQMessagePtr& msg); - virtual int SendAsync(FairMQMessagePtr& msg); - virtual int Receive(FairMQMessagePtr& msg); - virtual int ReceiveAsync(FairMQMessagePtr& msg); + int Send(FairMQMessagePtr& msg, const int timeout = 0) override; + int Receive(FairMQMessagePtr& msg, const int timeout = 0) override; + int64_t Send(std::vector>& msgVec, const int timeout = 0) override; + int64_t Receive(std::vector>& msgVec, const int timeout = 0) override; - virtual int64_t Send(std::vector>& msgVec); - virtual int64_t SendAsync(std::vector>& msgVec); - virtual int64_t Receive(std::vector>& msgVec); - virtual int64_t ReceiveAsync(std::vector>& msgVec); + int TrySend(FairMQMessagePtr& msg) override; + int TryReceive(FairMQMessagePtr& msg) override; + int64_t TrySend(std::vector>& msgVec) override; + int64_t TryReceive(std::vector>& msgVec) override; - virtual void* GetSocket() const; - virtual int GetSocket(int nothing) const; - virtual void Close(); + void* GetSocket() const override; + int GetSocket(int nothing) const override; - virtual void Interrupt(); - virtual void Resume(); + void Close() override; - virtual void SetOption(const std::string& option, const void* value, size_t valueSize); - virtual void GetOption(const std::string& option, void* value, size_t* valueSize); + static void Interrupt(); + static void Resume(); - unsigned long GetBytesTx() const; - unsigned long GetBytesRx() const; - unsigned long GetMessagesTx() const; - unsigned long GetMessagesRx() const; + void SetOption(const std::string& option, const void* value, size_t valueSize) override; + void GetOption(const std::string& option, void* value, size_t* valueSize) override; - virtual bool SetSendTimeout(const int timeout, const std::string& address, const std::string& method); - virtual int GetSendTimeout() const; - virtual bool SetReceiveTimeout(const int timeout, const std::string& address, const std::string& method); - virtual int GetReceiveTimeout() const; + unsigned long GetBytesTx() const override; + unsigned long GetBytesRx() const override; + unsigned long GetMessagesTx() const override; + unsigned long GetMessagesRx() const override; + + bool SetSendTimeout(const int timeout, const std::string& address, const std::string& method) override; + int GetSendTimeout() const override; + bool SetReceiveTimeout(const int timeout, const std::string& address, const std::string& method) override; + int GetReceiveTimeout() const override; static int GetConstant(const std::string& constant); - virtual ~FairMQSocketNN(); + ~FairMQSocketNN() override; private: int fSocket; @@ -68,12 +69,16 @@ class FairMQSocketNN : public FairMQSocket std::atomic fBytesRx; std::atomic fMessagesTx; std::atomic fMessagesRx; + static std::atomic fInterrupted; - int Send(FairMQMessagePtr& msg, const int flags = 0); - int Receive(FairMQMessagePtr& msg, const int flags = 0); - int64_t Send(std::vector>& msgVec, const int flags = 0); - int64_t Receive(std::vector>& msgVec, const int flags = 0); + int fSndTimeout; + int fRcvTimeout; + + int SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout); + int ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout); + int64_t SendImpl(std::vector>& msgVec, const int flags, const int timeout); + int64_t ReceiveImpl(std::vector>& msgVec, const int flags, const int timeout); }; #endif /* FAIRMQSOCKETNN_H_ */ diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.h b/fairmq/nanomsg/FairMQTransportFactoryNN.h index 53653da8..22eb6e24 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.h +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.h @@ -41,6 +41,9 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory FairMQ::Transport GetType() const override; + void Interrupt() override { FairMQSocketNN::Interrupt(); } + void Resume() override { FairMQSocketNN::Resume(); } + private: static FairMQ::Transport fTransportType; }; diff --git a/fairmq/ofi/Socket.cxx b/fairmq/ofi/Socket.cxx index 1fd116e1..1145441f 100644 --- a/fairmq/ofi/Socket.cxx +++ b/fairmq/ofi/Socket.cxx @@ -27,6 +27,8 @@ Socket::Socket(const string& type, const string& name, const string& id /*= ""*/ , fBytesRx{0} , fMessagesTx{0} , fMessagesRx{0} + , fSndTimeout{100} + , fRcvTimeout{100} { assert(zmqContext); @@ -51,13 +53,11 @@ Socket::Socket(const string& type, const string& name, const string& id /*= ""*/ throw SocketError{tools::ToString("Failed setting ZMQ_LINGER socket option, reason: ", zmq_strerror(errno))}; } - int sndTimeout = 700; - if (zmq_setsockopt(fMetaSocket, ZMQ_SNDTIMEO, &sndTimeout, sizeof(sndTimeout)) != 0) { + if (zmq_setsockopt(fMetaSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) { throw SocketError{tools::ToString("Failed setting ZMQ_SNDTIMEO socket option, reason: ", zmq_strerror(errno))}; } - int rcvTimeout = 700; - if (zmq_setsockopt(fMetaSocket, ZMQ_RCVTIMEO, &rcvTimeout, sizeof(rcvTimeout)) != 0) { + if (zmq_setsockopt(fMetaSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) { throw SocketError{tools::ToString("Failed setting ZMQ_RCVTIMEO socket option, reason: ", zmq_strerror(errno))}; } } @@ -83,20 +83,22 @@ auto Socket::Connect(const string& address) -> void } } -auto Socket::Send(FairMQMessagePtr& msg) -> int { return Send(msg, 0); } -auto Socket::SendAsync(FairMQMessagePtr& msg) -> int { return Send(msg, ZMQ_DONTWAIT); } -auto Socket::Receive(FairMQMessagePtr& msg) -> int { return Receive(msg, 0); } -auto Socket::ReceiveAsync(FairMQMessagePtr& msg) -> int { return Receive(msg, ZMQ_DONTWAIT); } +auto Socket::Send(FairMQMessagePtr& msg, const int timeout) -> int { return SendImpl(msg, 0, timeout); } +auto Socket::Receive(FairMQMessagePtr& msg, const int timeout) -> int { return ReceiveImpl(msg, 0, timeout); } +auto Socket::Send(std::vector>& msgVec, const int timeout) -> int64_t { return SendImpl(msgVec, 0, timeout); } +auto Socket::Receive(std::vector>& msgVec, const int timeout) -> int64_t { return ReceiveImpl(msgVec, 0, timeout); } -auto Socket::Send(std::vector>& msgVec) -> int64_t { return Send(msgVec, 0); } -auto Socket::SendAsync(std::vector>& msgVec) -> int64_t { return Send(msgVec, ZMQ_DONTWAIT); } -auto Socket::Receive(std::vector>& msgVec) -> int64_t { return Receive(msgVec, 0); } -auto Socket::ReceiveAsync(std::vector>& msgVec) -> int64_t { return Receive(msgVec, ZMQ_DONTWAIT); } +auto Socket::TrySend(FairMQMessagePtr& msg) -> int { return SendImpl(msg, ZMQ_DONTWAIT, 0); } +auto Socket::TryReceive(FairMQMessagePtr& msg) -> int { return ReceiveImpl(msg, ZMQ_DONTWAIT, 0); } +auto Socket::TrySend(std::vector>& msgVec) -> int64_t { return SendImpl(msgVec, ZMQ_DONTWAIT, 0); } +auto Socket::TryReceive(std::vector>& msgVec) -> int64_t { return ReceiveImpl(msgVec, ZMQ_DONTWAIT, 0); } -auto Socket::Send(FairMQMessagePtr& msg, const int flags) -> int +auto Socket::SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout) -> int { throw SocketError{"Not yet implemented."}; // int nbytes = -1; + // int elapsed = 0; + // // while (true && !fInterrupted) // { // nbytes = zmq_msg_send(static_cast(msg.get())->GetMessage(), fSocket, flags); @@ -118,6 +120,14 @@ auto Socket::Send(FairMQMessagePtr& msg, const int flags) -> int // { // if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) // { + // if (timeout) + // { + // elapsed += fSndTimeout; + // if (elapsed >= timeout) + // { + // return -2; + // } + // } // continue; // } // else @@ -140,10 +150,12 @@ auto Socket::Send(FairMQMessagePtr& msg, const int flags) -> int // return -1; } -auto Socket::Receive(FairMQMessagePtr& msg, const int flags) -> int +auto Socket::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout) -> int { throw SocketError{"Not yet implemented."}; // int nbytes = -1; + // int elapsed = 0; + // // zmq_msg_t* msgPtr = static_cast(msg.get())->GetMessage(); // while (true) // { @@ -173,6 +185,14 @@ auto Socket::Receive(FairMQMessagePtr& msg, const int flags) -> int // { // if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) // { + // if (timeout) + // { + // elapsed += fSndTimeout; + // if (elapsed >= timeout) + // { + // return -2; + // } + // } // continue; // } // else @@ -193,10 +213,11 @@ auto Socket::Receive(FairMQMessagePtr& msg, const int flags) -> int // } } -auto Socket::Send(vector& msgVec, const int flags) -> int64_t +auto Socket::SendImpl(vector& msgVec, const int flags, const int timeout) -> int64_t { throw SocketError{"Not yet implemented."}; // const unsigned int vecSize = msgVec.size(); + // int elapsed = 0; // // // Sending vector typicaly handles more then one part // if (vecSize > 1) @@ -226,6 +247,14 @@ auto Socket::Send(vector& msgVec, const int flags) -> int64_t // { // if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) // { + // if (timeout) + // { + // elapsed += fSndTimeout; + // if (elapsed >= timeout) + // { + // return -2; + // } + // } // repeat = true; // break; // } @@ -268,12 +297,13 @@ auto Socket::Send(vector& msgVec, const int flags) -> int64_t // } } -auto Socket::Receive(vector& msgVec, const int flags) -> int64_t +auto Socket::ReceiveImpl(vector& msgVec, const int flags, const int timeout) -> int64_t { throw SocketError{"Not yet implemented."}; // int64_t totalSize = 0; // int64_t more = 0; // bool repeat = false; + // int elapsed = 0; // // while (true) // { @@ -316,6 +346,14 @@ auto Socket::Receive(vector& msgVec, const int flags) -> int64 // { // if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) // { + // if (timeout) + // { + // elapsed += fSndTimeout; + // if (elapsed >= timeout) + // { + // return -2; + // } + // } // repeat = true; // break; // } @@ -353,16 +391,6 @@ auto Socket::Close() -> void } } -auto Socket::Interrupt() -> void -{ - throw SocketError{"Not yet implemented."}; -} - -auto Socket::Resume() -> void -{ - throw SocketError{"Not yet implemented."}; -} - auto Socket::SetOption(const string& option, const void* value, size_t valueSize) -> void { if (zmq_setsockopt(fMetaSocket, GetConstant(option), value, valueSize) < 0) { @@ -380,6 +408,7 @@ auto Socket::GetOption(const string& option, void* value, size_t* valueSize) -> auto Socket::SetSendTimeout(const int timeout, const string& address, const string& method) -> bool { throw SocketError{"Not yet implemented."}; + // fSndTimeout = timeout; // if (method == "bind") // { // if (zmq_unbind(fSocket, address.c_str()) != 0) @@ -387,7 +416,7 @@ auto Socket::SetSendTimeout(const int timeout, const string& address, const stri // LOG(error) << "Failed unbinding socket " << fId << ", reason: " << zmq_strerror(errno); // return false; // } - // if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &timeout, sizeof(int)) != 0) + // if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) // { // LOG(error) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno); // return false; @@ -405,7 +434,7 @@ auto Socket::SetSendTimeout(const int timeout, const string& address, const stri // LOG(error) << "Failed disconnecting socket " << fId << ", reason: " << zmq_strerror(errno); // return false; // } - // if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &timeout, sizeof(int)) != 0) + // if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) // { // LOG(error) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno); // return false; @@ -428,20 +457,13 @@ auto Socket::SetSendTimeout(const int timeout, const string& address, const stri auto Socket::GetSendTimeout() const -> int { throw SocketError{"Not yet implemented."}; - // int timeout = -1; - // size_t size = sizeof(timeout); - // - // if (zmq_getsockopt(fSocket, ZMQ_SNDTIMEO, &timeout, &size) != 0) - // { - // LOG(error) << "Failed getting option 'receive timeout' on socket " << fId << ", reason: " << zmq_strerror(errno); - // } - // - // return timeout; + // return fSndTimeout; } auto Socket::SetReceiveTimeout(const int timeout, const string& address, const string& method) -> bool { throw SocketError{"Not yet implemented."}; + // fRcvTimeout = timeout; // if (method == "bind") // { // if (zmq_unbind(fSocket, address.c_str()) != 0) @@ -449,7 +471,7 @@ auto Socket::SetReceiveTimeout(const int timeout, const string& address, const s // LOG(error) << "Failed unbinding socket " << fId << ", reason: " << zmq_strerror(errno); // return false; // } - // if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &timeout, sizeof(int)) != 0) + // if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) // { // LOG(error) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno); // return false; @@ -467,7 +489,7 @@ auto Socket::SetReceiveTimeout(const int timeout, const string& address, const s // LOG(error) << "Failed disconnecting socket " << fId << ", reason: " << zmq_strerror(errno); // return false; // } - // if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &timeout, sizeof(int)) != 0) + // if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) // { // LOG(error) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno); // return false; @@ -490,15 +512,7 @@ auto Socket::SetReceiveTimeout(const int timeout, const string& address, const s auto Socket::GetReceiveTimeout() const -> int { throw SocketError{"Not yet implemented."}; - // int timeout = -1; - // size_t size = sizeof(timeout); - // - // if (zmq_getsockopt(fSocket, ZMQ_RCVTIMEO, &timeout, &size) != 0) - // { - // LOG(error) << "Failed getting option 'receive timeout' on socket " << fId << ", reason: " << zmq_strerror(errno); - // } - // - // return timeout; + // return fRcvTimeout; } auto Socket::GetConstant(const string& constant) -> int diff --git a/fairmq/ofi/Socket.h b/fairmq/ofi/Socket.h index a43db331..6e34fc1c 100644 --- a/fairmq/ofi/Socket.h +++ b/fairmq/ofi/Socket.h @@ -39,24 +39,21 @@ class Socket : public FairMQSocket auto Bind(const std::string& address) -> bool override; auto Connect(const std::string& address) -> void override; - auto Send(FairMQMessagePtr& msg) -> int override; - auto SendAsync(FairMQMessagePtr& msg) -> int override; - auto Receive(FairMQMessagePtr& msg) -> int override; - auto ReceiveAsync(FairMQMessagePtr& msg) -> int override; + auto Send(FairMQMessagePtr& msg, int timeout = 0) -> int override; + auto Receive(FairMQMessagePtr& msg, int timeout = 0) -> int override; + auto Send(std::vector>& msgVec, int timeout = 0) -> int64_t override; + auto Receive(std::vector>& msgVec, int timeout = 0) -> int64_t override; - auto Send(std::vector>& msgVec) -> int64_t override; - auto SendAsync(std::vector>& msgVec) -> int64_t override; - auto Receive(std::vector>& msgVec) -> int64_t override; - auto ReceiveAsync(std::vector>& msgVec) -> int64_t override; + auto TrySend(FairMQMessagePtr& msg) -> int override; + auto TryReceive(FairMQMessagePtr& msg) -> int override; + auto TrySend(std::vector>& msgVec) -> int64_t override; + auto TryReceive(std::vector>& msgVec) -> int64_t override; auto GetSocket() const -> void* override { return fMetaSocket; } auto GetSocket(int nothing) const -> int override { return -1; } auto Close() -> void override; - auto Interrupt() -> void override; - auto Resume() -> void override; - auto SetOption(const std::string& option, const void* value, size_t valueSize) -> void override; auto GetOption(const std::string& option, void* value, size_t* valueSize) -> void override; @@ -82,10 +79,13 @@ class Socket : public FairMQSocket std::atomic fMessagesTx; std::atomic fMessagesRx; - auto Send(FairMQMessagePtr& msg, const int flags) -> int; - auto Receive(FairMQMessagePtr& msg, const int flags) -> int; - auto Send(std::vector>& msgVec, const int flags) -> int64_t; - auto Receive(std::vector>& msgVec, const int flags) -> int64_t; + int fSndTimeout; + int fRcvTimeout; + + auto SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout) -> int; + auto ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout) -> int; + auto SendImpl(std::vector>& msgVec, const int flags, const int timeout) -> int64_t; + auto ReceiveImpl(std::vector>& msgVec, const int flags, const int timeout) -> int64_t; }; /* class Socket */ } /* namespace ofi */ diff --git a/fairmq/ofi/TransportFactory.h b/fairmq/ofi/TransportFactory.h index 88c6ecce..b058e2b5 100644 --- a/fairmq/ofi/TransportFactory.h +++ b/fairmq/ofi/TransportFactory.h @@ -48,6 +48,9 @@ class TransportFactory : public FairMQTransportFactory auto GetType() const -> Transport override; + void Interrupt() override {} + void Resume() override {} + ~TransportFactory() noexcept(false) override; private: diff --git a/fairmq/shmem/FairMQSocketSHM.cxx b/fairmq/shmem/FairMQSocketSHM.cxx index ade0d7f5..93b2d17a 100644 --- a/fairmq/shmem/FairMQSocketSHM.cxx +++ b/fairmq/shmem/FairMQSocketSHM.cxx @@ -27,6 +27,8 @@ FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const str , fBytesRx(0) , fMessagesTx(0) , fMessagesRx(0) + , fSndTimeout(100) + , fRcvTimeout(100) { assert(context); fSocket = zmq_socket(context, GetConstant(type)); @@ -50,14 +52,12 @@ FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const str LOG(error) << "Failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno); } - int sndTimeout = 700; - if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &sndTimeout, sizeof(sndTimeout)) != 0) + if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) { LOG(error) << "Failed setting ZMQ_SNDTIMEO socket option, reason: " << zmq_strerror(errno); } - int rcvTimeout = 700; - if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &rcvTimeout, sizeof(rcvTimeout)) != 0) + if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) { LOG(error) << "Failed setting ZMQ_RCVTIMEO socket option, reason: " << zmq_strerror(errno); } @@ -101,19 +101,21 @@ void FairMQSocketSHM::Connect(const string& address) } } -int FairMQSocketSHM::Send(FairMQMessagePtr& msg) { return Send(msg, 0); } -int FairMQSocketSHM::SendAsync(FairMQMessagePtr& msg) { return Send(msg, ZMQ_DONTWAIT); } -int FairMQSocketSHM::Receive(FairMQMessagePtr& msg) { return Receive(msg, 0); } -int FairMQSocketSHM::ReceiveAsync(FairMQMessagePtr& msg) { return Receive(msg, ZMQ_DONTWAIT); } +int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int timeout) { return SendImpl(msg, 0, timeout); } +int FairMQSocketSHM::Receive(FairMQMessagePtr& msg, const int timeout) { return ReceiveImpl(msg, 0, timeout); } +int64_t FairMQSocketSHM::Send(vector>& msgVec, const int timeout) { return SendImpl(msgVec, 0, timeout); } +int64_t FairMQSocketSHM::Receive(vector>& msgVec, const int timeout) { return ReceiveImpl(msgVec, 0, timeout); } -int64_t FairMQSocketSHM::Send(std::vector>& msgVec) { return Send(msgVec, 0); } -int64_t FairMQSocketSHM::SendAsync(std::vector>& msgVec) { return Send(msgVec, ZMQ_DONTWAIT); } -int64_t FairMQSocketSHM::Receive(std::vector>& msgVec) { return Receive(msgVec, 0); } -int64_t FairMQSocketSHM::ReceiveAsync(std::vector>& msgVec) { return Receive(msgVec, ZMQ_DONTWAIT); } +int FairMQSocketSHM::TrySend(FairMQMessagePtr& msg) { return SendImpl(msg, ZMQ_DONTWAIT, 0); } +int FairMQSocketSHM::TryReceive(FairMQMessagePtr& msg) { return ReceiveImpl(msg, ZMQ_DONTWAIT, 0); } +int64_t FairMQSocketSHM::TrySend(vector>& msgVec) { return SendImpl(msgVec, ZMQ_DONTWAIT, 0); } +int64_t FairMQSocketSHM::TryReceive(vector>& msgVec) { return ReceiveImpl(msgVec, ZMQ_DONTWAIT, 0); } -int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int flags) +int FairMQSocketSHM::SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout) { int nbytes = -1; + int elapsed = 0; + while (true && !fInterrupted) { nbytes = zmq_msg_send(static_cast(msg.get())->GetMessage(), fSocket, flags); @@ -135,6 +137,14 @@ int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int flags) { if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { + if (timeout) + { + elapsed += fSndTimeout; + if (elapsed >= timeout) + { + return -2; + } + } continue; } else @@ -157,7 +167,7 @@ int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int flags) return -1; } -int FairMQSocketSHM::Receive(FairMQMessagePtr& msg, const int flags) +int FairMQSocketSHM::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout) { int nbytes = -1; @@ -199,6 +209,14 @@ int FairMQSocketSHM::Receive(FairMQMessagePtr& msg, const int flags) { if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { + if (timeout) + { + elapsed += fRcvTimeout; + if (elapsed >= timeout) + { + return -2; + } + } continue; } else @@ -219,7 +237,7 @@ int FairMQSocketSHM::Receive(FairMQMessagePtr& msg, const int flags) } } -int64_t FairMQSocketSHM::Send(vector& msgVec, const int flags) +int64_t FairMQSocketSHM::SendImpl(vector& msgVec, const int flags, const int timeout) { const unsigned int vecSize = msgVec.size(); int64_t totalSize = 0; @@ -392,14 +410,14 @@ void FairMQSocketSHM::Close() void FairMQSocketSHM::Interrupt() { - fManager.Interrupt(); + Manager::Interrupt(); FairMQMessageSHM::fInterrupted = true; fInterrupted = true; } void FairMQSocketSHM::Resume() { - fManager.Resume(); + Manager::Resume(); FairMQMessageSHM::fInterrupted = false; fInterrupted = false; } @@ -453,6 +471,7 @@ unsigned long FairMQSocketSHM::GetMessagesRx() const bool FairMQSocketSHM::SetSendTimeout(const int timeout, const string& address, const string& method) { + fSndTimeout = timeout; if (method == "bind") { if (zmq_unbind(fSocket, address.c_str()) != 0) @@ -460,7 +479,7 @@ bool FairMQSocketSHM::SetSendTimeout(const int timeout, const string& address, c LOG(error) << "Failed unbinding socket " << fId << ", reason: " << zmq_strerror(errno); return false; } - if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &timeout, sizeof(int)) != 0) + if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) { LOG(error) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno); return false; @@ -478,7 +497,7 @@ bool FairMQSocketSHM::SetSendTimeout(const int timeout, const string& address, c LOG(error) << "Failed disconnecting socket " << fId << ", reason: " << zmq_strerror(errno); return false; } - if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &timeout, sizeof(int)) != 0) + if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) { LOG(error) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno); return false; @@ -500,19 +519,12 @@ bool FairMQSocketSHM::SetSendTimeout(const int timeout, const string& address, c int FairMQSocketSHM::GetSendTimeout() const { - int timeout = -1; - size_t size = sizeof(timeout); - - if (zmq_getsockopt(fSocket, ZMQ_SNDTIMEO, &timeout, &size) != 0) - { - LOG(error) << "Failed getting option 'receive timeout' on socket " << fId << ", reason: " << zmq_strerror(errno); - } - - return timeout; + return fSndTimeout; } bool FairMQSocketSHM::SetReceiveTimeout(const int timeout, const string& address, const string& method) { + fRcvTimeout = timeout; if (method == "bind") { if (zmq_unbind(fSocket, address.c_str()) != 0) @@ -520,7 +532,7 @@ bool FairMQSocketSHM::SetReceiveTimeout(const int timeout, const string& address LOG(error) << "Failed unbinding socket " << fId << ", reason: " << zmq_strerror(errno); return false; } - if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &timeout, sizeof(int)) != 0) + if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) { LOG(error) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno); return false; @@ -538,7 +550,7 @@ bool FairMQSocketSHM::SetReceiveTimeout(const int timeout, const string& address LOG(error) << "Failed disconnecting socket " << fId << ", reason: " << zmq_strerror(errno); return false; } - if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &timeout, sizeof(int)) != 0) + if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) { LOG(error) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno); return false; @@ -560,63 +572,34 @@ bool FairMQSocketSHM::SetReceiveTimeout(const int timeout, const string& address int FairMQSocketSHM::GetReceiveTimeout() const { - int timeout = -1; - size_t size = sizeof(timeout); - - if (zmq_getsockopt(fSocket, ZMQ_RCVTIMEO, &timeout, &size) != 0) - { - LOG(error) << "Failed getting option 'receive timeout' on socket " << fId << ", reason: " << zmq_strerror(errno); - } - - return timeout; + return fRcvTimeout; } int FairMQSocketSHM::GetConstant(const string& constant) { - if (constant == "") - return 0; - if (constant == "sub") - return ZMQ_SUB; - if (constant == "pub") - return ZMQ_PUB; - if (constant == "xsub") - return ZMQ_XSUB; - if (constant == "xpub") - return ZMQ_XPUB; - if (constant == "push") - return ZMQ_PUSH; - if (constant == "pull") - return ZMQ_PULL; - if (constant == "req") - return ZMQ_REQ; - if (constant == "rep") - return ZMQ_REP; - if (constant == "dealer") - return ZMQ_DEALER; - if (constant == "router") - return ZMQ_ROUTER; - if (constant == "pair") - return ZMQ_PAIR; + if (constant == "") return 0; + if (constant == "sub") return ZMQ_SUB; + if (constant == "pub") return ZMQ_PUB; + if (constant == "xsub") return ZMQ_XSUB; + if (constant == "xpub") return ZMQ_XPUB; + if (constant == "push") return ZMQ_PUSH; + if (constant == "pull") return ZMQ_PULL; + if (constant == "req") return ZMQ_REQ; + if (constant == "rep") return ZMQ_REP; + if (constant == "dealer") return ZMQ_DEALER; + if (constant == "router") return ZMQ_ROUTER; + if (constant == "pair") return ZMQ_PAIR; - if (constant == "snd-hwm") - return ZMQ_SNDHWM; - if (constant == "rcv-hwm") - return ZMQ_RCVHWM; - if (constant == "snd-size") - return ZMQ_SNDBUF; - if (constant == "rcv-size") - return ZMQ_RCVBUF; - if (constant == "snd-more") - return ZMQ_SNDMORE; - if (constant == "rcv-more") - return ZMQ_RCVMORE; + if (constant == "snd-hwm") return ZMQ_SNDHWM; + if (constant == "rcv-hwm") return ZMQ_RCVHWM; + if (constant == "snd-size") return ZMQ_SNDBUF; + if (constant == "rcv-size") return ZMQ_RCVBUF; + if (constant == "snd-more") return ZMQ_SNDMORE; + if (constant == "rcv-more") return ZMQ_RCVMORE; - if (constant == "linger") - return ZMQ_LINGER; - if (constant == "no-block") - return ZMQ_DONTWAIT; - if (constant == "snd-more no-block") - return ZMQ_DONTWAIT|ZMQ_SNDMORE; + if (constant == "linger") return ZMQ_LINGER; + if (constant == "no-block") return ZMQ_DONTWAIT; + if (constant == "snd-more no-block") return ZMQ_DONTWAIT|ZMQ_SNDMORE; return -1; } diff --git a/fairmq/shmem/FairMQSocketSHM.h b/fairmq/shmem/FairMQSocketSHM.h index e5541c3f..5e6a888c 100644 --- a/fairmq/shmem/FairMQSocketSHM.h +++ b/fairmq/shmem/FairMQSocketSHM.h @@ -28,22 +28,23 @@ class FairMQSocketSHM : public FairMQSocket bool Bind(const std::string& address) override; void Connect(const std::string& address) override; - int Send(FairMQMessagePtr& msg) override; - int SendAsync(FairMQMessagePtr& msg) override; - int Receive(FairMQMessagePtr& msg) override; - int ReceiveAsync(FairMQMessagePtr& msg) override; + int Send(FairMQMessagePtr& msg, const int timeout = 0) override; + int Receive(FairMQMessagePtr& msg, const int timeout = 0) override; + int64_t Send(std::vector>& msgVec, const int timeout = 0) override; + int64_t Receive(std::vector>& msgVec, const int timeout = 0) override; - int64_t Send(std::vector>& msgVec) override; - int64_t SendAsync(std::vector>& msgVec) override; - int64_t Receive(std::vector>& msgVec) override; - int64_t ReceiveAsync(std::vector>& msgVec) override; + int TrySend(FairMQMessagePtr& msg) override; + int TryReceive(FairMQMessagePtr& msg) override; + int64_t TrySend(std::vector>& msgVec) override; + int64_t TryReceive(std::vector>& msgVec) override; void* GetSocket() const override; int GetSocket(int nothing) const override; + void Close() override; - void Interrupt() override; - void Resume() override; + static void Interrupt(); + static void Resume(); void SetOption(const std::string& option, const void* value, size_t valueSize) override; void GetOption(const std::string& option, void* value, size_t* valueSize) override; @@ -73,10 +74,14 @@ class FairMQSocketSHM : public FairMQSocket static std::atomic fInterrupted; - int Send(FairMQMessagePtr& msg, const int flags); - int Receive(FairMQMessagePtr& msg, const int flags); - int64_t Send(std::vector>& msgVec, const int flags); - int64_t Receive(std::vector>& msgVec, const int flags); + int fSndTimeout; + int fRcvTimeout; + + int SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout); + int ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout); + + int64_t SendImpl(std::vector>& msgVec, const int flags, const int timeout); + int64_t ReceiveImpl(std::vector>& msgVec, const int flags, const int timeout); }; #endif /* FAIRMQSOCKETSHM_H_ */ diff --git a/fairmq/shmem/FairMQTransportFactorySHM.h b/fairmq/shmem/FairMQTransportFactorySHM.h index 00ddeb7b..71484b4a 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.h +++ b/fairmq/shmem/FairMQTransportFactorySHM.h @@ -49,6 +49,9 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory FairMQ::Transport GetType() const override; + void Interrupt() override { FairMQSocketSHM::Interrupt(); } + void Resume() override { FairMQSocketSHM::Resume(); } + ~FairMQTransportFactorySHM() override; private: diff --git a/fairmq/shmem/Manager.cxx b/fairmq/shmem/Manager.cxx index fdc70ea6..c19d0dde 100644 --- a/fairmq/shmem/Manager.cxx +++ b/fairmq/shmem/Manager.cxx @@ -19,13 +19,14 @@ namespace shmem using namespace std; namespace bipc = boost::interprocess; +std::unordered_map Manager::fRegions; + Manager::Manager(const string& name, size_t size) : fSessionName(name) , fSegmentName("fmq_shm_" + fSessionName + "_main") , fManagementSegmentName("fmq_shm_" + fSessionName + "_management") , fSegment(bipc::open_or_create, fSegmentName.c_str(), size) , fManagementSegment(bipc::open_or_create, fManagementSegmentName.c_str(), 65536) - , fRegions() {} bipc::managed_shared_memory& Manager::Segment() diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index cd4101ad..0987d242 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -49,8 +49,8 @@ class Manager boost::interprocess::managed_shared_memory& Segment(); - void Interrupt(); - void Resume(); + static void Interrupt(); + static void Resume(); boost::interprocess::mapped_region* CreateRegion(const size_t size, const uint64_t id, FairMQRegionCallback callback); Region* GetRemoteRegion(const uint64_t id); @@ -66,7 +66,7 @@ class Manager std::string fManagementSegmentName; boost::interprocess::managed_shared_memory fSegment; boost::interprocess::managed_shared_memory fManagementSegment; - std::unordered_map fRegions; + static std::unordered_map fRegions; }; } // namespace shmem diff --git a/fairmq/test/protocols/_transfer_timeout.cxx b/fairmq/test/protocols/_transfer_timeout.cxx index f08f6ff5..0ece5d60 100644 --- a/fairmq/test/protocols/_transfer_timeout.cxx +++ b/fairmq/test/protocols/_transfer_timeout.cxx @@ -22,8 +22,8 @@ auto RunTransferTimeout(string transport) -> void { size_t session{fair::mq::tools::UuidHash()}; stringstream cmd; - cmd << runTestDevice << " --id transfer_timeout_" << transport << " --control static " - << "--session " << session << " --color false --mq-config \"" << mqConfig << "\""; + cmd << runTestDevice << " --id transfer_timeout_" << transport << " --control static --transport " << transport + << " --session " << session << " --color false --mq-config \"" << mqConfig << "\""; auto res = execute(cmd.str()); cerr << res.console_out; diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index e5560152..5724ba07 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -23,6 +23,8 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const s , fBytesRx(0) , fMessagesTx(0) , fMessagesRx(0) + , fSndTimeout(100) + , fRcvTimeout(100) { assert(context); fSocket = zmq_socket(context, GetConstant(type)); @@ -46,14 +48,12 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const s LOG(error) << "Failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno); } - int sndTimeout = 700; - if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &sndTimeout, sizeof(sndTimeout)) != 0) + if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) { LOG(error) << "Failed setting ZMQ_SNDTIMEO socket option, reason: " << zmq_strerror(errno); } - int rcvTimeout = 700; - if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &rcvTimeout, sizeof(rcvTimeout)) != 0) + if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) { LOG(error) << "Failed setting ZMQ_RCVTIMEO socket option, reason: " << zmq_strerror(errno); } @@ -102,19 +102,20 @@ void FairMQSocketZMQ::Connect(const string& address) } } -int FairMQSocketZMQ::Send(FairMQMessagePtr& msg) { return Send(msg, 0); } -int FairMQSocketZMQ::SendAsync(FairMQMessagePtr& msg) { return Send(msg, ZMQ_DONTWAIT); } -int FairMQSocketZMQ::Receive(FairMQMessagePtr& msg) { return Receive(msg, 0); } -int FairMQSocketZMQ::ReceiveAsync(FairMQMessagePtr& msg) { return Receive(msg, ZMQ_DONTWAIT); } +int FairMQSocketZMQ::Send(FairMQMessagePtr& msg, const int timeout) { return SendImpl(msg, 0, timeout); } +int FairMQSocketZMQ::Receive(FairMQMessagePtr& msg, const int timeout) { return ReceiveImpl(msg, 0, timeout); } +int64_t FairMQSocketZMQ::Send(vector>& msgVec, const int timeout) { return SendImpl(msgVec, 0, timeout); } +int64_t FairMQSocketZMQ::Receive(vector>& msgVec, const int timeout) { return ReceiveImpl(msgVec, 0, timeout); } -int64_t FairMQSocketZMQ::Send(std::vector>& msgVec) { return Send(msgVec, 0); } -int64_t FairMQSocketZMQ::SendAsync(std::vector>& msgVec) { return Send(msgVec, ZMQ_DONTWAIT); } -int64_t FairMQSocketZMQ::Receive(std::vector>& msgVec) { return Receive(msgVec, 0); } -int64_t FairMQSocketZMQ::ReceiveAsync(std::vector>& msgVec) { return Receive(msgVec, ZMQ_DONTWAIT); } +int FairMQSocketZMQ::TrySend(FairMQMessagePtr& msg) { return SendImpl(msg, ZMQ_DONTWAIT, 0); } +int FairMQSocketZMQ::TryReceive(FairMQMessagePtr& msg) { return ReceiveImpl(msg, ZMQ_DONTWAIT, 0); } +int64_t FairMQSocketZMQ::TrySend(vector>& msgVec) { return SendImpl(msgVec, ZMQ_DONTWAIT, 0); } +int64_t FairMQSocketZMQ::TryReceive(vector>& msgVec) { return ReceiveImpl(msgVec, ZMQ_DONTWAIT, 0); } -int FairMQSocketZMQ::Send(FairMQMessagePtr& msg, const int flags) +int FairMQSocketZMQ::SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout) { int nbytes = -1; + int elapsed = 0; static_cast(msg.get())->ApplyUsedSize(); @@ -132,6 +133,14 @@ int FairMQSocketZMQ::Send(FairMQMessagePtr& msg, const int flags) { if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { + if (timeout) + { + elapsed += fSndTimeout; + if (elapsed >= timeout) + { + return -2; + } + } continue; } else @@ -152,9 +161,10 @@ int FairMQSocketZMQ::Send(FairMQMessagePtr& msg, const int flags) } } -int FairMQSocketZMQ::Receive(FairMQMessagePtr& msg, const int flags) +int FairMQSocketZMQ::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout) { int nbytes = -1; + int elapsed = 0; while (true) { @@ -169,6 +179,14 @@ int FairMQSocketZMQ::Receive(FairMQMessagePtr& msg, const int flags) { if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { + if (timeout) + { + elapsed += fRcvTimeout; + if (elapsed >= timeout) + { + return -2; + } + } continue; } else @@ -189,9 +207,10 @@ int FairMQSocketZMQ::Receive(FairMQMessagePtr& msg, const int flags) } } -int64_t FairMQSocketZMQ::Send(vector& msgVec, const int flags) +int64_t FairMQSocketZMQ::SendImpl(vector& msgVec, const int flags, const int timeout) { const unsigned int vecSize = msgVec.size(); + int elapsed = 0; // Sending vector typicaly handles more then one part if (vecSize > 1) @@ -223,6 +242,14 @@ int64_t FairMQSocketZMQ::Send(vector& msgVec, const int flags) { if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { + if (timeout) + { + elapsed += fSndTimeout; + if (elapsed >= timeout) + { + return -2; + } + } repeat = true; break; } @@ -263,11 +290,12 @@ int64_t FairMQSocketZMQ::Send(vector& msgVec, const int flags) } } -int64_t FairMQSocketZMQ::Receive(vector& msgVec, const int flags) +int64_t FairMQSocketZMQ::ReceiveImpl(vector& msgVec, const int flags, const int timeout) { int64_t totalSize = 0; int64_t more = 0; bool repeat = false; + int elapsed = 0; while (true) { @@ -289,6 +317,14 @@ int64_t FairMQSocketZMQ::Receive(vector& msgVec, const int fla { if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { + if (timeout) + { + elapsed += fRcvTimeout; + if (elapsed >= timeout) + { + return -2; + } + } repeat = true; break; } @@ -395,6 +431,7 @@ unsigned long FairMQSocketZMQ::GetMessagesRx() const bool FairMQSocketZMQ::SetSendTimeout(const int timeout, const string& address, const string& method) { + fSndTimeout = timeout; if (method == "bind") { if (zmq_unbind(fSocket, address.c_str()) != 0) @@ -402,7 +439,7 @@ bool FairMQSocketZMQ::SetSendTimeout(const int timeout, const string& address, c LOG(error) << "Failed unbinding socket " << fId << ", reason: " << zmq_strerror(errno); return false; } - if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &timeout, sizeof(int)) != 0) + if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) { LOG(error) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno); return false; @@ -420,7 +457,7 @@ bool FairMQSocketZMQ::SetSendTimeout(const int timeout, const string& address, c LOG(error) << "Failed disconnecting socket " << fId << ", reason: " << zmq_strerror(errno); return false; } - if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &timeout, sizeof(int)) != 0) + if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) { LOG(error) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno); return false; @@ -442,19 +479,12 @@ bool FairMQSocketZMQ::SetSendTimeout(const int timeout, const string& address, c int FairMQSocketZMQ::GetSendTimeout() const { - int timeout = -1; - size_t size = sizeof(timeout); - - if (zmq_getsockopt(fSocket, ZMQ_SNDTIMEO, &timeout, &size) != 0) - { - LOG(error) << "Failed getting option 'receive timeout' on socket " << fId << ", reason: " << zmq_strerror(errno); - } - - return timeout; + return fSndTimeout; } bool FairMQSocketZMQ::SetReceiveTimeout(const int timeout, const string& address, const string& method) { + fRcvTimeout = timeout; if (method == "bind") { if (zmq_unbind(fSocket, address.c_str()) != 0) @@ -462,7 +492,7 @@ bool FairMQSocketZMQ::SetReceiveTimeout(const int timeout, const string& address LOG(error) << "Failed unbinding socket " << fId << ", reason: " << zmq_strerror(errno); return false; } - if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &timeout, sizeof(int)) != 0) + if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) { LOG(error) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno); return false; @@ -480,7 +510,7 @@ bool FairMQSocketZMQ::SetReceiveTimeout(const int timeout, const string& address LOG(error) << "Failed disconnecting socket " << fId << ", reason: " << zmq_strerror(errno); return false; } - if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &timeout, sizeof(int)) != 0) + if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) { LOG(error) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno); return false; @@ -502,59 +532,32 @@ bool FairMQSocketZMQ::SetReceiveTimeout(const int timeout, const string& address int FairMQSocketZMQ::GetReceiveTimeout() const { - int timeout = -1; - size_t size = sizeof(timeout); - - if (zmq_getsockopt(fSocket, ZMQ_RCVTIMEO, &timeout, &size) != 0) - { - LOG(error) << "Failed getting option 'receive timeout' on socket " << fId << ", reason: " << zmq_strerror(errno); - } - - return timeout; + return fRcvTimeout; } int FairMQSocketZMQ::GetConstant(const string& constant) { - if (constant == "") - return 0; - if (constant == "sub") - return ZMQ_SUB; - if (constant == "pub") - return ZMQ_PUB; - if (constant == "xsub") - return ZMQ_XSUB; - if (constant == "xpub") - return ZMQ_XPUB; - if (constant == "push") - return ZMQ_PUSH; - if (constant == "pull") - return ZMQ_PULL; - if (constant == "req") - return ZMQ_REQ; - if (constant == "rep") - return ZMQ_REP; - if (constant == "dealer") - return ZMQ_DEALER; - if (constant == "router") - return ZMQ_ROUTER; - if (constant == "pair") - return ZMQ_PAIR; + if (constant == "") return 0; + if (constant == "sub") return ZMQ_SUB; + if (constant == "pub") return ZMQ_PUB; + if (constant == "xsub") return ZMQ_XSUB; + if (constant == "xpub") return ZMQ_XPUB; + if (constant == "push") return ZMQ_PUSH; + if (constant == "pull") return ZMQ_PULL; + if (constant == "req") return ZMQ_REQ; + if (constant == "rep") return ZMQ_REP; + if (constant == "dealer") return ZMQ_DEALER; + if (constant == "router") return ZMQ_ROUTER; + if (constant == "pair") return ZMQ_PAIR; - if (constant == "snd-hwm") - return ZMQ_SNDHWM; - if (constant == "rcv-hwm") - return ZMQ_RCVHWM; - if (constant == "snd-size") - return ZMQ_SNDBUF; - if (constant == "rcv-size") - return ZMQ_RCVBUF; - if (constant == "snd-more") - return ZMQ_SNDMORE; - if (constant == "rcv-more") - return ZMQ_RCVMORE; + if (constant == "snd-hwm") return ZMQ_SNDHWM; + if (constant == "rcv-hwm") return ZMQ_RCVHWM; + if (constant == "snd-size") return ZMQ_SNDBUF; + if (constant == "rcv-size") return ZMQ_RCVBUF; + if (constant == "snd-more") return ZMQ_SNDMORE; + if (constant == "rcv-more") return ZMQ_RCVMORE; - if (constant == "linger") - return ZMQ_LINGER; + if (constant == "linger") return ZMQ_LINGER; return -1; } diff --git a/fairmq/zeromq/FairMQSocketZMQ.h b/fairmq/zeromq/FairMQSocketZMQ.h index fa96ab8b..66ad24d1 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.h +++ b/fairmq/zeromq/FairMQSocketZMQ.h @@ -28,23 +28,23 @@ class FairMQSocketZMQ : public FairMQSocket bool Bind(const std::string& address) override; void Connect(const std::string& address) override; - int Send(FairMQMessagePtr& msg) override; - int SendAsync(FairMQMessagePtr& msg) override; - int Receive(FairMQMessagePtr& msg) override; - int ReceiveAsync(FairMQMessagePtr& msg) override; + int Send(FairMQMessagePtr& msg, const int timeout = 0) override; + int Receive(FairMQMessagePtr& msg, const int timeout = 0) override; + int64_t Send(std::vector>& msgVec, const int timeout = 0) override; + int64_t Receive(std::vector>& msgVec, const int timeout = 0) override; - int64_t Send(std::vector>& msgVec) override; - int64_t SendAsync(std::vector>& msgVec) override; - int64_t Receive(std::vector>& msgVec) override; - int64_t ReceiveAsync(std::vector>& msgVec) override; + int TrySend(FairMQMessagePtr& msg) override; + int TryReceive(FairMQMessagePtr& msg) override; + int64_t TrySend(std::vector>& msgVec) override; + int64_t TryReceive(std::vector>& msgVec) override; void* GetSocket() const override; int GetSocket(int nothing) const override; void Close() override; - void Interrupt() override; - void Resume() override; + static void Interrupt(); + static void Resume(); void SetOption(const std::string& option, const void* value, size_t valueSize) override; void GetOption(const std::string& option, void* value, size_t* valueSize) override; @@ -61,7 +61,7 @@ class FairMQSocketZMQ : public FairMQSocket static int GetConstant(const std::string& constant); - virtual ~FairMQSocketZMQ(); + ~FairMQSocketZMQ() override; private: void* fSocket; @@ -73,10 +73,14 @@ class FairMQSocketZMQ : public FairMQSocket static std::atomic fInterrupted; - int Send(FairMQMessagePtr& msg, const int flags = 0); - int Receive(FairMQMessagePtr& msg, const int flags = 0); - int64_t Send(std::vector>& msgVec, const int flags = 0); - int64_t Receive(std::vector>& msgVec, const int flags = 0); + int fSndTimeout; + int fRcvTimeout; + + int SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout); + int ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout); + + int64_t SendImpl(std::vector>& msgVec, const int flags, const int timeout); + int64_t ReceiveImpl(std::vector>& msgVec, const int flags, const int timeout); }; #endif /* FAIRMQSOCKETZMQ_H_ */ diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index 79db637c..8cf7559c 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -49,6 +49,10 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const override; FairMQ::Transport GetType() const override; + + void Interrupt() override { FairMQSocketZMQ::Interrupt(); } + void Resume() override { FairMQSocketZMQ::Resume(); } + private: static FairMQ::Transport fTransportType; void* fContext;