Retry on EINTR in blocking zmq calls

This commit is contained in:
Alexey Rybalchenko 2020-08-28 10:11:56 +02:00
parent 1f0c94f898
commit 690e8a0370
6 changed files with 59 additions and 45 deletions

View File

@ -127,14 +127,21 @@ class Poller final : public fair::mq::Poller
void Poll(const int timeout) override void Poll(const int timeout) override
{ {
while (true) {
if (zmq_poll(fItems, fNumItems, timeout) < 0) { if (zmq_poll(fItems, fNumItems, timeout) < 0) {
if (errno == ETERM) { if (errno == ETERM) {
LOG(debug) << "polling exited, reason: " << zmq_strerror(errno); LOG(debug) << "polling exited, reason: " << zmq_strerror(errno);
return;
} else if (errno == EINTR) {
LOG(debug) << "polling interrupted by system call";
continue;
} else { } else {
LOG(error) << "polling failed, reason: " << zmq_strerror(errno); LOG(error) << "polling failed, reason: " << zmq_strerror(errno);
throw fair::mq::PollerError(fair::mq::tools::ToString("Polling failed, reason: ", zmq_strerror(errno))); throw fair::mq::PollerError(fair::mq::tools::ToString("Polling failed, reason: ", zmq_strerror(errno)));
} }
} }
break;
}
} }
bool CheckInput(const int index) override bool CheckInput(const int index) override

View File

