diff --git a/fairmq/zeromq/Socket.h b/fairmq/zeromq/Socket.h index b1fd3f91..1447a70f 100644 --- a/fairmq/zeromq/Socket.h +++ b/fairmq/zeromq/Socket.h @@ -142,12 +142,14 @@ class Socket final : public fair::mq::Socket static_cast(msg.get())->ApplyUsedSize(); + int64_t actualBytes = zmq_msg_size(static_cast(msg.get())->GetMessage()); + while (true) { int nbytes = zmq_msg_send(static_cast(msg.get())->GetMessage(), fSocket, flags); if (nbytes >= 0) { - fBytesTx += nbytes; + fBytesTx += actualBytes; ++fMessagesTx; - return nbytes; + return actualBytes; } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { if (fCtx.Interrupted()) { return static_cast(TransferCode::interrupted); @@ -173,9 +175,10 @@ class Socket final : public fair::mq::Socket while (true) { int nbytes = zmq_msg_recv(static_cast(msg.get())->GetMessage(), fSocket, flags); if (nbytes >= 0) { - fBytesRx += nbytes; + int64_t actualBytes = zmq_msg_size(static_cast(msg.get())->GetMessage()); + fBytesRx += actualBytes; ++fMessagesRx; - return nbytes; + return actualBytes; } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { if (fCtx.Interrupted()) { return static_cast(TransferCode::interrupted);