From 3582091b1c76cbe689b8e66813ea71dd5ff55bc5 Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Mon, 29 Apr 2019 20:28:40 +0200 Subject: [PATCH] Add experimental static size mode for ofi transport Whenever --ofi-size-hint > 0, the ofi transport does not use the control band. Multipart is not supported. --- fairmq/ofi/Context.cxx | 1 + fairmq/ofi/Context.h | 3 + fairmq/ofi/Socket.cxx | 109 ++++++++++++++++++++++++++- fairmq/ofi/Socket.h | 2 + fairmq/ofi/TransportFactory.cxx | 9 ++- fairmq/options/FairMQProgOptions.cxx | 1 + 6 files changed, 118 insertions(+), 7 deletions(-) diff --git a/fairmq/ofi/Context.cxx b/fairmq/ofi/Context.cxx index 635730aa..e0abb971 100644 --- a/fairmq/ofi/Context.cxx +++ b/fairmq/ofi/Context.cxx @@ -37,6 +37,7 @@ Context::Context(FairMQTransportFactory& sendFactory, : fIoWork(fIoContext) , fReceiveFactory(receiveFactory) , fSendFactory(sendFactory) + , fSizeHint(0) { InitThreadPool(numberIoThreads); } diff --git a/fairmq/ofi/Context.h b/fairmq/ofi/Context.h index 3f3f4ed0..814461e9 100644 --- a/fairmq/ofi/Context.h +++ b/fairmq/ofi/Context.h @@ -72,6 +72,8 @@ class Context auto Reset() -> void; auto MakeReceiveMessage(size_t size) -> MessagePtr; auto MakeSendMessage(size_t size) -> MessagePtr; + auto GetSizeHint() -> size_t { return fSizeHint; } + auto SetSizeHint(size_t size) -> void { fSizeHint = size; } private: boost::asio::io_context fIoContext; @@ -79,6 +81,7 @@ class Context std::vector fThreadPool; FairMQTransportFactory& fReceiveFactory; FairMQTransportFactory& fSendFactory; + size_t fSizeHint; auto InitThreadPool(int numberIoThreads) -> void; }; /* class Context */ diff --git a/fairmq/ofi/Socket.cxx b/fairmq/ofi/Socket.cxx index 03cab8a5..9958a91e 100644 --- a/fairmq/ofi/Socket.cxx +++ b/fairmq/ofi/Socket.cxx @@ -154,8 +154,17 @@ auto Socket::BindDataEndpoint() -> void fDataEndpoint->accept([&]() { LOG(debug) << "OFI transport (" << fId << "): data band connection accepted."; - boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this)); - boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this)); + if (fContext.GetSizeHint()) { + boost::asio::post(fContext.GetIoContext(), + std::bind(&Socket::SendQueueReaderStatic, this)); + boost::asio::post(fContext.GetIoContext(), + std::bind(&Socket::RecvQueueReaderStatic, this)); + } else { + boost::asio::post(fContext.GetIoContext(), + std::bind(&Socket::SendQueueReader, this)); + boost::asio::post(fContext.GetIoContext(), + std::bind(&Socket::RecvControlQueueReader, this)); + } }); }); @@ -174,8 +183,13 @@ try { ConnectEndpoint(fControlEndpoint, Band::Control); ConnectEndpoint(fDataEndpoint, Band::Data); - boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this)); - boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this)); + if (fContext.GetSizeHint()) { + boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReaderStatic, this)); + boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvQueueReaderStatic, this)); + } else { + boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this)); + boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this)); + } return true; } @@ -347,6 +361,57 @@ auto Socket::SendQueueReader() -> void }); } +auto Socket::SendQueueReaderStatic() -> void +{ + fSendPopSem.async_wait([&] { + // Read msg from send queue + std::unique_lock lk(fSendQueueMutex); + std::vector msgVec(std::move(fSendQueue.front())); + fSendQueue.pop(); + lk.unlock(); + + bool postMultiPartStartBuffer = msgVec.size() > 1; + if (postMultiPartStartBuffer) { + throw SocketError{tools::ToString("Multipart API not supported in static size mode.")}; + } + + MessagePtr& msg = msgVec[0]; + + // Send data message + const auto size = msg->GetSize(); + + if (size) { + boost::asio::mutable_buffer buffer(msg->GetData(), size); + + if (fNeedOfiMemoryRegistration) { + asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::send); + auto desc = mr.desc(); + + fDataEndpoint->send(buffer, + desc, + [&, size, msg2 = std::move(msg), mr2 = std::move(mr)]( + boost::asio::mutable_buffer) mutable { + fBytesTx += size; + fMessagesTx++; + fSendPushSem.signal(); + }); + } else { + fDataEndpoint->send( + buffer, [&, size, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable { + fBytesTx += size; + fMessagesTx++; + fSendPushSem.signal(); + }); + } + } else { + ++fMessagesTx; + fSendPushSem.signal(); + } + + boost::asio::dispatch(fContext.GetIoContext(), std::bind(&Socket::SendQueueReaderStatic, this)); + }); +} + auto Socket::Receive(MessagePtr& msg, const int /*timeout*/) -> int try { // timeout argument not yet implemented @@ -472,6 +537,42 @@ auto Socket::OnRecvControl(ofi::unique_ptr ctrl) -> void std::bind(&Socket::RecvControlQueueReader, this)); } +auto Socket::RecvQueueReaderStatic() -> void +{ + fRecvPushSem.async_wait([&] { + static size_t size = fContext.GetSizeHint(); + // Receive data + auto msg = fContext.MakeReceiveMessage(size); + + if (size) { + boost::asio::mutable_buffer buffer(msg->GetData(), size); + + if (fNeedOfiMemoryRegistration) { + asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::recv); + auto desc = mr.desc(); + + fDataEndpoint->recv(buffer, + desc, + [&, msg2 = std::move(msg), mr2 = std::move(mr)]( + boost::asio::mutable_buffer) mutable { + DataMessageReceived(std::move(msg2)); + }); + + } else { + fDataEndpoint->recv( + buffer, [&, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable { + DataMessageReceived(std::move(msg2)); + }); + } + } else { + DataMessageReceived(std::move(msg)); + } + + boost::asio::dispatch(fContext.GetIoContext(), + std::bind(&Socket::RecvQueueReaderStatic, this)); + }); +} + auto Socket::DataMessageReceived(MessagePtr msg) -> void { if (fMultiPartRecvCounter > 0) { diff --git a/fairmq/ofi/Socket.h b/fairmq/ofi/Socket.h index 31ecbedf..598fdfdc 100644 --- a/fairmq/ofi/Socket.h +++ b/fairmq/ofi/Socket.h @@ -110,7 +110,9 @@ class Socket final : public fair::mq::Socket enum class Band { Control, Data }; auto ConnectEndpoint(std::unique_ptr& endpoint, Band type) -> void; auto SendQueueReader() -> void; + auto SendQueueReaderStatic() -> void; auto RecvControlQueueReader() -> void; + auto RecvQueueReaderStatic() -> void; auto OnRecvControl(ofi::unique_ptr ctrl) -> void; auto DataMessageReceived(MessagePtr msg) -> void; }; /* class Socket */ diff --git a/fairmq/ofi/TransportFactory.cxx b/fairmq/ofi/TransportFactory.cxx index 0c2124f7..eac6127d 100644 --- a/fairmq/ofi/TransportFactory.cxx +++ b/fairmq/ofi/TransportFactory.cxx @@ -23,12 +23,15 @@ namespace ofi using namespace std; -TransportFactory::TransportFactory(const string& id, const FairMQProgOptions* /*config*/) +TransportFactory::TransportFactory(const string& id, const FairMQProgOptions* config) try : FairMQTransportFactory(id) , fContext(*this, *this, 1) { - LOG(debug) << "OFI transport: Using AZMQ & " - << "asiofi (" << fContext.GetAsiofiVersion() << ")"; + LOG(debug) << "OFI transport: asiofi (" << fContext.GetAsiofiVersion() << ")"; + + if (config) { + fContext.SetSizeHint(config->GetValue("ofi-size-hint")); + } } catch (ContextError& e) { throw TransportFactoryError{e.what()}; } diff --git a/fairmq/options/FairMQProgOptions.cxx b/fairmq/options/FairMQProgOptions.cxx index f5540f5b..8b2a2b5e 100644 --- a/fairmq/options/FairMQProgOptions.cxx +++ b/fairmq/options/FairMQProgOptions.cxx @@ -66,6 +66,7 @@ FairMQProgOptions::FairMQProgOptions() ("print-channels", po::value()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (::)") ("shm-segment-size", po::value()->default_value(2000000000), "Shared memory: size of the shared memory segment (in bytes).") ("shm-monitor", po::value()->default_value(true), "Shared memory: run monitor daemon.") + ("ofi-size-hint", po::value()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.") ("rate", po::value()->default_value(0.), "Rate for conditional run loop (Hz).") ("session", po::value()->default_value("default"), "Session name.");