From 763c21ffdd8b8ae9f80d267d2c3eeee3ee83c295 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Mon, 4 Mar 2019 17:06:56 +0100 Subject: [PATCH] Remove azmq on send, make connect/bind blocking --- fairmq/ofi/Message.cxx | 2 + fairmq/ofi/Socket.cxx | 190 ++++++++++++++++------------------------- fairmq/ofi/Socket.h | 6 +- 3 files changed, 77 insertions(+), 121 deletions(-) diff --git a/fairmq/ofi/Message.cxx b/fairmq/ofi/Message.cxx index 8411a682..82d8dfab 100644 --- a/fairmq/ofi/Message.cxx +++ b/fairmq/ofi/Message.cxx @@ -43,6 +43,8 @@ Message::Message(boost::container::pmr::memory_resource* pmr, const size_t size) , fPmr(pmr) { if (size) { + // static void* buffer = fPmr->allocate(size); + // fData = buffer; fData = fPmr->allocate(size); assert(fData); } diff --git a/fairmq/ofi/Socket.cxx b/fairmq/ofi/Socket.cxx index 18125ba4..0dacabc5 100644 --- a/fairmq/ofi/Socket.cxx +++ b/fairmq/ofi/Socket.cxx @@ -52,13 +52,13 @@ Socket::Socket(Context& context, const string& type, const string& name, const s , fMessagesRx(0) , fSndTimeout(100) , fRcvTimeout(100) - , fSendQueueWrite(fContext.GetIoContext(), ZMQ_PUSH) - , fSendQueueRead(fContext.GetIoContext(), ZMQ_PULL) , fRecvQueueWrite(fContext.GetIoContext(), ZMQ_PUSH) , fRecvQueueRead(fContext.GetIoContext(), ZMQ_PULL) , fSendSem(fContext.GetIoContext(), 300) , fRecvSem(fContext.GetIoContext(), 300) , fNeedOfiMemoryRegistration(false) + , fBound(false) + , fConnected(false) { if (type != "pair") { throw SocketError{tools::ToString("Socket type '", type, "' not implemented for ofi transport.")}; @@ -66,10 +66,6 @@ Socket::Socket(Context& context, const string& type, const string& name, const s // TODO wire this up with config azmq::socket::snd_hwm send_max(300); azmq::socket::rcv_hwm recv_max(300); - fSendQueueRead.set_option(send_max); - fSendQueueRead.set_option(recv_max); - fSendQueueWrite.set_option(send_max); - fSendQueueWrite.set_option(recv_max); fRecvQueueRead.set_option(send_max); fRecvQueueRead.set_option(recv_max); fRecvQueueWrite.set_option(send_max); @@ -78,10 +74,6 @@ Socket::Socket(Context& context, const string& type, const string& name, const s // Setup internal queue auto hashed_id = std::hash()(fId); auto queue_id = tools::ToString("inproc://TXQUEUE", hashed_id); - LOG(debug) << "OFI transport (" << fId << "): " << "Binding SQR: " << queue_id; - fSendQueueRead.bind(queue_id); - LOG(debug) << "OFI transport (" << fId << "): " << "Connecting SQW: " << queue_id; - fSendQueueWrite.connect(queue_id); queue_id = tools::ToString("inproc://RXQUEUE", hashed_id); LOG(debug) << "OFI transport (" << fId << "): " << "Binding RQR: " << queue_id; fRecvQueueRead.bind(queue_id); @@ -118,6 +110,7 @@ auto Socket::InitOfi(Address addr) -> void auto Socket::Bind(const string& addr) -> bool try { + fBound = false; fLocalAddr = Context::VerifyAddress(addr); if (fLocalAddr.Protocol == "verbs") { fNeedOfiMemoryRegistration = true; @@ -130,17 +123,16 @@ try { BindControlEndpoint(); + while (!fBound) { + this_thread::sleep_for(chrono::milliseconds(100)); + } + return true; -} -// TODO catch the correct ofi error -catch (const SilentSocketError& e) -{ +} catch (const SilentSocketError& e) {// TODO catch the correct ofi error // do not print error in this case, this is handled by FairMQDevice // in case no connection could be established after trying a number of random ports from a range. return false; -} -catch (const SocketError& e) -{ +} catch (const SocketError& e) { LOG(error) << "OFI transport: " << e.what(); return false; } @@ -150,10 +142,8 @@ auto Socket::BindControlEndpoint() -> void assert(!fControlEndpoint); fPassiveEndpoint->listen([&](asiofi::info&& info) { - LOG(debug) << "OFI transport (" << fId - << "): control band connection request received. Accepting ..."; - fControlEndpoint = tools::make_unique( - fContext.GetIoContext(), *fOfiDomain, info); + LOG(debug) << "OFI transport (" << fId << "): control band connection request received. Accepting ..."; + fControlEndpoint = tools::make_unique(fContext.GetIoContext(), *fOfiDomain, info); fControlEndpoint->enable(); fControlEndpoint->accept([&]() { LOG(debug) << "OFI transport (" << fId << "): control band connection accepted."; @@ -170,16 +160,14 @@ auto Socket::BindDataEndpoint() -> void assert(!fDataEndpoint); fPassiveEndpoint->listen([&](asiofi::info&& info) { - LOG(debug) << "OFI transport (" << fId - << "): data band connection request received. Accepting ..."; - fDataEndpoint = tools::make_unique( - fContext.GetIoContext(), *fOfiDomain, info); + LOG(debug) << "OFI transport (" << fId << "): data band connection request received. Accepting ..."; + fDataEndpoint = tools::make_unique(fContext.GetIoContext(), *fOfiDomain, info); fDataEndpoint->enable(); fDataEndpoint->accept([&]() { LOG(debug) << "OFI transport (" << fId << "): data band connection accepted."; - boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this)); boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this)); + fBound = true; }); }); @@ -188,6 +176,7 @@ auto Socket::BindDataEndpoint() -> void auto Socket::Connect(const string& address) -> bool try { + fConnected = false; fRemoteAddr = Context::VerifyAddress(address); if (fRemoteAddr.Protocol == "verbs") { fNeedOfiMemoryRegistration = true; @@ -197,15 +186,15 @@ try { ConnectEndpoint(fControlEndpoint, Band::Control); + while (!fConnected) { + this_thread::sleep_for(chrono::milliseconds(100)); + } + return true; -} -catch (const SilentSocketError& e) -{ +} catch (const SilentSocketError& e) { // do not print error in this case, this is handled by FairMQDevice return false; -} -catch (const std::exception& e) -{ +} catch (const std::exception& e) { LOG(error) << "OFI transport: " << e.what(); return false; } @@ -228,11 +217,11 @@ auto Socket::ConnectEndpoint(std::unique_ptr& endpoi if (type == Band::Control) { ConnectEndpoint(fDataEndpoint, Band::Data); } else { - boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this)); boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this)); + fConnected = true; } } else { - LOG(error) << "asdf"; + LOG(error) << "Could not connect on the first try"; } }); } @@ -277,41 +266,10 @@ auto Socket::Send(MessagePtr& msg, const int /*timeout*/) -> int { // LOG(debug) << "OFI transport (" << fId << "): ENTER Send: data=" << msg->GetData() << ",size=" << msg->GetSize(); - MessagePtr* msgptr(new std::unique_ptr(std::move(msg))); try { - auto res = fSendQueueWrite.send(boost::asio::const_buffer(msgptr, sizeof(MessagePtr)), 0); - - // LOG(debug) << "OFI transport (" << fId << "): LEAVE Send"; - return res; - } catch (const std::exception& e) { - msg = std::move(*msgptr); - LOG(error) << e.what(); - return -1; - } catch (const boost::system::error_code& e) { - msg = std::move(*msgptr); - LOG(error) << e; - return -1; - } -} - -auto Socket::Receive(MessagePtr& msg, const int /*timeout*/) -> int -{ - // LOG(debug) << "OFI transport (" << fId << "): ENTER Receive"; - - try { - azmq::message zmsg; - auto recv = fRecvQueueRead.receive(zmsg); - - size_t size(0); - if (recv > 0) { - msg = std::move(*(static_cast(zmsg.buffer().data()))); - size = msg->GetSize(); - } - - fBytesRx += size; - fMessagesRx++; - - // LOG(debug) << "OFI transport (" << fId << "): LEAVE Receive"; + fSendSem.wait(); + size_t size = msg->GetSize(); + OnSend(msg); return size; } catch (const std::exception& e) { LOG(error) << e.what(); @@ -322,38 +280,10 @@ auto Socket::Receive(MessagePtr& msg, const int /*timeout*/) -> int } } -auto Socket::Send(std::vector& msgVec, const int timeout) -> int64_t +auto Socket::OnSend(MessagePtr& msg) -> void { - return SendImpl(msgVec, 0, timeout); -} + // LOG(debug) << "OFI transport (" << fId << "): ENTER OnSend"; -auto Socket::Receive(std::vector& msgVec, const int timeout) -> int64_t -{ - return ReceiveImpl(msgVec, 0, timeout); -} - -auto Socket::SendQueueReader() -> void -{ - fSendSem.async_wait([&](const boost::system::error_code& ec) { - if (!ec) { - // LOG(debug) << "OFI transport (" << fId << "): < Wait fSendSem=" << - // fSendSem.get_value(); - fSendQueueRead.async_receive([&](const boost::system::error_code& ec2, - azmq::message& zmsg, - size_t bytes_transferred) { - if (!ec2) { - OnSend(zmsg, bytes_transferred); - } - }); - } - }); -} - -auto Socket::OnSend(azmq::message& zmsg, size_t /*bytes_transferred*/) -> void -{ - // LOG(debug) << "OFI transport (" << fId << "): ENTER OnSend: bytes_transferred=" << bytes_transferred; - - MessagePtr msg(std::move(*(static_cast(zmsg.buffer().data())))); auto size = msg->GetSize(); // LOG(debug) << "OFI transport (" << fId << "): OnSend: data=" << msg->GetData() << ",size=" << msg->GetSize(); @@ -372,8 +302,7 @@ auto Socket::OnSend(azmq::message& zmsg, size_t /*bytes_transferred*/) -> void } else { fControlEndpoint->send(ctrl_msg, [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable { - // LOG(debug) << "OFI transport (" << fId << "): >>>>> Control - // message sent"; + // LOG(debug) << "OFI transport (" << fId << "): >>>>> Control message sent"; }); } @@ -388,14 +317,12 @@ auto Socket::OnSend(azmq::message& zmsg, size_t /*bytes_transferred*/) -> void desc, [&, size, msg2 = std::move(msg), mr2 = std::move(mr)]( boost::asio::mutable_buffer) mutable { - // LOG(debug) << "OFI transport (" << fId << "): >>>>> Data - // buffer sent"; + // LOG(debug) << "OFI transport (" << fId << "): >>>>> Data buffer sent"; fBytesTx += size; fMessagesTx++; fSendSem.async_signal([&](const boost::system::error_code& ec) { if (!ec) { - // LOG(debug) << "OFI transport (" << fId << "): > - // Signal fSendSem=" << fSendSem.get_value(); + // LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" << fSendSem.get_value(); } }); }); @@ -408,8 +335,7 @@ auto Socket::OnSend(azmq::message& zmsg, size_t /*bytes_transferred*/) -> void fMessagesTx++; fSendSem.async_signal([&](const boost::system::error_code& ec) { if (!ec) { - // LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" - // << fSendSem.get_value(); + // LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" << fSendSem.get_value(); } }); }); @@ -423,10 +349,39 @@ auto Socket::OnSend(azmq::message& zmsg, size_t /*bytes_transferred*/) -> void }); } - boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this)); // LOG(debug) << "OFI transport (" << fId << "): LEAVE OnSend"; } +auto Socket::Receive(MessagePtr& msg, const int /*timeout*/) -> int +try { + // LOG(debug) << "OFI transport (" << fId << "): ENTER Receive"; + azmq::message zmsg; + auto recv = fRecvQueueRead.receive(zmsg); + + size_t size(0); + if (recv > 0) { + msg = std::move(*(static_cast(zmsg.buffer().data()))); + size = msg->GetSize(); + } + + fBytesRx += size; + fMessagesRx++; + + // LOG(debug) << "OFI transport (" << fId << "): LEAVE Receive"; + return size; +} catch (const std::exception& e) { + LOG(error) << e.what(); + return -1; +} catch (const boost::system::error_code& e) { + LOG(error) << e; + return -1; +} + +auto Socket::Receive(std::vector& msgVec, const int timeout) -> int64_t +{ + return ReceiveImpl(msgVec, 0, timeout); +} + auto Socket::RecvControlQueueReader() -> void { fRecvSem.async_wait([&](const boost::system::error_code& ec) { @@ -442,12 +397,12 @@ auto Socket::RecvControlQueueReader() -> void ctrl_msg, desc, [&, ctrl2 = std::move(ctrl), mr2 = std::move(mr)](boost::asio::mutable_buffer) mutable { OnRecvControl(std::move(ctrl2)); }); - } else { + } else { fControlEndpoint->recv( ctrl_msg, [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable { OnRecvControl(std::move(ctrl2)); }); - } + } } }); } @@ -478,9 +433,7 @@ auto Socket::OnRecvControl(ofi::unique_ptr ctrl) -> void azmq::message(boost::asio::const_buffer(msgptr, sizeof(MessagePtr))), [&](const boost::system::error_code& ec, size_t /*bytes_transferred2*/) { if (!ec) { - // LOG(debug) << "OFI transport (" << fId - // << "): <<<<< Data buffer received, bytes_transferred2=" - // << bytes_transferred2; + // LOG(debug) << "OFI transport (" << fId << "): <<<<< Data buffer received, bytes_transferred2=" << bytes_transferred2; fRecvSem.async_signal([&](const boost::system::error_code& ec2) { if (!ec2) { //LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem"; @@ -498,9 +451,7 @@ auto Socket::OnRecvControl(ofi::unique_ptr ctrl) -> void azmq::message(boost::asio::const_buffer(msgptr, sizeof(MessagePtr))), [&](const boost::system::error_code& ec, size_t /*bytes_transferred2*/) { if (!ec) { - // LOG(debug) << "OFI transport (" << fId - // << "): <<<<< Data buffer received, bytes_transferred2=" - // << bytes_transferred2; + // LOG(debug) << "OFI transport (" << fId << "): <<<<< Data buffer received, bytes_transferred2=" << bytes_transferred2; fRecvSem.async_signal([&](const boost::system::error_code& ec2) { if (!ec2) { // LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem"; @@ -515,9 +466,7 @@ auto Socket::OnRecvControl(ofi::unique_ptr ctrl) -> void azmq::message(boost::asio::const_buffer(nullptr, 0)), [&](const boost::system::error_code& ec, size_t /*bytes_transferred2*/) { if (!ec) { - // LOG(debug) << "OFI transport (" << fId - // << "): <<<<< Data buffer received, bytes_transferred2=" - // << bytes_transferred2; + // LOG(debug) << "OFI transport (" << fId << "): <<<<< Data buffer received, bytes_transferred2=" << bytes_transferred2; fRecvSem.async_signal([&](const boost::system::error_code& ec2) { if (!ec2) { // LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem"; @@ -532,6 +481,11 @@ auto Socket::OnRecvControl(ofi::unique_ptr ctrl) -> void // LOG(debug) << "OFI transport (" << fId << "): LEAVE OnRecvControl"; } +auto Socket::Send(std::vector& msgVec, const int timeout) -> int64_t +{ + return SendImpl(msgVec, 0, timeout); +} + auto Socket::SendImpl(vector& /*msgVec*/, const int /*flags*/, const int /*timeout*/) -> int64_t { throw SocketError{"Not yet implemented."}; diff --git a/fairmq/ofi/Socket.h b/fairmq/ofi/Socket.h index 3ad031e1..a9d6516a 100644 --- a/fairmq/ofi/Socket.h +++ b/fairmq/ofi/Socket.h @@ -97,13 +97,13 @@ class Socket final : public fair::mq::Socket Address fLocalAddr; int fSndTimeout; int fRcvTimeout; - azmq::socket fSendQueueWrite, fSendQueueRead; azmq::socket fRecvQueueWrite, fRecvQueueRead; asiofi::semaphore fSendSem, fRecvSem; std::atomic fNeedOfiMemoryRegistration; + std::atomic fBound; + std::atomic fConnected; - auto SendQueueReader() -> void; - auto OnSend(azmq::message& msg, size_t bytes_transferred) -> void; + auto OnSend(MessagePtr& msg) -> void; auto RecvControlQueueReader() -> void; auto OnRecvControl(ofi::unique_ptr ctrl) -> void; auto OnReceive() -> void;