From fb42b1e2f0146b4f2012ab5e500a0ff40beec7de Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Tue, 12 Mar 2019 16:23:47 +0100 Subject: [PATCH] Adapt to new asiofi release --- CMakeLists.txt | 2 +- fairmq/ofi/Socket.cxx | 95 ++++++++++++++++++------------------------- fairmq/ofi/Socket.h | 2 +- 3 files changed, 42 insertions(+), 57 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 941640d6..1a9d1222 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -61,7 +61,7 @@ endif() if(BUILD_OFI_TRANSPORT) find_package2(PRIVATE asiofi REQUIRED - VERSION 0.2.0 + VERSION 0.3.0 ) find_package2(PRIVATE OFI REQUIRED VERSION ${asiofi_OFI_VERSION} diff --git a/fairmq/ofi/Socket.cxx b/fairmq/ofi/Socket.cxx index 18125ba4..ab93d095 100644 --- a/fairmq/ofi/Socket.cxx +++ b/fairmq/ofi/Socket.cxx @@ -334,18 +334,16 @@ auto Socket::Receive(std::vector& msgVec, const int timeout) -> int6 auto Socket::SendQueueReader() -> void { - fSendSem.async_wait([&](const boost::system::error_code& ec) { - if (!ec) { - // LOG(debug) << "OFI transport (" << fId << "): < Wait fSendSem=" << - // fSendSem.get_value(); - fSendQueueRead.async_receive([&](const boost::system::error_code& ec2, - azmq::message& zmsg, - size_t bytes_transferred) { - if (!ec2) { - OnSend(zmsg, bytes_transferred); - } - }); - } + fSendSem.async_wait([&] { + // LOG(debug) << "OFI transport (" << fId << "): < Wait fSendSem=" << + // fSendSem.get_value(); + fSendQueueRead.async_receive([&](const boost::system::error_code& ec2, + azmq::message& zmsg, + size_t bytes_transferred) { + if (!ec2) { + OnSend(zmsg, bytes_transferred); + } + }); }); } @@ -392,11 +390,9 @@ auto Socket::OnSend(azmq::message& zmsg, size_t /*bytes_transferred*/) -> void // buffer sent"; fBytesTx += size; fMessagesTx++; - fSendSem.async_signal([&](const boost::system::error_code& ec) { - if (!ec) { - // LOG(debug) << "OFI transport (" << fId << "): > - // Signal fSendSem=" << fSendSem.get_value(); - } + fSendSem.async_signal([&] { + // LOG(debug) << "OFI transport (" << fId << "): > + // Signal fSendSem=" << fSendSem.get_value(); }); }); @@ -406,20 +402,16 @@ auto Socket::OnSend(azmq::message& zmsg, size_t /*bytes_transferred*/) -> void // LOG(debug) << "OFI transport (" << fId << "): >>>>> Data buffer sent"; fBytesTx += size; fMessagesTx++; - fSendSem.async_signal([&](const boost::system::error_code& ec) { - if (!ec) { - // LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" - // << fSendSem.get_value(); - } + fSendSem.async_signal([&] { + // LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" + // << fSendSem.get_value(); }); }); } } else { ++fMessagesTx; - fSendSem.async_signal([&](const boost::system::error_code& ec) { - if (!ec) { - // LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" << fSendSem.get_value(); - } + fSendSem.async_signal([&] { + // LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" << fSendSem.get_value(); }); } @@ -429,25 +421,24 @@ auto Socket::OnSend(azmq::message& zmsg, size_t /*bytes_transferred*/) -> void auto Socket::RecvControlQueueReader() -> void { - fRecvSem.async_wait([&](const boost::system::error_code& ec) { - if (!ec) { - auto ctrl = MakeControlMessageWithPmr(&fControlMemPool); - auto ctrl_msg = boost::asio::mutable_buffer(ctrl.get(), sizeof(PostBuffer)); + fRecvSem.async_wait([&] { + auto ctrl = MakeControlMessageWithPmr(&fControlMemPool); + auto ctrl_msg = boost::asio::mutable_buffer(ctrl.get(), sizeof(PostBuffer)); - if (fNeedOfiMemoryRegistration) { - asiofi::memory_region mr(*fOfiDomain, ctrl_msg, asiofi::mr::access::recv); - auto desc = mr.desc(); + if (fNeedOfiMemoryRegistration) { + asiofi::memory_region mr(*fOfiDomain, ctrl_msg, asiofi::mr::access::recv); + auto desc = mr.desc(); - fControlEndpoint->recv( - ctrl_msg, desc, [&, ctrl2 = std::move(ctrl), mr2 = std::move(mr)](boost::asio::mutable_buffer) mutable { - OnRecvControl(std::move(ctrl2)); - }); - } else { - fControlEndpoint->recv( - ctrl_msg, [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable { - OnRecvControl(std::move(ctrl2)); - }); - } + fControlEndpoint->recv( + ctrl_msg, + desc, + [&, ctrl2 = std::move(ctrl), mr2 = std::move(mr)]( + boost::asio::mutable_buffer) mutable { OnRecvControl(std::move(ctrl2)); }); + } else { + fControlEndpoint->recv( + ctrl_msg, [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable { + OnRecvControl(std::move(ctrl2)); + }); } }); } @@ -481,10 +472,8 @@ auto Socket::OnRecvControl(ofi::unique_ptr ctrl) -> void // LOG(debug) << "OFI transport (" << fId // << "): <<<<< Data buffer received, bytes_transferred2=" // << bytes_transferred2; - fRecvSem.async_signal([&](const boost::system::error_code& ec2) { - if (!ec2) { - //LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem"; - } + fRecvSem.async_signal([&] { + //LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem"; }); } }); @@ -501,10 +490,8 @@ auto Socket::OnRecvControl(ofi::unique_ptr ctrl) -> void // LOG(debug) << "OFI transport (" << fId // << "): <<<<< Data buffer received, bytes_transferred2=" // << bytes_transferred2; - fRecvSem.async_signal([&](const boost::system::error_code& ec2) { - if (!ec2) { - // LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem"; - } + fRecvSem.async_signal([&] { + // LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem"; }); } }); @@ -518,10 +505,8 @@ auto Socket::OnRecvControl(ofi::unique_ptr ctrl) -> void // LOG(debug) << "OFI transport (" << fId // << "): <<<<< Data buffer received, bytes_transferred2=" // << bytes_transferred2; - fRecvSem.async_signal([&](const boost::system::error_code& ec2) { - if (!ec2) { - // LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem"; - } + fRecvSem.async_signal([&] { + // LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem"; }); } }); diff --git a/fairmq/ofi/Socket.h b/fairmq/ofi/Socket.h index 3ad031e1..67a742e9 100644 --- a/fairmq/ofi/Socket.h +++ b/fairmq/ofi/Socket.h @@ -99,7 +99,7 @@ class Socket final : public fair::mq::Socket int fRcvTimeout; azmq::socket fSendQueueWrite, fSendQueueRead; azmq::socket fRecvQueueWrite, fRecvQueueRead; - asiofi::semaphore fSendSem, fRecvSem; + asiofi::synchronized_semaphore fSendSem, fRecvSem; std::atomic fNeedOfiMemoryRegistration; auto SendQueueReader() -> void;