Adjust transfer methods behaviour when interrupted

A transer is attempted even if the transport has been interrupted
(with a timeout). When the timeout is reached, transfer methods will
return TransferResult::interrupted (-3).
This commit is contained in:
Alexey Rybalchenko
2020-09-15 17:18:53 +02:00
committed by Dennis Klein
parent 5e97d85956
commit 6932f88c84
8 changed files with 169 additions and 91 deletions

View File

@@ -250,7 +250,7 @@ class FairMQChannel
/// Sends a message to the socket queue.
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1)
{
CheckSendCompatibility(msg);
@@ -260,7 +260,7 @@ class FairMQChannel
/// Receives a message from the socket queue.
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1)
{
CheckReceiveCompatibility(msg);
@@ -270,7 +270,7 @@ class FairMQChannel
/// Send a vector of messages
/// @param msgVec message vector reference
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int64_t Send(std::vector<FairMQMessagePtr>& msgVec, int sndTimeoutInMs = -1)
{
CheckSendCompatibility(msgVec);
@@ -280,7 +280,7 @@ class FairMQChannel
/// Receive a vector of messages
/// @param msgVec message vector reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(std::vector<FairMQMessagePtr>& msgVec, int rcvTimeoutInMs = -1)
{
CheckReceiveCompatibility(msgVec);
@@ -290,7 +290,7 @@ class FairMQChannel
/// Send FairMQParts
/// @param parts FairMQParts reference
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int64_t Send(FairMQParts& parts, int sndTimeoutInMs = -1)
{
return Send(parts.fParts, sndTimeoutInMs);
@@ -299,7 +299,7 @@ class FairMQChannel
/// Receive FairMQParts
/// @param parts FairMQParts reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(FairMQParts& parts, int rcvTimeoutInMs = -1)
{
return Receive(parts.fParts, rcvTimeoutInMs);

View File

@@ -129,7 +129,7 @@ class FairMQDevice
/// @param chan channel name
/// @param i channel index
/// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int Send(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
{
return GetChannel(channel, index).Send(msg, sndTimeoutInMs);
@@ -140,7 +140,7 @@ class FairMQDevice
/// @param chan channel name
/// @param i channel index
/// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int Receive(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
{
return GetChannel(channel, index).Receive(msg, rcvTimeoutInMs);
@@ -151,7 +151,7 @@ class FairMQDevice
/// @param chan channel name
/// @param i channel index
/// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int64_t Send(FairMQParts& parts, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
{
return GetChannel(channel, index).Send(parts.fParts, sndTimeoutInMs);
@@ -162,7 +162,7 @@ class FairMQDevice
/// @param chan channel name
/// @param i channel index
/// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(FairMQParts& parts, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
{
return GetChannel(channel, index).Receive(parts.fParts, rcvTimeoutInMs);

View File

@@ -9,14 +9,31 @@
#ifndef FAIRMQSOCKET_H_
#define FAIRMQSOCKET_H_
#include "FairMQMessage.h"
#include <memory>
#include <ostream>
#include <stdexcept>
#include <string>
#include <vector>
#include "FairMQMessage.h"
class FairMQTransportFactory;
namespace fair
{
namespace mq
{
enum class TransferResult : int
{
error = -1,
timeout = -2,
interrupted = -3
};
} // namespace mq
} // namespace fair
class FairMQSocket
{
public:

View File

@@ -272,7 +272,7 @@ try {
int size(0);
for (auto& msg : msgVec) {
size += msg->GetSize();
}
}
fSendPushSem.wait();
{
@@ -284,7 +284,7 @@ try {
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return -1;
return TransferResult::error;
}
auto Socket::SendQueueReader() -> void
@@ -431,7 +431,7 @@ try {
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return -1;
return TransferResult::error;
}
auto Socket::Receive(std::vector<MessagePtr>& msgVec, const int /*timeout*/) -> int64_t
@@ -449,14 +449,14 @@ try {
int64_t size(0);
for (auto& msg : msgVec) {
size += msg->GetSize();
}
}
fBytesRx += size;
++fMessagesRx;
return size;
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return -1;
return TransferResult::error;
}
auto Socket::RecvControlQueueReader() -> void

View File

@@ -130,7 +130,7 @@ class Socket final : public fair::mq::Socket
bool ShouldRetry(int flags, int timeout, int& elapsed) const
{
if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
if ((flags & ZMQ_DONTWAIT) == 0) {
if (timeout > 0) {
elapsed += fTimeout;
if (elapsed >= timeout) {
@@ -147,10 +147,10 @@ class Socket final : public fair::mq::Socket
{
if (zmq_errno() == ETERM) {
LOG(debug) << "Terminating socket " << fId;
return -1;
return static_cast<int>(TransferResult::error);
} else {
LOG(error) << "Failed transfer on socket " << fId << ", reason: " << zmq_strerror(errno);
return -1;
return static_cast<int>(TransferResult::error);
}
}
@@ -166,7 +166,7 @@ class Socket final : public fair::mq::Socket
ZMsg zmqMsg(sizeof(MetaHeader));
std::memcpy(zmqMsg.Data(), &(shmMsg->fMeta), sizeof(MetaHeader));
while (true && !fManager.Interrupted()) {
while (true) {
int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) {
shmMsg->fQueued = true;
@@ -175,17 +175,19 @@ class Socket final : public fair::mq::Socket
fBytesTx += size;
return size;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) {
if (fManager.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue;
} else {
return -2;
return static_cast<int>(TransferResult::timeout);
}
} else {
return HandleErrors();
}
}
return -1;
return static_cast<int>(TransferResult::error);
}
int Receive(MessagePtr& msg, const int timeout = -1) override
@@ -218,10 +220,12 @@ class Socket final : public fair::mq::Socket
++fMessagesRx;
return size;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) {
if (fManager.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue;
} else {
return -2;
return static_cast<int>(TransferResult::timeout);
}
} else {
return HandleErrors();
@@ -249,7 +253,7 @@ class Socket final : public fair::mq::Socket
std::memcpy(metas++, &(shmMsg->fMeta), sizeof(MetaHeader));
}
while (!fManager.Interrupted()) {
while (true) {
int64_t totalSize = 0;
int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) {
@@ -267,17 +271,19 @@ class Socket final : public fair::mq::Socket
return totalSize;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) {
if (fManager.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue;
} else {
return -2;
return static_cast<int>(TransferResult::timeout);
}
} else {
return HandleErrors();
}
}
return -1;
return static_cast<int>(TransferResult::error);
}
int64_t Receive(std::vector<MessagePtr>& msgVec, const int timeout = -1) override
@@ -290,7 +296,7 @@ class Socket final : public fair::mq::Socket
ZMsg zmqMsg;
while (!fManager.Interrupted()) {
while (true) {
int64_t totalSize = 0;
int nbytes = zmq_msg_recv(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) {
@@ -321,17 +327,19 @@ class Socket final : public fair::mq::Socket
return totalSize;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) {
if (fManager.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue;
} else {
return -2;
return static_cast<int>(TransferResult::timeout);
}
} else {
return HandleErrors();
}
}
return -1;
return static_cast<int>(TransferResult::error);
}
void* GetSocket() const { return fSocket; }
@@ -498,7 +506,7 @@ class Socket final : public fair::mq::Socket
if (constant == "pollout")
return ZMQ_POLLOUT;
return -1;
throw SocketError(tools::ToString("GetConstant called with an invalid argument: ", constant));
}
~Socket() override { Close(); }

View File

@@ -108,7 +108,7 @@ class Socket final : public fair::mq::Socket
bool ShouldRetry(int flags, int timeout, int& elapsed) const
{
if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
if ((flags & ZMQ_DONTWAIT) == 0) {
if (timeout > 0) {
elapsed += fTimeout;
if (elapsed >= timeout) {
@@ -125,10 +125,10 @@ class Socket final : public fair::mq::Socket
{
if (zmq_errno() == ETERM) {
LOG(debug) << "Terminating socket " << fId;
return -1;
return static_cast<int>(TransferResult::error);
} else {
LOG(error) << "Failed transfer on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno);
return -1;
return static_cast<int>(TransferResult::error);
}
}
@@ -149,10 +149,12 @@ class Socket final : public fair::mq::Socket
++fMessagesTx;
return nbytes;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) {
if (fCtx.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue;
} else {
return -2;
return static_cast<int>(TransferResult::timeout);
}
} else {
return HandleErrors();
@@ -175,10 +177,12 @@ class Socket final : public fair::mq::Socket
++fMessagesRx;
return nbytes;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) {
if (fCtx.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue;
} else {
return -2;
return static_cast<int>(TransferResult::timeout);
}
} else {
return HandleErrors();
@@ -210,11 +214,13 @@ class Socket final : public fair::mq::Socket
if (nbytes >= 0) {
totalSize += nbytes;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) {
if (fCtx.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
repeat = true;
break;
} else {
return -2;
return static_cast<int>(TransferResult::timeout);
}
} else {
return HandleErrors();
@@ -234,7 +240,7 @@ class Socket final : public fair::mq::Socket
return Send(msgVec.back(), timeout);
} else { // if the vector is empty, something might be wrong
LOG(warn) << "Will not send empty vector";
return -1;
return static_cast<int>(TransferResult::error);
}
}
@@ -259,11 +265,13 @@ class Socket final : public fair::mq::Socket
msgVec.push_back(move(part));
totalSize += nbytes;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) {
if (fCtx.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
repeat = true;
break;
} else {
return -2;
return static_cast<int>(TransferResult::timeout);
}
} else {
return HandleErrors();
@@ -446,7 +454,7 @@ class Socket final : public fair::mq::Socket
if (constant == "pollout")
return ZMQ_POLLOUT;
return -1;
throw SocketError(tools::ToString("GetConstant called with an invalid argument: ", constant));
}
~Socket() override { Close(); }