From 8f59db12830fc02960aaca1098dfff3bf3b33f6b Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Thu, 15 Oct 2015 16:43:30 +0200 Subject: [PATCH] Add methods to set timeout on blocking Send/Receive --- fairmq/FairMQChannel.cxx | 106 ++++++++++++--- fairmq/FairMQChannel.h | 31 ++++- fairmq/FairMQDevice.cxx | 4 +- fairmq/FairMQDevice.h | 110 +++++++++++----- fairmq/FairMQSocket.h | 17 ++- fairmq/nanomsg/FairMQSocketNN.cxx | 156 ++++++++++++++++------- fairmq/nanomsg/FairMQSocketNN.h | 17 ++- fairmq/test/CMakeLists.txt | 7 + fairmq/test/runTransferTimeoutTest.cxx | 166 ++++++++++++++++++++++++ fairmq/zeromq/FairMQSocketZMQ.cxx | 170 +++++++++++++++++++++---- fairmq/zeromq/FairMQSocketZMQ.h | 17 ++- 11 files changed, 655 insertions(+), 146 deletions(-) create mode 100644 fairmq/test/runTransferTimeoutTest.cxx diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index bf7e7c7e..5bbab091 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -38,6 +38,8 @@ FairMQChannel::FairMQChannel() , fTransportFactory(nullptr) , fNoBlockFlag(0) , fSndMoreFlag(0) + , fSndTimeoutInMs(-1) + , fRcvTimeoutInMs(-1) { } @@ -56,6 +58,8 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str , fTransportFactory(nullptr) , fNoBlockFlag(0) , fSndMoreFlag(0) + , fSndTimeoutInMs(-1) + , fRcvTimeoutInMs(-1) { } @@ -376,12 +380,12 @@ void FairMQChannel::ResetChannel() int FairMQChannel::Send(const unique_ptr& msg) const { - fPoller->Poll(-1); + fPoller->Poll(fSndTimeoutInMs); if (fPoller->CheckInput(0)) { HandleUnblock(); - return -1; + return -2; } if (fPoller->CheckOutput(1)) @@ -389,7 +393,7 @@ int FairMQChannel::Send(const unique_ptr& msg) const return fSocket->Send(msg.get(), 0); } - return -1; + return -2; } int FairMQChannel::SendAsync(const unique_ptr& msg) const @@ -404,12 +408,12 @@ int FairMQChannel::SendPart(const unique_ptr& msg) const int FairMQChannel::Receive(const unique_ptr& msg) const { - fPoller->Poll(-1); + fPoller->Poll(fRcvTimeoutInMs); if (fPoller->CheckInput(0)) { HandleUnblock(); - return -1; + return -2; } if (fPoller->CheckInput(1)) @@ -417,7 +421,7 @@ int FairMQChannel::Receive(const unique_ptr& msg) const return fSocket->Receive(msg.get(), 0); } - return -1; + return -2; } int FairMQChannel::ReceiveAsync(const unique_ptr& msg) const @@ -429,100 +433,160 @@ int FairMQChannel::Send(FairMQMessage* msg, const string& flag) const { if (flag == "") { - fPoller->Poll(-1); + fPoller->Poll(fSndTimeoutInMs); if (fPoller->CheckInput(0)) { HandleUnblock(); - return -1; + return -2; } if (fPoller->CheckOutput(1)) { return fSocket->Send(msg, flag); } + + return -2; } else { return fSocket->Send(msg, flag); } - - return -1; } int FairMQChannel::Send(FairMQMessage* msg, const int flags) const { if (flags == 0) { - fPoller->Poll(-1); + fPoller->Poll(fSndTimeoutInMs); if (fPoller->CheckInput(0)) { HandleUnblock(); - return -1; + return -2; } if (fPoller->CheckOutput(1)) { return fSocket->Send(msg, flags); } + + return -2; } else { return fSocket->Send(msg, flags); } - - return -1; } int FairMQChannel::Receive(FairMQMessage* msg, const string& flag) const { if (flag == "") { - fPoller->Poll(-1); + fPoller->Poll(fRcvTimeoutInMs); if (fPoller->CheckInput(0)) { HandleUnblock(); - return -1; + return -2; } if (fPoller->CheckInput(1)) { return fSocket->Receive(msg, flag); } + + return -2; } else { return fSocket->Receive(msg, flag); } - - return -1; } int FairMQChannel::Receive(FairMQMessage* msg, const int flags) const { if (flags == 0) { - fPoller->Poll(-1); + fPoller->Poll(fRcvTimeoutInMs); if (fPoller->CheckInput(0)) { HandleUnblock(); - return -1; + return -2; } if (fPoller->CheckInput(1)) { return fSocket->Receive(msg, flags); } + + return -2; } else { return fSocket->Receive(msg, flags); } +} - return -1; +bool FairMQChannel::SetSendTimeout(const int timeout) +{ + if (fSocket) + { + if (fSocket->SetSendTimeout(timeout, fAddress, fMethod)) + { + fSndTimeoutInMs = timeout; + return true; + } + } + else + { + LOG(ERROR) << "SetSendTimeout() failed - socket is not initialized!"; + return false; + } +} + +int FairMQChannel::GetSendTimeout() const +{ + if (fSocket) + { + return fSocket->GetSendTimeout(); + } + else + { + LOG(ERROR) << "GetSendTimeout() failed - socket is not initialized!"; + return -1; + } +} + +bool FairMQChannel::SetReceiveTimeout(const int timeout) +{ + if (fSocket) + { + if (fSocket->SetReceiveTimeout(timeout, fAddress, fMethod)) + { + fRcvTimeoutInMs = timeout; + return true; + } + } + else + { + LOG(ERROR) << "SetReceiveTimeout() failed - socket is not initialized!"; + return false; + } +} + +int FairMQChannel::GetReceiveTimeout() const +{ + if (fSocket) + { + return fSocket->GetReceiveTimeout(); + } + else + { + LOG(ERROR) << "GetReceiveTimeout() failed - socket is not initialized!"; + return -1; + } } bool FairMQChannel::ExpectsAnotherPart() const diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 34fd33ad..cb19224a 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -101,7 +101,7 @@ class FairMQChannel /// for some other reason (e.g. no peers connected for a binding socket), the method blocks. /// /// @param msg Constant reference of unique_ptr to a FairMQMessage - /// @return Returns the number of bytes that have been queued. In case of errors, returns -1. + /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. In case of errors, returns -1. int Send(const std::unique_ptr& msg) const; /// Sends a message in non-blocking mode. @@ -111,7 +111,7 @@ class FairMQChannel /// /// @param msg Constant reference of unique_ptr to a FairMQMessage /// @return Returns the number of bytes that have been queued. If queueing failed due to - /// full queue or no connected peers (when binding), returns 0. In case of errors, returns -1. + /// full queue or no connected peers (when binding), returns -2. In case of errors, returns -1. int SendAsync(const std::unique_ptr& msg) const; /// Queues the current message as a part of a multi-part message @@ -119,7 +119,7 @@ class FairMQChannel /// The actual transfer over the network is initiated once final part has been queued with the Send() or SendAsync methods. /// /// @param msg Constant reference of unique_ptr to a FairMQMessage - /// @return Returns the number of bytes that have been queued. In case of errors, returns -1. + /// @return Returns the number of bytes that have been queued. -2 If queueing was not possible. In case of errors, returns -1. int SendPart(const std::unique_ptr& msg) const; /// Receives a message from the socket queue. @@ -127,7 +127,7 @@ class FairMQChannel /// If the queue is empty the method blocks. /// /// @param msg Constant reference of unique_ptr to a FairMQMessage - /// @return Returns the number of bytes that have been received. In case of errors, returns -1. + /// @return Returns the number of bytes that have been received. -2 If reading from the queue was not possible or timed out. In case of errors, returns -1. int Receive(const std::unique_ptr& msg) const; /// Receives a message in non-blocking mode. @@ -135,7 +135,7 @@ class FairMQChannel /// If the queue is empty the method returns 0. /// /// @param msg Constant reference of unique_ptr to a FairMQMessage - /// @return Returns the number of bytes that have been received. If queue is empty, returns 0. + /// @return Returns the number of bytes that have been received. If queue is empty, returns -2. /// In case of errors, returns -1. int ReceiveAsync(const std::unique_ptr& msg) const; @@ -145,6 +145,24 @@ class FairMQChannel int Receive(FairMQMessage* msg, const std::string& flag = "") const; int Receive(FairMQMessage* msg, const int flags) const; + /// Sets a timeout on the (blocking) Send method + /// @param timeout timeout value in milliseconds + /// @return true if operation was successfull, otherwise false. + bool SetSendTimeout(const int timeout); + + /// Gets the current value of the timeout on the (blocking) Send method + /// @return Timeout value in milliseconds. -1 for no timeout. + int GetSendTimeout() const; + + /// Sets a timeout on the (blocking) Receive method + /// @param timeout timeout value in milliseconds + /// @return true if operation was successfull, otherwise false. + bool SetReceiveTimeout(const int timeout); + + /// Gets the current value of the timeout on the (blocking) Receive method + /// @return Timeout value in milliseconds. -1 for no timeout. + int GetReceiveTimeout() const; + /// Checks if the socket is expecting to receive another part of a multipart message. /// @return Return true if the socket expects another part of a multipart message and false otherwise. bool ExpectsAnotherPart() const; @@ -168,6 +186,9 @@ class FairMQChannel int fNoBlockFlag; int fSndMoreFlag; + int fSndTimeoutInMs; + int fRcvTimeoutInMs; + bool InitCommandInterface(FairMQTransportFactory* factory, int numIoThreads); bool HandleUnblock() const; diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 18be0b1b..036d7a19 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -77,7 +77,7 @@ void FairMQDevice::SignalHandler(int signal) Shutdown(); fTerminateStateThread.join(); - MQLOG(INFO) << "Exiting."; + LOG(INFO) << "Exiting."; stop(); // std::abort(); exit(EXIT_FAILURE); @@ -558,8 +558,8 @@ void FairMQDevice::LogSocketRates() void FairMQDevice::InteractiveStateLoop() { - char c; // hold the user console input bool running = true; + char c; // hold the user console input struct termios t; tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index e89befc6..ba86b5b5 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -34,95 +34,143 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable public: enum { - Id = FairMQConfigurable::Last, - NumIoThreads, - MaxInitializationTime, - PortRangeMin, - PortRangeMax, - LogIntervalInMs, + Id = FairMQConfigurable::Last, ///< Device ID + MaxInitializationTime, ///< Timeout for the initialization + NumIoThreads, ///< Number of ZeroMQ I/O threads + PortRangeMin, ///< Minimum value for the port range (if dynamic) + PortRangeMax, ///< Maximum value for the port range (if dynamic) + LogIntervalInMs, ///< Interval for logging the socket transfer rates Last }; + /// Default constructor FairMQDevice(); + /// Default destructor + virtual ~FairMQDevice(); + /// Catches interrupt signals (SIGINT, SIGTERM) void CatchSignals(); + /// Outputs the socket transfer rates virtual void LogSocketRates(); + /// Sorts a channel by address, with optional reindexing of the sorted values + /// @param name Channel name + /// @param reindex Should reindexing be done void SortChannel(const std::string& name, const bool reindex = true); + + /// Prints channel configuration + /// @param name Name of the channel void PrintChannel(const std::string& name); + /// Waits for the first initialization run to finish void WaitForInitialValidation(); + /// Starts interactive (console) loop for controlling the device + /// Works only when running in a terminal. Running in background would exit, because no interactive input (std::cin) is possible. void InteractiveStateLoop(); + /// Prints the available commands of the InteractiveStateLoop() void PrintInteractiveStateLoopHelp(); + /// Set Device properties stored as strings + /// @param key Property key + /// @param value Property value virtual void SetProperty(const int key, const std::string& value); + /// Get Device properties stored as strings + /// @param key Property key + /// @param default_ not used + /// @return Property value virtual std::string GetProperty(const int key, const std::string& default_ = ""); + /// Set Device properties stored as integers + /// @param key Property key + /// @param value Property value virtual void SetProperty(const int key, const int value); + /// Get Device properties stored as integers + /// @param key Property key + /// @param default_ not used + /// @return Property value virtual int GetProperty(const int key, const int default_ = 0); /// Get property description for a given property name - /// @param key Property name/key - /// @return String with the property description + /// @param key Property name/key + /// @return String with the property description virtual std::string GetPropertyDescription(const int key); /// Print all properties of this and the parent class to LOG(INFO) virtual void ListProperties(); + /// Configures the device with a transport factory + /// @param factory Pointer to the transport factory object virtual void SetTransport(FairMQTransportFactory* factory); + /// Implements the sort algorithm used in SortChannel() + /// @param lhs Right hand side value for comparison + /// @param rhs Left hand side value for comparison static bool SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs); - virtual ~FairMQDevice(); - - std::unordered_map> fChannels; + std::unordered_map> fChannels; ///< Device channels protected: - std::string fId; + std::string fId; ///< Device ID - int fMaxInitializationTime; + int fMaxInitializationTime; ///< Timeout for the initialization - int fNumIoThreads; + int fNumIoThreads; ///< Number of ZeroMQ I/O threads - int fPortRangeMin; - int fPortRangeMax; + int fPortRangeMin; ///< Minimum value for the port range (if dynamic) + int fPortRangeMax; ///< Maximum value for the port range (if dynamic) - int fLogIntervalInMs; + int fLogIntervalInMs; ///< Interval for logging the socket transfer rates - FairMQSocket* fCmdSocket; + FairMQSocket* fCmdSocket; ///< Socket used for the internal unblocking mechanism - FairMQTransportFactory* fTransportFactory; + FairMQTransportFactory* fTransportFactory; ///< Transport factory - void InitWrapper(); + /// Additional user initialization (can be overloaded in child classes). Prefer to use InitTask(). virtual void Init(); - void InitTaskWrapper(); + /// Task initialization (can be overloaded in child classes) virtual void InitTask(); - void RunWrapper(); + /// Runs the device (to be overloaded in child classes) virtual void Run(); + /// Handles the PAUSE state virtual void Pause(); - void ResetTaskWrapper(); + /// Resets the user task (to be overloaded in child classes) virtual void ResetTask(); - void ResetWrapper(); + /// Resets the device (can be overloaded in child classes) virtual void Reset(); - void Shutdown(); - - void Terminate(); - void Unblock(); - - bool InitChannel(FairMQChannel&); - private: // condition variable to notify parent thread about end of initial validation. bool fInitialValidationFinished; boost::condition_variable fInitialValidationCondition; boost::mutex fInitialValidationMutex; + /// Handles the initialization and the Init() method + void InitWrapper(); + /// Handles the InitTask() method + void InitTaskWrapper(); + /// Handles the Run() method + void RunWrapper(); + /// Handles the ResetTask() method + void ResetTaskWrapper(); + /// Handles the Reset() method + void ResetWrapper(); + /// Shuts down the device (closses socket connections) + void Shutdown(); + + /// Terminates the transport interface + void Terminate(); + /// Unblocks blocking channel send/receive calls + void Unblock(); + + /// Initializes a single channel (used in InitWrapper) + bool InitChannel(FairMQChannel&); + + /// Signal handler void SignalHandler(int signal); bool fCatchingSignals; diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h index dafff06f..0cb59371 100644 --- a/fairmq/FairMQSocket.h +++ b/fairmq/FairMQSocket.h @@ -42,18 +42,23 @@ class FairMQSocket virtual int Receive(FairMQMessage* msg, const std::string& flag = "") = 0; virtual int Receive(FairMQMessage* msg, const int flags = 0) = 0; - virtual void* GetSocket() = 0; - virtual int GetSocket(int nothing) = 0; + virtual void* GetSocket() const = 0; + virtual int GetSocket(int nothing) const = 0; virtual void Close() = 0; virtual void Terminate() = 0; virtual void SetOption(const std::string& option, const void* value, size_t valueSize) = 0; virtual void GetOption(const std::string& option, void* value, size_t* valueSize) = 0; - virtual unsigned long GetBytesTx() = 0; - virtual unsigned long GetBytesRx() = 0; - virtual unsigned long GetMessagesTx() = 0; - virtual unsigned long GetMessagesRx() = 0; + virtual unsigned long GetBytesTx() const = 0; + virtual unsigned long GetBytesRx() const = 0; + virtual unsigned long GetMessagesTx() const = 0; + virtual unsigned long GetMessagesRx() const = 0; + + virtual bool SetSendTimeout(const int timeout, const std::string& address, const std::string& method) = 0; + virtual int GetSendTimeout() const = 0; + virtual bool SetReceiveTimeout(const int timeout, const std::string& address, const std::string& method) = 0; + virtual int GetReceiveTimeout() const = 0; virtual ~FairMQSocket() {}; }; diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index 635bb3e5..98011144 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -97,76 +97,96 @@ void FairMQSocketNN::Connect(const string& address) int FairMQSocketNN::Send(FairMQMessage* msg, const string& flag) { void* ptr = msg->GetMessage(); - int rc = nn_send(fSocket, &ptr, NN_MSG, 0); - if (rc < 0) + int nbytes = nn_send(fSocket, &ptr, NN_MSG, GetConstant(flag)); + if (nbytes >= 0) { - LOG(ERROR) << "failed sending on socket " << fId << ", reason: " << nn_strerror(errno); - } - else - { - fBytesTx += rc; + fBytesTx += nbytes; ++fMessagesTx; static_cast(msg)->fReceiving = false; } - - return rc; + if (nn_errno() == EAGAIN) + { + return -2; + } + if (nn_errno() == ETERM) + { + LOG(INFO) << "terminating socket " << fId; + return -1; + } + LOG(ERROR) << "Failed sending on socket " << fId << ", reason: " << nn_strerror(errno); + return nbytes; } int FairMQSocketNN::Send(FairMQMessage* msg, const int flags) { void* ptr = msg->GetMessage(); - int rc = nn_send(fSocket, &ptr, NN_MSG, flags); - if (rc < 0) + int nbytes = nn_send(fSocket, &ptr, NN_MSG, flags); + if (nbytes >= 0) { - LOG(ERROR) << "failed sending on socket " << fId << ", reason: " << nn_strerror(errno); - } - else - { - fBytesTx += rc; + fBytesTx += nbytes; ++fMessagesTx; static_cast(msg)->fReceiving = false; } - - return rc; + if (nn_errno() == EAGAIN) + { + return -2; + } + if (nn_errno() == ETERM) + { + LOG(INFO) << "terminating socket " << fId; + return -1; + } + LOG(ERROR) << "Failed sending on socket " << fId << ", reason: " << nn_strerror(errno); + return nbytes; } int FairMQSocketNN::Receive(FairMQMessage* msg, const string& flag) { void* ptr = NULL; - int rc = nn_recv(fSocket, &ptr, NN_MSG, 0); - if (rc < 0) + int nbytes = nn_recv(fSocket, &ptr, NN_MSG, GetConstant(flag)); + if (nbytes >= 0) { - LOG(ERROR) << "failed receiving on socket " << fId << ", reason: " << nn_strerror(errno); - } - else - { - fBytesRx += rc; + fBytesRx += nbytes; ++fMessagesRx; msg->Rebuild(); - msg->SetMessage(ptr, rc); + msg->SetMessage(ptr, nbytes); static_cast(msg)->fReceiving = true; } - - return rc; + if (nn_errno() == EAGAIN) + { + return -2; + } + if (nn_errno() == ETERM) + { + LOG(INFO) << "terminating socket " << fId; + return -1; + } + LOG(ERROR) << "Failed receiving on socket " << fId << ", reason: " << nn_strerror(errno); + return nbytes; } int FairMQSocketNN::Receive(FairMQMessage* msg, const int flags) { void* ptr = NULL; - int rc = nn_recv(fSocket, &ptr, NN_MSG, flags); - if (rc < 0) + int nbytes = nn_recv(fSocket, &ptr, NN_MSG, flags); + if (nbytes >= 0) { - LOG(ERROR) << "failed receiving on socket " << fId << ", reason: " << nn_strerror(errno); - } - else - { - fBytesRx += rc; + fBytesRx += nbytes; ++fMessagesRx; - msg->SetMessage(ptr, rc); + msg->SetMessage(ptr, nbytes); static_cast(msg)->fReceiving = true; } - - return rc; + if (nn_errno() == EAGAIN) + { + return -2; + } + if (nn_errno() == ETERM) + { + LOG(INFO) << "terminating socket " << fId; + return -1; + } + LOG(ERROR) << "Failed receiving on socket " << fId << ", reason: " << nn_strerror(errno); + return nbytes; } void FairMQSocketNN::Close() @@ -179,12 +199,12 @@ void FairMQSocketNN::Terminate() nn_term(); } -void* FairMQSocketNN::GetSocket() +void* FairMQSocketNN::GetSocket() const { return NULL; // dummy method to comply with the interface. functionality not possible in zeromq. } -int FairMQSocketNN::GetSocket(int nothing) +int FairMQSocketNN::GetSocket(int nothing) const { return fSocket; } @@ -206,26 +226,74 @@ void FairMQSocketNN::GetOption(const string& option, void* value, size_t* valueS } } -unsigned long FairMQSocketNN::GetBytesTx() +unsigned long FairMQSocketNN::GetBytesTx() const { return fBytesTx; } -unsigned long FairMQSocketNN::GetBytesRx() +unsigned long FairMQSocketNN::GetBytesRx() const { return fBytesRx; } -unsigned long FairMQSocketNN::GetMessagesTx() +unsigned long FairMQSocketNN::GetMessagesTx() const { return fMessagesTx; } -unsigned long FairMQSocketNN::GetMessagesRx() +unsigned long FairMQSocketNN::GetMessagesRx() const { return fMessagesRx; } +bool FairMQSocketNN::SetSendTimeout(const int timeout, const string& address, const string& method) +{ + if (nn_setsockopt(fSocket, NN_SOL_SOCKET, NN_SNDTIMEO, &timeout, sizeof(int)) != 0) + { + LOG(ERROR) << "Failed setting option 'send timeout' on socket " << fId << ", reason: " << nn_strerror(errno); + return false; + } + + return true; +} + +int FairMQSocketNN::GetSendTimeout() const +{ + int timeout = -1; + size_t size = sizeof(timeout); + + if (nn_getsockopt(fSocket, NN_SOL_SOCKET, NN_SNDTIMEO, &timeout, &size) != 0) + { + LOG(ERROR) << "Failed getting option 'send timeout' on socket " << fId << ", reason: " << nn_strerror(errno); + } + + return timeout; +} + +bool FairMQSocketNN::SetReceiveTimeout(const int timeout, const string& address, const string& method) +{ + if (nn_setsockopt(fSocket, NN_SOL_SOCKET, NN_RCVTIMEO, &timeout, sizeof(int)) != 0) + { + LOG(ERROR) << "Failed setting option 'receive timeout' on socket " << fId << ", reason: " << nn_strerror(errno); + return false; + } + + return true; +} + +int FairMQSocketNN::GetReceiveTimeout() const +{ + int timeout = -1; + size_t size = sizeof(timeout); + + if (nn_getsockopt(fSocket, NN_SOL_SOCKET, NN_RCVTIMEO, &timeout, &size) != 0) + { + LOG(ERROR) << "Failed getting option 'receive timeout' on socket " << fId << ", reason: " << nn_strerror(errno); + } + + return timeout; +} + int FairMQSocketNN::GetConstant(const string& constant) { if (constant == "") diff --git a/fairmq/nanomsg/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h index d9e4c4b1..ac5684a9 100644 --- a/fairmq/nanomsg/FairMQSocketNN.h +++ b/fairmq/nanomsg/FairMQSocketNN.h @@ -38,18 +38,23 @@ class FairMQSocketNN : public FairMQSocket virtual int Receive(FairMQMessage* msg, const std::string& flag = ""); virtual int Receive(FairMQMessage* msg, const int flags = 0); - virtual void* GetSocket(); - virtual int GetSocket(int nothing); + virtual void* GetSocket() const; + virtual int GetSocket(int nothing) const; virtual void Close(); virtual void Terminate(); virtual void SetOption(const std::string& option, const void* value, size_t valueSize); virtual void GetOption(const std::string& option, void* value, size_t* valueSize); - unsigned long GetBytesTx(); - unsigned long GetBytesRx(); - unsigned long GetMessagesTx(); - unsigned long GetMessagesRx(); + unsigned long GetBytesTx() const; + unsigned long GetBytesRx() const; + unsigned long GetMessagesTx() const; + unsigned long GetMessagesRx() const; + + virtual bool SetSendTimeout(const int timeout, const std::string& address, const std::string& method); + virtual int GetSendTimeout() const; + virtual bool SetReceiveTimeout(const int timeout, const std::string& address, const std::string& method); + virtual int GetReceiveTimeout() const; static int GetConstant(const std::string& constant); diff --git a/fairmq/test/CMakeLists.txt b/fairmq/test/CMakeLists.txt index 2cebdadd..ad41aa5a 100644 --- a/fairmq/test/CMakeLists.txt +++ b/fairmq/test/CMakeLists.txt @@ -15,6 +15,7 @@ Set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/fairmq/devices ${CMAKE_SOURCE_DIR}/fairmq/tools ${CMAKE_SOURCE_DIR}/fairmq/options + ${CMAKE_SOURCE_DIR}/fairmq/test/ ${CMAKE_SOURCE_DIR}/fairmq/test/push-pull ${CMAKE_SOURCE_DIR}/fairmq/test/pub-sub ${CMAKE_SOURCE_DIR}/fairmq/test/req-rep @@ -71,6 +72,7 @@ set(Exe_Names test-fairmq-sub test-fairmq-req test-fairmq-rep + test-fairmq-transfer-timeout ) set(Exe_Source @@ -80,6 +82,7 @@ set(Exe_Source pub-sub/runTestSub.cxx req-rep/runTestReq.cxx req-rep/runTestRep.cxx + runTransferTimeoutTest.cxx ) list(LENGTH Exe_Names _length) @@ -105,3 +108,7 @@ set_tests_properties(run_fairmq_pub_sub PROPERTIES PASS_REGULAR_EXPRESSION "PUB- add_test(NAME run_fairmq_req_rep COMMAND ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-req-rep.sh) set_tests_properties(run_fairmq_req_rep PROPERTIES TIMEOUT "30") set_tests_properties(run_fairmq_req_rep PROPERTIES PASS_REGULAR_EXPRESSION "REQ-REP test successfull") + +add_test(NAME run_fairmq_transfer_timeout COMMAND ${CMAKE_BINARY_DIR}/bin/test-fairmq-transfer-timeout) +set_tests_properties(run_fairmq_transfer_timeout PROPERTIES TIMEOUT "30") +set_tests_properties(run_fairmq_transfer_timeout PROPERTIES PASS_REGULAR_EXPRESSION "Transfer timeout test successfull") diff --git a/fairmq/test/runTransferTimeoutTest.cxx b/fairmq/test/runTransferTimeoutTest.cxx new file mode 100644 index 00000000..600e3356 --- /dev/null +++ b/fairmq/test/runTransferTimeoutTest.cxx @@ -0,0 +1,166 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * runTransferTimeoutTester.cxx + * + * @since 2015-09-05 + * @author A. Rybalchenko + */ + +#include "FairMQLogger.h" +#include "FairMQDevice.h" + +#ifdef NANOMSG +#include "FairMQTransportFactoryNN.h" +#else +#include "FairMQTransportFactoryZMQ.h" +#endif + +class TransferTimeoutTester : public FairMQDevice +{ + public: + TransferTimeoutTester() {} + virtual ~TransferTimeoutTester() {} + + protected: + virtual void Run() + { + bool setSndOK = false; + bool setRcvOK = false; + bool getSndOK = false; + bool getRcvOK = false; + bool sendCanceling = false; + bool receiveCanceling = false; + + if (fChannels.at("data-out").at(0).SetSendTimeout(1000)) + { + setSndOK = true; + LOG(INFO) << "set send timeout OK"; + } + else + { + LOG(ERROR) << "set send timeout failed"; + } + + if (fChannels.at("data-in").at(0).SetReceiveTimeout(1000)) + { + setRcvOK = true; + LOG(INFO) << "set receive timeout OK"; + } + else + { + LOG(ERROR) << "set receive timeout failed"; + } + + if (setSndOK && setRcvOK) + { + if (fChannels.at("data-out").at(0).GetSendTimeout() == 1000) + { + getSndOK = true; + LOG(INFO) << "get send timeout OK: " << fChannels.at("data-out").at(0).GetSendTimeout(); + } + else + { + LOG(ERROR) << "get send timeout failed"; + } + + if (fChannels.at("data-in").at(0).GetReceiveTimeout() == 1000) + { + getRcvOK = true; + LOG(INFO) << "get receive timeout OK: " << fChannels.at("data-in").at(0).GetReceiveTimeout(); + } + else + { + LOG(ERROR) << "get receive timeout failed"; + } + } + + if (getSndOK && getRcvOK) + { + void* buffer = operator new[](1000); + std::unique_ptr msg1(fTransportFactory->CreateMessage(buffer, 1000)); + std::unique_ptr msg2(fTransportFactory->CreateMessage()); + + if (fChannels.at("data-out").at(0).Send(msg1) == -2) + { + LOG(INFO) << "send canceled"; + sendCanceling = true; + } + else + { + LOG(ERROR) << "send did not cancel"; + } + + if (fChannels.at("data-in").at(0).Receive(msg2) == -2) + { + LOG(INFO) << "receive canceled"; + receiveCanceling = true; + } + else + { + LOG(ERROR) << "receive did not cancel"; + } + + if (sendCanceling && receiveCanceling) + { + LOG(INFO) << "Transfer timeout test successfull"; + } + } + } +}; + +int main(int argc, char** argv) +{ + TransferTimeoutTester timeoutTester; + timeoutTester.CatchSignals(); + +#ifdef NANOMSG + timeoutTester.SetTransport(new FairMQTransportFactoryNN()); +#else + timeoutTester.SetTransport(new FairMQTransportFactoryZMQ()); +#endif + + timeoutTester.SetProperty(TransferTimeoutTester::Id, "timeoutTester"); + + FairMQChannel dataOutChannel; + dataOutChannel.UpdateType("push"); + dataOutChannel.UpdateMethod("bind"); + dataOutChannel.UpdateAddress("tcp://127.0.0.1:5559"); + dataOutChannel.UpdateSndBufSize(1000); + dataOutChannel.UpdateRcvBufSize(1000); + dataOutChannel.UpdateRateLogging(0); + timeoutTester.fChannels["data-out"].push_back(dataOutChannel); + + FairMQChannel dataInChannel; + dataInChannel.UpdateType("pull"); + dataInChannel.UpdateMethod("bind"); + dataInChannel.UpdateAddress("tcp://127.0.0.1:5560"); + dataInChannel.UpdateSndBufSize(1000); + dataInChannel.UpdateRcvBufSize(1000); + dataInChannel.UpdateRateLogging(0); + timeoutTester.fChannels["data-in"].push_back(dataInChannel); + + timeoutTester.ChangeState(TransferTimeoutTester::INIT_DEVICE); + timeoutTester.WaitForEndOfState(TransferTimeoutTester::INIT_DEVICE); + + timeoutTester.ChangeState(TransferTimeoutTester::INIT_TASK); + timeoutTester.WaitForEndOfState(TransferTimeoutTester::INIT_TASK); + + timeoutTester.ChangeState(TransferTimeoutTester::RUN); + timeoutTester.WaitForEndOfState(TransferTimeoutTester::RUN); + + timeoutTester.ChangeState(TransferTimeoutTester::RESET_TASK); + timeoutTester.WaitForEndOfState(TransferTimeoutTester::RESET_TASK); + + timeoutTester.ChangeState(TransferTimeoutTester::RESET_DEVICE); + timeoutTester.WaitForEndOfState(TransferTimeoutTester::RESET_DEVICE); + + timeoutTester.ChangeState(TransferTimeoutTester::END); + + return 0; +} diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index ecf17092..ef4c4616 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -37,20 +37,20 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, int num if (zmq_ctx_set(fContext->GetContext(), ZMQ_IO_THREADS, numIoThreads) != 0) { - LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno); + LOG(ERROR) << "Failed configuring context, reason: " << zmq_strerror(errno); } fSocket = zmq_socket(fContext->GetContext(), GetConstant(type)); if (fSocket == NULL) { - LOG(ERROR) << "failed creating socket " << fId << ", reason: " << zmq_strerror(errno); + LOG(ERROR) << "Failed creating socket " << fId << ", reason: " << zmq_strerror(errno); exit(EXIT_FAILURE); } if (zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.length()) != 0) { - LOG(ERROR) << "failed setting ZMQ_IDENTITY socket option, reason: " << zmq_strerror(errno); + LOG(ERROR) << "Failed setting ZMQ_IDENTITY socket option, reason: " << zmq_strerror(errno); } // Tell socket to try and send/receive outstanding messages for milliseconds before terminating. @@ -58,14 +58,14 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, int num int linger = 500; if (zmq_setsockopt(fSocket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { - LOG(ERROR) << "failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno); + LOG(ERROR) << "Failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno); } if (type == "sub") { if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, NULL, 0) != 0) { - LOG(ERROR) << "failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno); + LOG(ERROR) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno); } } @@ -87,7 +87,7 @@ bool FairMQSocketZMQ::Bind(const string& address) // do not print error in this case, this is handled by FairMQDevice in case no connection could be established after trying a number of random ports from a range. return false; } - LOG(ERROR) << "failed binding socket " << fId << ", reason: " << zmq_strerror(errno); + LOG(ERROR) << "Failed binding socket " << fId << ", reason: " << zmq_strerror(errno); return false; } return true; @@ -99,7 +99,7 @@ void FairMQSocketZMQ::Connect(const string& address) if (zmq_connect(fSocket, address.c_str()) != 0) { - LOG(ERROR) << "failed connecting socket " << fId << ", reason: " << zmq_strerror(errno); + LOG(ERROR) << "Failed connecting socket " << fId << ", reason: " << zmq_strerror(errno); // error here means incorrect configuration. exit if it happens. exit(EXIT_FAILURE); } @@ -116,14 +116,14 @@ int FairMQSocketZMQ::Send(FairMQMessage* msg, const string& flag) } if (zmq_errno() == EAGAIN) { - return 0; + return -2; } if (zmq_errno() == ETERM) { LOG(INFO) << "terminating socket " << fId; return -1; } - LOG(ERROR) << "failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); + LOG(ERROR) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); return nbytes; } @@ -138,14 +138,14 @@ int FairMQSocketZMQ::Send(FairMQMessage* msg, const int flags) } if (zmq_errno() == EAGAIN) { - return 0; + return -2; } if (zmq_errno() == ETERM) { LOG(INFO) << "terminating socket " << fId; return -1; } - LOG(ERROR) << "failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); + LOG(ERROR) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); return nbytes; } @@ -160,14 +160,14 @@ int FairMQSocketZMQ::Receive(FairMQMessage* msg, const string& flag) } if (zmq_errno() == EAGAIN) { - return 0; + return -2; } if (zmq_errno() == ETERM) { LOG(INFO) << "terminating socket " << fId; return -1; } - LOG(ERROR) << "failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno); + LOG(ERROR) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno); return nbytes; } @@ -182,14 +182,14 @@ int FairMQSocketZMQ::Receive(FairMQMessage* msg, const int flags) } if (zmq_errno() == EAGAIN) { - return 0; + return -2; } if (zmq_errno() == ETERM) { LOG(INFO) << "terminating socket " << fId; return -1; } - LOG(ERROR) << "failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno); + LOG(ERROR) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno); return nbytes; } @@ -204,7 +204,7 @@ void FairMQSocketZMQ::Close() if (zmq_close(fSocket) != 0) { - LOG(ERROR) << "failed closing socket " << fId << ", reason: " << zmq_strerror(errno); + LOG(ERROR) << "Failed closing socket " << fId << ", reason: " << zmq_strerror(errno); } fSocket = NULL; @@ -214,16 +214,16 @@ void FairMQSocketZMQ::Terminate() { if (zmq_ctx_destroy(fContext->GetContext()) != 0) { - LOG(ERROR) << "failed terminating context, reason: " << zmq_strerror(errno); + LOG(ERROR) << "Failed terminating context, reason: " << zmq_strerror(errno); } } -void* FairMQSocketZMQ::GetSocket() +void* FairMQSocketZMQ::GetSocket() const { return fSocket; } -int FairMQSocketZMQ::GetSocket(int nothing) +int FairMQSocketZMQ::GetSocket(int nothing) const { // dummy method to comply with the interface. functionality not possible in zeromq. return -1; @@ -233,7 +233,7 @@ void FairMQSocketZMQ::SetOption(const string& option, const void* value, size_t { if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0) { - LOG(ERROR) << "failed setting socket option, reason: " << zmq_strerror(errno); + LOG(ERROR) << "Failed setting socket option, reason: " << zmq_strerror(errno); } } @@ -241,30 +241,150 @@ void FairMQSocketZMQ::GetOption(const string& option, void* value, size_t* value { if (zmq_getsockopt(fSocket, GetConstant(option), value, valueSize) < 0) { - LOG(ERROR) << "failed getting socket option, reason: " << zmq_strerror(errno); + LOG(ERROR) << "Failed getting socket option, reason: " << zmq_strerror(errno); } } -unsigned long FairMQSocketZMQ::GetBytesTx() +unsigned long FairMQSocketZMQ::GetBytesTx() const { return fBytesTx; } -unsigned long FairMQSocketZMQ::GetBytesRx() +unsigned long FairMQSocketZMQ::GetBytesRx() const { return fBytesRx; } -unsigned long FairMQSocketZMQ::GetMessagesTx() +unsigned long FairMQSocketZMQ::GetMessagesTx() const { return fMessagesTx; } -unsigned long FairMQSocketZMQ::GetMessagesRx() +unsigned long FairMQSocketZMQ::GetMessagesRx() const { return fMessagesRx; } +bool FairMQSocketZMQ::SetSendTimeout(const int timeout, const string& address, const string& method) +{ + if (method == "bind") + { + if (zmq_unbind(fSocket, address.c_str()) != 0) + { + LOG(ERROR) << "Failed unbinding socket " << fId << ", reason: " << zmq_strerror(errno); + return false; + } + if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &timeout, sizeof(int)) != 0) + { + LOG(ERROR) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno); + return false; + } + if (zmq_bind(fSocket, address.c_str()) != 0) + { + LOG(ERROR) << "Failed binding socket " << fId << ", reason: " << zmq_strerror(errno); + return false; + } + } + else if (method == "connect") + { + if (zmq_disconnect(fSocket, address.c_str()) != 0) + { + LOG(ERROR) << "Failed disconnecting socket " << fId << ", reason: " << zmq_strerror(errno); + return false; + } + if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &timeout, sizeof(int)) != 0) + { + LOG(ERROR) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno); + return false; + } + if (zmq_connect(fSocket, address.c_str()) != 0) + { + LOG(ERROR) << "Failed connecting socket " << fId << ", reason: " << zmq_strerror(errno); + return false; + } + } + else + { + LOG(ERROR) << "SetSendTimeout() failed - unknown method provided!"; + return false; + } + + return true; +} + +int FairMQSocketZMQ::GetSendTimeout() const +{ + int timeout = -1; + size_t size = sizeof(timeout); + + if (zmq_getsockopt(fSocket, ZMQ_SNDTIMEO, &timeout, &size) != 0) + { + LOG(ERROR) << "Failed getting option 'receive timeout' on socket " << fId << ", reason: " << zmq_strerror(errno); + } + + return timeout; +} + +bool FairMQSocketZMQ::SetReceiveTimeout(const int timeout, const string& address, const string& method) +{ + if (method == "bind") + { + if (zmq_unbind(fSocket, address.c_str()) != 0) + { + LOG(ERROR) << "Failed unbinding socket " << fId << ", reason: " << zmq_strerror(errno); + return false; + } + if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &timeout, sizeof(int)) != 0) + { + LOG(ERROR) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno); + return false; + } + if (zmq_bind(fSocket, address.c_str()) != 0) + { + LOG(ERROR) << "Failed binding socket " << fId << ", reason: " << zmq_strerror(errno); + return false; + } + } + else if (method == "connect") + { + if (zmq_disconnect(fSocket, address.c_str()) != 0) + { + LOG(ERROR) << "Failed disconnecting socket " << fId << ", reason: " << zmq_strerror(errno); + return false; + } + if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &timeout, sizeof(int)) != 0) + { + LOG(ERROR) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno); + return false; + } + if (zmq_connect(fSocket, address.c_str()) != 0) + { + LOG(ERROR) << "Failed connecting socket " << fId << ", reason: " << zmq_strerror(errno); + return false; + } + } + else + { + LOG(ERROR) << "SetReceiveTimeout() failed - unknown method provided!"; + return false; + } + + return true; +} + +int FairMQSocketZMQ::GetReceiveTimeout() const +{ + int timeout = -1; + size_t size = sizeof(timeout); + + if (zmq_getsockopt(fSocket, ZMQ_RCVTIMEO, &timeout, &size) != 0) + { + LOG(ERROR) << "Failed getting option 'receive timeout' on socket " << fId << ", reason: " << zmq_strerror(errno); + } + + return timeout; +} + int FairMQSocketZMQ::GetConstant(const string& constant) { if (constant == "") diff --git a/fairmq/zeromq/FairMQSocketZMQ.h b/fairmq/zeromq/FairMQSocketZMQ.h index 88ed4f97..6fff15f6 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.h +++ b/fairmq/zeromq/FairMQSocketZMQ.h @@ -35,18 +35,23 @@ class FairMQSocketZMQ : public FairMQSocket virtual int Receive(FairMQMessage* msg, const std::string& flag = ""); virtual int Receive(FairMQMessage* msg, const int flags = 0); - virtual void* GetSocket(); - virtual int GetSocket(int nothing); + virtual void* GetSocket() const; + virtual int GetSocket(int nothing) const; virtual void Close(); virtual void Terminate(); virtual void SetOption(const std::string& option, const void* value, size_t valueSize); virtual void GetOption(const std::string& option, void* value, size_t* valueSize); - virtual unsigned long GetBytesTx(); - virtual unsigned long GetBytesRx(); - virtual unsigned long GetMessagesTx(); - virtual unsigned long GetMessagesRx(); + virtual unsigned long GetBytesTx() const; + virtual unsigned long GetBytesRx() const; + virtual unsigned long GetMessagesTx() const; + virtual unsigned long GetMessagesRx() const; + + virtual bool SetSendTimeout(const int timeout, const std::string& address, const std::string& method); + virtual int GetSendTimeout() const; + virtual bool SetReceiveTimeout(const int timeout, const std::string& address, const std::string& method); + virtual int GetReceiveTimeout() const; static int GetConstant(const std::string& constant);