diff --git a/fairmq/ofi/Socket.cxx b/fairmq/ofi/Socket.cxx index b00c58d9..aa73bcf3 100644 --- a/fairmq/ofi/Socket.cxx +++ b/fairmq/ofi/Socket.cxx @@ -329,7 +329,6 @@ auto Socket::SendQueueReader() -> void fMessagesTx++; fSendPushSem.signal(); }); - } else { fDataEndpoint->send( buffer, [&, size, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable { @@ -477,21 +476,24 @@ auto Socket::DataMessageReceived(MessagePtr msg) -> void { if (fMultiPartRecvCounter > 0) { --fMultiPartRecvCounter; - fInflightMultiPartMessage.emplace_back(std::move(msg)); + fInflightMultiPartMessage.push_back(std::move(msg)); } - std::unique_lock lk(fRecvQueueMutex); if (fMultiPartRecvCounter == 0) { + std::unique_lock lk(fRecvQueueMutex); fRecvQueue.push(std::move(fInflightMultiPartMessage)); + lk.unlock(); fMultiPartRecvCounter = -1; - } else { + fRecvPopSem.signal(); + } else if (fMultiPartRecvCounter == -1) { std::vector msgVec; msgVec.push_back(std::move(msg)); + std::unique_lock lk(fRecvQueueMutex); fRecvQueue.push(std::move(msgVec)); + lk.unlock(); + fRecvPopSem.signal(); } - lk.unlock(); - fRecvPopSem.signal(); } auto Socket::Close() -> void {}