@ -148,9 +148,6 @@ class Socket final : public fair::mq::Socket
if (zmq_errno() == ETERM) { if (zmq_errno() == ETERM) {
LOG(debug) << "Terminating socket " << fId; LOG(debug) << "Terminating socket " << fId;
return -1; return -1;
} else if (zmq_errno() == EINTR) {
LOG(debug) << "Transfer interrupted by system call";
return -1;
} else { } else {
LOG(error) << "Failed transfer on socket " << fId << ", reason: " << zmq_strerror(errno); LOG(error) << "Failed transfer on socket " << fId << ", reason: " << zmq_strerror(errno);
return -1; return -1;
@ -177,7 +174,7 @@ class Socket final : public fair::mq::Socket
size_t size = msg->GetSize(); size_t size = msg->GetSize();
fBytesTx += size; fBytesTx += size;
return size; return size;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) { if (ShouldRetry(flags, timeout, elapsed)) {
continue; continue;
} else { } else {
@ -220,7 +217,7 @@ class Socket final : public fair::mq::Socket
fBytesRx += size; fBytesRx += size;
++fMessagesRx; ++fMessagesRx;
return size; return size;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) { if (ShouldRetry(flags, timeout, elapsed)) {
continue; continue;
} else { } else {
@ -269,7 +266,7 @@ class Socket final : public fair::mq::Socket
fBytesTx += totalSize; fBytesTx += totalSize;
return totalSize; return totalSize;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) { if (ShouldRetry(flags, timeout, elapsed)) {
continue; continue;
} else { } else {
@ -323,7 +320,7 @@ class Socket final : public fair::mq::Socket
fBytesRx += totalSize; fBytesRx += totalSize;
return totalSize; return totalSize;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) { if (ShouldRetry(flags, timeout, elapsed)) {
continue; continue;
} else { } else {

View File

@ -43,7 +43,7 @@ class TransportFactory final : public fair::mq::TransportFactory
: fair::mq::TransportFactory(id) : fair::mq::TransportFactory(id)
, fDeviceId(id) , fDeviceId(id)
, fShmId() , fShmId()
, fZMQContext(zmq_ctx_new()) , fZmqCtx(zmq_ctx_new())
, fManager(nullptr) , fManager(nullptr)
{ {
int major, minor, patch; int major, minor, patch;
@ -51,7 +51,7 @@ class TransportFactory final : public fair::mq::TransportFactory
LOG(debug) << "Transport: Using ZeroMQ (" << major << "." << minor << "." << patch << ") & " LOG(debug) << "Transport: Using ZeroMQ (" << major << "." << minor << "." << patch << ") & "
<< "boost::interprocess (" << (BOOST_VERSION / 100000) << "." << (BOOST_VERSION / 100 % 1000) << "." << (BOOST_VERSION % 100) << ")"; << "boost::interprocess (" << (BOOST_VERSION / 100000) << "." << (BOOST_VERSION / 100 % 1000) << "." << (BOOST_VERSION % 100) << ")";
if (!fZMQContext) { if (!fZmqCtx) {
throw std::runtime_error(tools::ToString("failed creating context, reason: ", zmq_strerror(errno))); throw std::runtime_error(tools::ToString("failed creating context, reason: ", zmq_strerror(errno)));
} }
@ -70,12 +70,12 @@ class TransportFactory final : public fair::mq::TransportFactory
LOG(debug) << "Generated shmid '" << fShmId << "' out of session id '" << sessionName << "'."; LOG(debug) << "Generated shmid '" << fShmId << "' out of session id '" << sessionName << "'.";
try { try {
if (zmq_ctx_set(fZMQContext, ZMQ_IO_THREADS, numIoThreads) != 0) { if (zmq_ctx_set(fZmqCtx, ZMQ_IO_THREADS, numIoThreads) != 0) {
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno); LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
} }
// Set the maximum number of allowed sockets on the context. // Set the maximum number of allowed sockets on the context.
if (zmq_ctx_set(fZMQContext, ZMQ_MAX_SOCKETS, 10000) != 0) { if (zmq_ctx_set(fZmqCtx, ZMQ_MAX_SOCKETS, 10000) != 0) {
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno); LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
} }
@ -121,7 +121,7 @@ class TransportFactory final : public fair::mq::TransportFactory
SocketPtr CreateSocket(const std::string& type, const std::string& name) override SocketPtr CreateSocket(const std::string& type, const std::string& name) override
{ {
return tools::make_unique<Socket>(*fManager, type, name, GetId(), fZMQContext, this); return tools::make_unique<Socket>(*fManager, type, name, GetId(), fZmqCtx, this);
} }
PollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override PollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override
@ -179,15 +179,18 @@ class TransportFactory final : public fair::mq::TransportFactory
{ {
LOG(debug) << "Destroying Shared Memory transport..."; LOG(debug) << "Destroying Shared Memory transport...";
if (fZMQContext) { if (fZmqCtx) {
if (zmq_ctx_term(fZMQContext) != 0) { while (true) {
if (zmq_ctx_term(fZmqCtx) != 0) {
if (errno == EINTR) { if (errno == EINTR) {
LOG(error) << "failed closing context, reason: " << zmq_strerror(errno); LOG(debug) << "zmq_ctx_term interrupted by system call, retrying";
continue;
} else { } else {
fZMQContext = nullptr; fZmqCtx = nullptr;
return;
} }
} }
break;
}
} else { } else {
LOG(error) << "context not available for shutdown"; LOG(error) << "context not available for shutdown";
} }
@ -196,7 +199,7 @@ class TransportFactory final : public fair::mq::TransportFactory
private: private:
std::string fDeviceId; std::string fDeviceId;
std::string fShmId; std::string fShmId;
void* fZMQContext; void* fZmqCtx;
std::unique_ptr<Manager> fManager; std::unique_ptr<Manager> fManager;
}; };

View File

@ -161,14 +161,17 @@ class Context
UnsubscribeFromRegionEvents(); UnsubscribeFromRegionEvents();
if (fZmqCtx) { if (fZmqCtx) {
while (true) {
if (zmq_ctx_term(fZmqCtx) != 0) { if (zmq_ctx_term(fZmqCtx) != 0) {
if (errno == EINTR) { if (errno == EINTR) {
LOG(error) << " failed closing context, reason: " << zmq_strerror(errno); LOG(debug) << "zmq_ctx_term interrupted by system call, retrying";
continue;
} else { } else {
fZmqCtx = nullptr; fZmqCtx = nullptr;
return;
} }
} }
break;
}
} else { } else {
LOG(error) << "context not available for shutdown"; LOG(error) << "context not available for shutdown";
} }

View File

@ -130,14 +130,21 @@ class Poller final : public fair::mq::Poller
void Poll(const int timeout) override void Poll(const int timeout) override
{ {
while (true) {
if (zmq_poll(fItems, fNumItems, timeout) < 0) { if (zmq_poll(fItems, fNumItems, timeout) < 0) {
if (errno == ETERM) { if (errno == ETERM) {
LOG(debug) << "polling exited, reason: " << zmq_strerror(errno); LOG(debug) << "polling exited, reason: " << zmq_strerror(errno);
return;
} else if (errno == EINTR) {
LOG(debug) << "polling interrupted by system call";
continue;
} else { } else {
LOG(error) << "polling failed, reason: " << zmq_strerror(errno); LOG(error) << "polling failed, reason: " << zmq_strerror(errno);
throw fair::mq::PollerError(fair::mq::tools::ToString("Polling failed, reason: ", zmq_strerror(errno))); throw fair::mq::PollerError(fair::mq::tools::ToString("Polling failed, reason: ", zmq_strerror(errno)));
} }
} }
break;
}
} }
bool CheckInput(const int index) override bool CheckInput(const int index) override

View File

@ -126,9 +126,6 @@ class Socket final : public fair::mq::Socket
if (zmq_errno() == ETERM) { if (zmq_errno() == ETERM) {
LOG(debug) << "Terminating socket " << fId; LOG(debug) << "Terminating socket " << fId;
return -1; return -1;
} else if (zmq_errno() == EINTR) {
LOG(debug) << "Transfer interrupted by system call";
return -1;
} else { } else {
LOG(error) << "Failed transfer on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno); LOG(error) << "Failed transfer on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno);
return -1; return -1;
@ -151,7 +148,7 @@ class Socket final : public fair::mq::Socket
fBytesTx += nbytes; fBytesTx += nbytes;
++fMessagesTx; ++fMessagesTx;
return nbytes; return nbytes;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) { if (ShouldRetry(flags, timeout, elapsed)) {
continue; continue;
} else { } else {
@ -177,7 +174,7 @@ class Socket final : public fair::mq::Socket
fBytesRx += nbytes; fBytesRx += nbytes;
++fMessagesRx; ++fMessagesRx;
return nbytes; return nbytes;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) { if (ShouldRetry(flags, timeout, elapsed)) {
continue; continue;
} else { } else {
@ -212,7 +209,7 @@ class Socket final : public fair::mq::Socket
int nbytes = zmq_msg_send(static_cast<Message*>(msgVec[i].get())->GetMessage(), fSocket, (i < vecSize - 1) ? ZMQ_SNDMORE | flags : flags); int nbytes = zmq_msg_send(static_cast<Message*>(msgVec[i].get())->GetMessage(), fSocket, (i < vecSize - 1) ? ZMQ_SNDMORE | flags : flags);
if (nbytes >= 0) { if (nbytes >= 0) {
totalSize += nbytes; totalSize += nbytes;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) { if (ShouldRetry(flags, timeout, elapsed)) {
repeat = true; repeat = true;
break; break;
@ -261,7 +258,7 @@ class Socket final : public fair::mq::Socket
if (nbytes >= 0) { if (nbytes >= 0) {
msgVec.push_back(move(part)); msgVec.push_back(move(part));
totalSize += nbytes; totalSize += nbytes;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) { if (ShouldRetry(flags, timeout, elapsed)) {
repeat = true; repeat = true;
break; break;