From ad198edd59aa3b56ea629566bb68debd3ab37262 Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Wed, 27 Feb 2019 18:06:53 +0100 Subject: [PATCH] Remove asio strand --- fairmq/ofi/Socket.cxx | 87 +++++++++++++++++++++---------------------- fairmq/ofi/Socket.h | 1 - 2 files changed, 42 insertions(+), 46 deletions(-) diff --git a/fairmq/ofi/Socket.cxx b/fairmq/ofi/Socket.cxx index 806c0195..cb33cb56 100644 --- a/fairmq/ofi/Socket.cxx +++ b/fairmq/ofi/Socket.cxx @@ -14,7 +14,6 @@ #include #include -#include #include #include #include @@ -51,15 +50,14 @@ Socket::Socket(Context& context, const string& type, const string& name, const s , fBytesRx(0) , fMessagesTx(0) , fMessagesRx(0) - , fIoStrand(fContext.GetIoContext()) , fSndTimeout(100) , fRcvTimeout(100) - , fSendQueueWrite(fIoStrand.context(), ZMQ_PUSH) - , fSendQueueRead(fIoStrand.context(), ZMQ_PULL) - , fRecvQueueWrite(fIoStrand.context(), ZMQ_PUSH) - , fRecvQueueRead(fIoStrand.context(), ZMQ_PULL) - , fSendSem(fIoStrand.context(), 300) - , fRecvSem(fIoStrand.context(), 300) + , fSendQueueWrite(fContext.GetIoContext(), ZMQ_PUSH) + , fSendQueueRead(fContext.GetIoContext(), ZMQ_PULL) + , fRecvQueueWrite(fContext.GetIoContext(), ZMQ_PUSH) + , fRecvQueueRead(fContext.GetIoContext(), ZMQ_PULL) + , fSendSem(fContext.GetIoContext(), 300) + , fRecvSem(fContext.GetIoContext(), 300) , fNeedOfiMemoryRegistration(false) { if (type != "pair") { @@ -110,7 +108,7 @@ auto Socket::InitOfi(Address addr) -> void fOfiInfo = tools::make_unique(addr.Ip.c_str(), std::to_string(addr.Port).c_str(), FI_SOURCE, hints); } - // LOG(debug) << "OFI transport: " << *fOfiInfo; + LOG(debug) << "OFI transport: " << *fOfiInfo; fOfiFabric = tools::make_unique(*fOfiInfo); @@ -127,7 +125,7 @@ try { InitOfi(fLocalAddr); - fPassiveEndpoint = tools::make_unique(fIoStrand.context(), *fOfiFabric); + fPassiveEndpoint = tools::make_unique(fContext.GetIoContext(), *fOfiFabric); fPassiveEndpoint->set_local_address(Context::ConvertAddress(fLocalAddr)); BindControlEndpoint(); @@ -155,7 +153,7 @@ auto Socket::BindControlEndpoint() -> void LOG(debug) << "OFI transport (" << fId << "): control band connection request received. Accepting ..."; fControlEndpoint = tools::make_unique( - fIoStrand.context(), *fOfiDomain, info); + fContext.GetIoContext(), *fOfiDomain, info); fControlEndpoint->enable(); fControlEndpoint->accept([&]() { LOG(debug) << "OFI transport (" << fId << "): control band connection accepted."; @@ -175,13 +173,13 @@ auto Socket::BindDataEndpoint() -> void LOG(debug) << "OFI transport (" << fId << "): data band connection request received. Accepting ..."; fDataEndpoint = tools::make_unique( - fIoStrand.context(), *fOfiDomain, info); + fContext.GetIoContext(), *fOfiDomain, info); fDataEndpoint->enable(); fDataEndpoint->accept([&]() { LOG(debug) << "OFI transport (" << fId << "): data band connection accepted."; - boost::asio::post(fIoStrand, std::bind(&Socket::SendQueueReader, this)); - boost::asio::post(fIoStrand, std::bind(&Socket::RecvControlQueueReader, this)); + boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this)); + boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this)); }); }); @@ -191,9 +189,7 @@ auto Socket::BindDataEndpoint() -> void auto Socket::Connect(const string& address) -> bool try { fRemoteAddr = Context::VerifyAddress(address); - if (fRemoteAddr.Protocol == "verbs") { - fNeedOfiMemoryRegistration = true; - } + fNeedOfiMemoryRegistration = (fRemoteAddr.Protocol == "verbs"); InitOfi(fRemoteAddr); @@ -201,15 +197,14 @@ try { ConnectEndpoint(fDataEndpoint, Band::Data); - boost::asio::post(fIoStrand, std::bind(&Socket::SendQueueReader, this)); - boost::asio::post(fIoStrand, std::bind(&Socket::RecvControlQueueReader, this)); + boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this)); + boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this)); return true; } catch (const SilentSocketError& e) { // do not print error in this case, this is handled by FairMQDevice - // in case no connection could be established after trying a number of random ports from a range. return false; } catch (const std::exception& e) @@ -227,7 +222,7 @@ auto Socket::ConnectEndpoint(std::unique_ptr& endpoi asiofi::eq::event status(asiofi::eq::event::connrefused); std::string band(type == Band::Control ? "control" : "data"); - endpoint = tools::make_unique(fIoStrand.context(), *fOfiDomain); + endpoint = tools::make_unique(fContext.GetIoContext(), *fOfiDomain); endpoint->enable(); LOG(debug) << "OFI transport (" << fId << "): Sending " << band << " band connection request to " @@ -235,6 +230,7 @@ auto Socket::ConnectEndpoint(std::unique_ptr& endpoi while (true) { endpoint->connect(Context::ConvertAddress(fRemoteAddr), [&](asiofi::eq::event event) { + LOG(debug) << "OFI transport (" << fId << "): conn event happened"; { std::unique_lock lk(m); status = event; @@ -242,6 +238,7 @@ auto Socket::ConnectEndpoint(std::unique_ptr& endpoi cv.notify_one(); }); + LOG(debug) << "OFI transport (" << fId << "): endpoint->connect called."; { std::unique_lock lk(m); cv.wait(lk); @@ -347,6 +344,7 @@ auto Socket::Send(std::vector& msgVec, const int timeout) -> int64_t { return SendImpl(msgVec, 0, timeout); } + auto Socket::Receive(std::vector& msgVec, const int timeout) -> int64_t { return ReceiveImpl(msgVec, 0, timeout); @@ -354,20 +352,19 @@ auto Socket::Receive(std::vector& msgVec, const int timeout) -> int6 auto Socket::SendQueueReader() -> void { - fSendSem.async_wait( - boost::asio::bind_executor(fIoStrand, [&](const boost::system::error_code& ec) { - if (!ec) { - // LOG(debug) << "OFI transport (" << fId << "): < Wait fSendSem=" << - // fSendSem.get_value(); - fSendQueueRead.async_receive([&](const boost::system::error_code& ec2, - azmq::message& zmsg, - size_t bytes_transferred) { - if (!ec2) { - OnSend(zmsg, bytes_transferred); - } - }); - } - })); + fSendSem.async_wait([&](const boost::system::error_code& ec) { + if (!ec) { + // LOG(debug) << "OFI transport (" << fId << "): < Wait fSendSem=" << + // fSendSem.get_value(); + fSendQueueRead.async_receive([&](const boost::system::error_code& ec2, + azmq::message& zmsg, + size_t bytes_transferred) { + if (!ec2) { + OnSend(zmsg, bytes_transferred); + } + }); + } + }); } auto Socket::OnSend(azmq::message& zmsg, size_t /*bytes_transferred*/) -> void @@ -444,23 +441,23 @@ auto Socket::OnSend(azmq::message& zmsg, size_t /*bytes_transferred*/) -> void }); } - boost::asio::post(fIoStrand, std::bind(&Socket::SendQueueReader, this)); + boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this)); // LOG(debug) << "OFI transport (" << fId << "): LEAVE OnSend"; } auto Socket::RecvControlQueueReader() -> void { - fRecvSem.async_wait( - boost::asio::bind_executor(fIoStrand, [&](const boost::system::error_code& ec) { - if (!ec) { - auto ctrl = MakeControlMessageWithPmr(&fControlMemPool); - auto ctrl_msg = boost::asio::mutable_buffer(ctrl.get(), sizeof(PostBuffer)); + fRecvSem.async_wait([&](const boost::system::error_code& ec) { + if (!ec) { + auto ctrl = MakeControlMessageWithPmr(&fControlMemPool); + auto ctrl_msg = boost::asio::mutable_buffer(ctrl.get(), sizeof(PostBuffer)); - fControlEndpoint->recv(ctrl_msg, [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable { + fControlEndpoint->recv( + ctrl_msg, [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable { OnRecvControl(std::move(ctrl2)); }); - } - })); + } + }); } auto Socket::OnRecvControl(ofi::unique_ptr ctrl) -> void @@ -542,7 +539,7 @@ auto Socket::OnRecvControl(ofi::unique_ptr ctrl) -> void }); } - boost::asio::post(fIoStrand, std::bind(&Socket::RecvControlQueueReader, this)); + boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this)); // LOG(debug) << "OFI transport (" << fId << "): LEAVE OnRecvControl"; } diff --git a/fairmq/ofi/Socket.h b/fairmq/ofi/Socket.h index de6f4d63..3ad031e1 100644 --- a/fairmq/ofi/Socket.h +++ b/fairmq/ofi/Socket.h @@ -95,7 +95,6 @@ class Socket final : public fair::mq::Socket std::atomic fMessagesRx; Address fRemoteAddr; Address fLocalAddr; - boost::asio::io_service::strand fIoStrand; int fSndTimeout; int fRcvTimeout; azmq::socket fSendQueueWrite, fSendQueueRead;