From 690e8a037098ed4b7097f687d853d20a2b8cc028 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 28 Aug 2020 10:11:56 +0200 Subject: [PATCH] Retry on EINTR in blocking zmq calls --- fairmq/shmem/Poller.h | 19 +++++++++++++------ fairmq/shmem/Socket.h | 11 ++++------- fairmq/shmem/TransportFactory.h | 29 ++++++++++++++++------------- fairmq/zeromq/Context.h | 15 +++++++++------ fairmq/zeromq/Poller.h | 19 +++++++++++++------ fairmq/zeromq/Socket.h | 11 ++++------- 6 files changed, 59 insertions(+), 45 deletions(-) diff --git a/fairmq/shmem/Poller.h b/fairmq/shmem/Poller.h index 9c7925dc..f8861149 100644 --- a/fairmq/shmem/Poller.h +++ b/fairmq/shmem/Poller.h @@ -127,13 +127,20 @@ class Poller final : public fair::mq::Poller void Poll(const int timeout) override { - if (zmq_poll(fItems, fNumItems, timeout) < 0) { - if (errno == ETERM) { - LOG(debug) << "polling exited, reason: " << zmq_strerror(errno); - } else { - LOG(error) << "polling failed, reason: " << zmq_strerror(errno); - throw fair::mq::PollerError(fair::mq::tools::ToString("Polling failed, reason: ", zmq_strerror(errno))); + while (true) { + if (zmq_poll(fItems, fNumItems, timeout) < 0) { + if (errno == ETERM) { + LOG(debug) << "polling exited, reason: " << zmq_strerror(errno); + return; + } else if (errno == EINTR) { + LOG(debug) << "polling interrupted by system call"; + continue; + } else { + LOG(error) << "polling failed, reason: " << zmq_strerror(errno); + throw fair::mq::PollerError(fair::mq::tools::ToString("Polling failed, reason: ", zmq_strerror(errno))); + } } + break; } } diff --git a/fairmq/shmem/Socket.h b/fairmq/shmem/Socket.h index 575968f3..c40585f5 100644 --- a/fairmq/shmem/Socket.h +++ b/fairmq/shmem/Socket.h @@ -148,9 +148,6 @@ class Socket final : public fair::mq::Socket if (zmq_errno() == ETERM) { LOG(debug) << "Terminating socket " << fId; return -1; - } else if (zmq_errno() == EINTR) { - LOG(debug) << "Transfer interrupted by system call"; - return -1; } else { LOG(error) << "Failed transfer on socket " << fId << ", reason: " << zmq_strerror(errno); return -1; @@ -177,7 +174,7 @@ class Socket final : public fair::mq::Socket size_t size = msg->GetSize(); fBytesTx += size; return size; - } else if (zmq_errno() == EAGAIN) { + } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { if (ShouldRetry(flags, timeout, elapsed)) { continue; } else { @@ -220,7 +217,7 @@ class Socket final : public fair::mq::Socket fBytesRx += size; ++fMessagesRx; return size; - } else if (zmq_errno() == EAGAIN) { + } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { if (ShouldRetry(flags, timeout, elapsed)) { continue; } else { @@ -269,7 +266,7 @@ class Socket final : public fair::mq::Socket fBytesTx += totalSize; return totalSize; - } else if (zmq_errno() == EAGAIN) { + } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { if (ShouldRetry(flags, timeout, elapsed)) { continue; } else { @@ -323,7 +320,7 @@ class Socket final : public fair::mq::Socket fBytesRx += totalSize; return totalSize; - } else if (zmq_errno() == EAGAIN) { + } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { if (ShouldRetry(flags, timeout, elapsed)) { continue; } else { diff --git a/fairmq/shmem/TransportFactory.h b/fairmq/shmem/TransportFactory.h index ed69a373..c77d4ca5 100644 --- a/fairmq/shmem/TransportFactory.h +++ b/fairmq/shmem/TransportFactory.h @@ -43,7 +43,7 @@ class TransportFactory final : public fair::mq::TransportFactory : fair::mq::TransportFactory(id) , fDeviceId(id) , fShmId() - , fZMQContext(zmq_ctx_new()) + , fZmqCtx(zmq_ctx_new()) , fManager(nullptr) { int major, minor, patch; @@ -51,7 +51,7 @@ class TransportFactory final : public fair::mq::TransportFactory LOG(debug) << "Transport: Using ZeroMQ (" << major << "." << minor << "." << patch << ") & " << "boost::interprocess (" << (BOOST_VERSION / 100000) << "." << (BOOST_VERSION / 100 % 1000) << "." << (BOOST_VERSION % 100) << ")"; - if (!fZMQContext) { + if (!fZmqCtx) { throw std::runtime_error(tools::ToString("failed creating context, reason: ", zmq_strerror(errno))); } @@ -70,12 +70,12 @@ class TransportFactory final : public fair::mq::TransportFactory LOG(debug) << "Generated shmid '" << fShmId << "' out of session id '" << sessionName << "'."; try { - if (zmq_ctx_set(fZMQContext, ZMQ_IO_THREADS, numIoThreads) != 0) { + if (zmq_ctx_set(fZmqCtx, ZMQ_IO_THREADS, numIoThreads) != 0) { LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno); } // Set the maximum number of allowed sockets on the context. - if (zmq_ctx_set(fZMQContext, ZMQ_MAX_SOCKETS, 10000) != 0) { + if (zmq_ctx_set(fZmqCtx, ZMQ_MAX_SOCKETS, 10000) != 0) { LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno); } @@ -121,7 +121,7 @@ class TransportFactory final : public fair::mq::TransportFactory SocketPtr CreateSocket(const std::string& type, const std::string& name) override { - return tools::make_unique(*fManager, type, name, GetId(), fZMQContext, this); + return tools::make_unique(*fManager, type, name, GetId(), fZmqCtx, this); } PollerPtr CreatePoller(const std::vector& channels) const override @@ -179,14 +179,17 @@ class TransportFactory final : public fair::mq::TransportFactory { LOG(debug) << "Destroying Shared Memory transport..."; - if (fZMQContext) { - if (zmq_ctx_term(fZMQContext) != 0) { - if (errno == EINTR) { - LOG(error) << "failed closing context, reason: " << zmq_strerror(errno); - } else { - fZMQContext = nullptr; - return; + if (fZmqCtx) { + while (true) { + if (zmq_ctx_term(fZmqCtx) != 0) { + if (errno == EINTR) { + LOG(debug) << "zmq_ctx_term interrupted by system call, retrying"; + continue; + } else { + fZmqCtx = nullptr; + } } + break; } } else { LOG(error) << "context not available for shutdown"; @@ -196,7 +199,7 @@ class TransportFactory final : public fair::mq::TransportFactory private: std::string fDeviceId; std::string fShmId; - void* fZMQContext; + void* fZmqCtx; std::unique_ptr fManager; }; diff --git a/fairmq/zeromq/Context.h b/fairmq/zeromq/Context.h index bcf639cf..bf061785 100644 --- a/fairmq/zeromq/Context.h +++ b/fairmq/zeromq/Context.h @@ -161,13 +161,16 @@ class Context UnsubscribeFromRegionEvents(); if (fZmqCtx) { - if (zmq_ctx_term(fZmqCtx) != 0) { - if (errno == EINTR) { - LOG(error) << " failed closing context, reason: " << zmq_strerror(errno); - } else { - fZmqCtx = nullptr; - return; + while (true) { + if (zmq_ctx_term(fZmqCtx) != 0) { + if (errno == EINTR) { + LOG(debug) << "zmq_ctx_term interrupted by system call, retrying"; + continue; + } else { + fZmqCtx = nullptr; + } } + break; } } else { LOG(error) << "context not available for shutdown"; diff --git a/fairmq/zeromq/Poller.h b/fairmq/zeromq/Poller.h index e1f41227..b4b27384 100644 --- a/fairmq/zeromq/Poller.h +++ b/fairmq/zeromq/Poller.h @@ -130,13 +130,20 @@ class Poller final : public fair::mq::Poller void Poll(const int timeout) override { - if (zmq_poll(fItems, fNumItems, timeout) < 0) { - if (errno == ETERM) { - LOG(debug) << "polling exited, reason: " << zmq_strerror(errno); - } else { - LOG(error) << "polling failed, reason: " << zmq_strerror(errno); - throw fair::mq::PollerError(fair::mq::tools::ToString("Polling failed, reason: ", zmq_strerror(errno))); + while (true) { + if (zmq_poll(fItems, fNumItems, timeout) < 0) { + if (errno == ETERM) { + LOG(debug) << "polling exited, reason: " << zmq_strerror(errno); + return; + } else if (errno == EINTR) { + LOG(debug) << "polling interrupted by system call"; + continue; + } else { + LOG(error) << "polling failed, reason: " << zmq_strerror(errno); + throw fair::mq::PollerError(fair::mq::tools::ToString("Polling failed, reason: ", zmq_strerror(errno))); + } } + break; } } diff --git a/fairmq/zeromq/Socket.h b/fairmq/zeromq/Socket.h index ab4d9d60..bbf7e578 100644 --- a/fairmq/zeromq/Socket.h +++ b/fairmq/zeromq/Socket.h @@ -126,9 +126,6 @@ class Socket final : public fair::mq::Socket if (zmq_errno() == ETERM) { LOG(debug) << "Terminating socket " << fId; return -1; - } else if (zmq_errno() == EINTR) { - LOG(debug) << "Transfer interrupted by system call"; - return -1; } else { LOG(error) << "Failed transfer on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno); return -1; @@ -151,7 +148,7 @@ class Socket final : public fair::mq::Socket fBytesTx += nbytes; ++fMessagesTx; return nbytes; - } else if (zmq_errno() == EAGAIN) { + } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { if (ShouldRetry(flags, timeout, elapsed)) { continue; } else { @@ -177,7 +174,7 @@ class Socket final : public fair::mq::Socket fBytesRx += nbytes; ++fMessagesRx; return nbytes; - } else if (zmq_errno() == EAGAIN) { + } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { if (ShouldRetry(flags, timeout, elapsed)) { continue; } else { @@ -212,7 +209,7 @@ class Socket final : public fair::mq::Socket int nbytes = zmq_msg_send(static_cast(msgVec[i].get())->GetMessage(), fSocket, (i < vecSize - 1) ? ZMQ_SNDMORE | flags : flags); if (nbytes >= 0) { totalSize += nbytes; - } else if (zmq_errno() == EAGAIN) { + } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { if (ShouldRetry(flags, timeout, elapsed)) { repeat = true; break; @@ -261,7 +258,7 @@ class Socket final : public fair::mq::Socket if (nbytes >= 0) { msgVec.push_back(move(part)); totalSize += nbytes; - } else if (zmq_errno() == EAGAIN) { + } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { if (ShouldRetry(flags, timeout, elapsed)) { repeat = true; break;