From 144aa912d7a9f56aac1675e9a4b52c4fd75f29e2 Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Wed, 7 Mar 2018 00:14:13 +0100 Subject: [PATCH] FairMQ: Implement blocking ofi::Socket Send/Receive with FI_MSG Completion events are not yet working. --- fairmq/CMakeLists.txt | 2 +- fairmq/ofi/Context.cxx | 29 ++++++++- fairmq/ofi/Context.h | 13 +++- fairmq/ofi/Control.proto | 10 +++ fairmq/ofi/Message.cxx | 55 ++++++++++++++-- fairmq/ofi/Message.h | 2 + fairmq/ofi/Socket.cxx | 68 +++++++++++++++++--- fairmq/ofi/Socket.h | 2 + fairmq/ofi/TransportFactory.cxx | 8 ++- fairmq/test/helper/devices/TestPairLeft.cxx | 16 +++-- fairmq/test/helper/devices/TestPairRight.cxx | 16 +++-- 11 files changed, 189 insertions(+), 32 deletions(-) diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 91597c06..d5d88e7b 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -209,7 +209,7 @@ add_custom_command( ${CMAKE_CURRENT_BINARY_DIR}/ofi/Control.pb.cc COMMAND ${PROTOBUF_PROTOC_EXECUTABLE} -I=${CMAKE_CURRENT_SOURCE_DIR}/ofi --cpp_out=${CMAKE_CURRENT_BINARY_DIR}/ofi Control.proto WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR} - DEPENDS mkofibuilddir + DEPENDS mkofibuilddir ${CMAKE_CURRENT_SOURCE_DIR}/ofi/Control.proto ) set_source_files_properties(${CMAKE_CURRENT_BINARY_DIR}/ofi/Control.pb.h PROPERTIES GENERATED TRUE) set_source_files_properties(${CMAKE_CURRENT_BINARY_DIR}/ofi/Control.pb.cc PROPERTIES GENERATED TRUE) diff --git a/fairmq/ofi/Context.cxx b/fairmq/ofi/Context.cxx index 64aa74c0..61d11d0f 100644 --- a/fairmq/ofi/Context.cxx +++ b/fairmq/ofi/Context.cxx @@ -11,6 +11,7 @@ #include #include +#include #include #include #include @@ -41,18 +42,37 @@ Context::Context(int numberIoThreads) , fOfiAddressVector(nullptr) , fOfiEventQueue(nullptr) , fZmqContext(zmq_ctx_new()) + , fIoWork(fIoContext) { if (!fZmqContext) throw ContextError{tools::ToString("Failed creating zmq context, reason: ", zmq_strerror(errno))}; GOOGLE_PROTOBUF_VERIFY_VERSION; + + InitThreadPool(numberIoThreads); +} + +auto Context::InitThreadPool(int numberIoThreads) -> void +{ + assert(numberIoThreads > 0); + + for (int i = 1; i <= numberIoThreads; ++i) { + fThreadPool.emplace_back([&, i, numberIoThreads]{ + LOG(debug) << "I/O thread #" << i << "/" << numberIoThreads << " started"; + fIoContext.run(); + LOG(debug) << "I/O thread #" << i << "/" << numberIoThreads << " stopped"; + }); + } } Context::~Context() { - if (zmq_ctx_term(fZmqContext) != 0) { + fIoContext.stop(); + for (auto& thread : fThreadPool) + thread.join(); + + if (zmq_ctx_term(fZmqContext) != 0) LOG(error) << "Failed closing zmq context, reason: " << zmq_strerror(errno); - } if (fOfiEventQueue) { auto ret = fi_close(&fOfiEventQueue->fid); @@ -97,6 +117,11 @@ auto Context::GetPbVersion() const -> string return google::protobuf::internal::VersionString(GOOGLE_PROTOBUF_VERSION); } +auto Context::GetBoostVersion() const -> std::string +{ + return tools::ToString(BOOST_VERSION / 100000, ".", BOOST_VERSION / 100 % 1000, ".", BOOST_VERSION % 100); +} + auto Context::InitOfi(ConnectionType type, std::string addr) -> void { auto addr2 = ConvertAddress(addr); diff --git a/fairmq/ofi/Context.h b/fairmq/ofi/Context.h index 3993032d..6e222e94 100644 --- a/fairmq/ofi/Context.h +++ b/fairmq/ofi/Context.h @@ -9,12 +9,15 @@ #ifndef FAIR_MQ_OFI_CONTEXT_H #define FAIR_MQ_OFI_CONTEXT_H +#include #include #include #include #include -#include #include +#include +#include +#include namespace fair { @@ -35,7 +38,7 @@ enum class Direction : bool { Receive, Transmit }; class Context { public: - Context(int numberIoThreads = 1); + Context(int numberIoThreads = 2); ~Context(); auto InitOfi(ConnectionType type, std::string address) -> void; @@ -44,7 +47,9 @@ class Context auto GetZmqVersion() const -> std::string; auto GetOfiApiVersion() const -> std::string; auto GetPbVersion() const -> std::string; + auto GetBoostVersion() const -> std::string; auto GetZmqContext() const -> void* { return fZmqContext; } + auto GetIoContext() -> boost::asio::io_service& { return fIoContext; } auto InsertAddressVector(sockaddr_in address) -> fi_addr_t; struct Address { std::string Protocol; @@ -64,11 +69,15 @@ class Context fid_domain* fOfiDomain; fid_av* fOfiAddressVector; fid_eq* fOfiEventQueue; + boost::asio::io_service fIoContext; + boost::asio::io_service::work fIoWork; + std::vector fThreadPool; auto OpenOfiFabric() -> void; auto OpenOfiEventQueue() -> void; auto OpenOfiDomain() -> void; auto OpenOfiAddressVector() -> void; + auto InitThreadPool(int numberIoThreads) -> void; }; /* class Context */ struct ContextError : std::runtime_error { using std::runtime_error::runtime_error; }; diff --git a/fairmq/ofi/Control.proto b/fairmq/ofi/Control.proto index ff0be7af..69eee4a7 100644 --- a/fairmq/ofi/Control.proto +++ b/fairmq/ofi/Control.proto @@ -8,8 +8,18 @@ message DataAddressAnnouncement { uint32 port = 2; // in_port_t from } +message PostBuffer { + uint64 size = 1; // buffer size (size_t) +} + +message PostBufferAcknowledgement { + uint64 size = 1; // size_t +} + message ControlMessage { oneof type { DataAddressAnnouncement data_address_announcement = 1; + PostBuffer post_buffer = 2; + PostBufferAcknowledgement post_buffer_acknowledgement = 3; } } diff --git a/fairmq/ofi/Message.cxx b/fairmq/ofi/Message.cxx index 571be3b2..1fff9b6b 100644 --- a/fairmq/ofi/Message.cxx +++ b/fairmq/ofi/Message.cxx @@ -23,6 +23,11 @@ namespace ofi using namespace std; Message::Message() + : fInitialSize(0) + , fSize(0) + , fData(nullptr) + , fFreeFunction(nullptr) + , fHint(nullptr) { } @@ -30,12 +35,18 @@ Message::Message(const size_t size) : fInitialSize(size) , fSize(size) , fData(nullptr) + , fFreeFunction(nullptr) + , fHint(nullptr) { } Message::Message(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) + : fInitialSize(size) + , fSize(size) + , fData(data) + , fFreeFunction(ffn) + , fHint(hint) { - throw MessageError{"Not yet implemented."}; } Message::Message(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint) @@ -45,17 +56,48 @@ Message::Message(FairMQUnmanagedRegionPtr& region, void* data, const size_t size auto Message::Rebuild() -> void { - throw MessageError{"Not implemented."}; + if (fFreeFunction) { + fFreeFunction(fData, fHint); + } else { + free(fData); + } + fData = nullptr; + fInitialSize = 0; + fSize = 0; + fFreeFunction = nullptr; + fHint = nullptr; } auto Message::Rebuild(const size_t size) -> void { - throw MessageError{"Not implemented."}; + if (fFreeFunction) { + fFreeFunction(fData, fHint); + fData = nullptr; + fData = malloc(size); + } else { + fData = realloc(fData, size); + } + assert(fData); + fInitialSize = size; + fSize = size; + fFreeFunction = nullptr; + fHint = nullptr; } auto Message::Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) -> void { - throw MessageError{"Not implemented."}; + if (fFreeFunction) { + fFreeFunction(fData, fHint); + fData = nullptr; + fData = malloc(size); + } else { + fData = realloc(fData, size); + } + assert(fData); + fInitialSize = size; + fSize = size; + fFreeFunction = ffn; + fHint = hint; } auto Message::GetData() const -> void* @@ -91,6 +133,11 @@ auto Message::Copy(const fair::mq::MessagePtr& msg) -> void Message::~Message() { + if (fFreeFunction) { + fFreeFunction(fData, fHint); + } else { + free(fData); + } } } /* namespace ofi */ diff --git a/fairmq/ofi/Message.h b/fairmq/ofi/Message.h index a50f8914..8586043f 100644 --- a/fairmq/ofi/Message.h +++ b/fairmq/ofi/Message.h @@ -61,6 +61,8 @@ class Message : public fair::mq::Message size_t fInitialSize; size_t fSize; void* fData; + fairmq_free_fn* fFreeFunction; + void* fHint; }; /* class Message */ } /* namespace ofi */ diff --git a/fairmq/ofi/Socket.cxx b/fairmq/ofi/Socket.cxx index f5dd88b9..7e55b88a 100644 --- a/fairmq/ofi/Socket.cxx +++ b/fairmq/ofi/Socket.cxx @@ -42,6 +42,7 @@ Socket::Socket(Context& context, const string& type, const string& name, const s , fRcvTimeout(100) , fContext(context) , fWaitingForControlPeer(false) + , fIoStrand(fContext.GetIoContext()) { if (type != "pair") { throw SocketError{tools::ToString("Socket type '", type, "' not implemented for ofi transport.")}; @@ -206,6 +207,7 @@ try { auto Socket::SendControlMessage(unique_ptr ctrl) -> void { assert(fControlSocket); + LOG(debug) << "About to send control message: " << ctrl->DebugString(); // Serialize string* str = new string(); @@ -234,6 +236,7 @@ auto Socket::ReceiveControlMessage() -> unique_ptr auto ctrl = tools::make_unique(); ctrl->ParseFromArray(zmq_msg_data(&msg), zmq_msg_size(&msg)); + LOG(debug) << "Received control message: " << ctrl->DebugString(); return ctrl; } @@ -293,11 +296,36 @@ try { ProcessDataAddressAnnouncement(ReceiveControlMessage()); } - auto ret = zmq_send(fControlSocket, nullptr, 0, flags); - if (ret == EAGAIN) throw SilentSocketError("EAGAIN"); - if (ret == -1) throw SocketError(tools::ToString("Failed sending control message on socket ", fId, ", reason: ", zmq_strerror(errno))); + auto size = msg->GetSize(); - return ret; + // Create and send control message + auto ctrl = tools::make_unique(); + auto buf = tools::make_unique(); + buf->set_size(size); + ctrl->set_allocated_post_buffer(buf.release()); + assert(ctrl->IsInitialized()); + SendControlMessage(move(ctrl)); + + if (size) { + // Receive and process control message + auto ctrl2 = ReceiveControlMessage(); + assert(ctrl2->has_post_buffer_acknowledgement()); + assert(ctrl2->post_buffer_acknowledgement().size() == size); + + // Send data + auto ret = fi_send(fDataEndpoint, msg->GetData(), size, nullptr, fRemoteDataAddr, nullptr); + if (ret != FI_SUCCESS) + throw SocketError(tools::ToString("Failed posting ofi send buffer, reason: ", fi_strerror(ret))); + + fi_cq_err_entry cqEntry; + ret = fi_cq_sread(fDataCompletionQueueTx, &cqEntry, 1, nullptr, 1000); + if (ret != 1) + throw SocketError(tools::ToString("Failed reading ofi tx completion queue event, reason: ", fi_strerror(ret))); + } + + // TODO free msg on tx completion? + + return size; } catch (const SilentSocketError& e) { @@ -317,11 +345,35 @@ try { ProcessDataAddressAnnouncement(ReceiveControlMessage()); } - auto ret = zmq_recv(fControlSocket, nullptr, 0, flags); - if (ret == EAGAIN) throw SilentSocketError("EAGAIN"); - if (ret == -1) throw SocketError(tools::ToString("Failed sending control message on socket ", fId, ", reason: ", zmq_strerror(errno))); + // Receive and process control message + auto ctrl = ReceiveControlMessage(); + assert(ctrl->has_post_buffer()); + auto postBuffer = ctrl->post_buffer(); + auto size = postBuffer.size(); + LOG(debug) << "Received post buffer control message with size: " << size; - return ret; + // Receive data + if (size) { + msg->Rebuild(size); + auto ret = fi_recv(fDataEndpoint, msg->GetData(), msg->GetSize(), nullptr, fRemoteDataAddr, nullptr); + if (ret != FI_SUCCESS) + throw SocketError(tools::ToString("Failed posting ofi receive buffer, reason: ", fi_strerror(ret))); + + // Create and send control message + auto ctrl2 = tools::make_unique(); + auto ack = tools::make_unique(); + ack->set_size(msg->GetSize()); + ctrl2->set_allocated_post_buffer_acknowledgement(ack.release()); + assert(ctrl2->IsInitialized()); + SendControlMessage(move(ctrl2)); + + fi_cq_err_entry cqEntry; + ret = fi_cq_sread(fDataCompletionQueueRx, &cqEntry, 1, nullptr, 1000); + if (ret != 1) + throw SocketError(tools::ToString("Failed reading ofi rx completion queue event, reason: ", fi_strerror(ret))); + } + + return size; } catch (const SilentSocketError& e) { diff --git a/fairmq/ofi/Socket.h b/fairmq/ofi/Socket.h index 1237f7b0..15617531 100644 --- a/fairmq/ofi/Socket.h +++ b/fairmq/ofi/Socket.h @@ -14,6 +14,7 @@ #include #include +#include #include // unique_ptr #include #include @@ -90,6 +91,7 @@ class Socket : public fair::mq::Socket fi_addr_t fRemoteDataAddr; sockaddr_in fLocalDataAddr; bool fWaitingForControlPeer; + boost::asio::io_service::strand fIoStrand; int fSndTimeout; int fRcvTimeout; diff --git a/fairmq/ofi/TransportFactory.cxx b/fairmq/ofi/TransportFactory.cxx index 7106f00b..41229c93 100644 --- a/fairmq/ofi/TransportFactory.cxx +++ b/fairmq/ofi/TransportFactory.cxx @@ -28,9 +28,13 @@ try : FairMQTransportFactory{id} { LOG(debug) << "Transport: Using ZeroMQ (" << fContext.GetZmqVersion() << ") & " << "OFI libfabric (API " << fContext.GetOfiApiVersion() << ") & " - << "Google Protobuf (" << fContext.GetPbVersion() << ")"; + << "Google Protobuf (" << fContext.GetPbVersion() << ") & " + << "Boost.Asio (" << fContext.GetBoostVersion() << ")"; +} +catch (ContextError& e) +{ + throw TransportFactoryError{e.what()}; } -catch (ContextError& e) { throw TransportFactoryError{e.what()}; } auto TransportFactory::CreateMessage() const -> MessagePtr { diff --git a/fairmq/test/helper/devices/TestPairLeft.cxx b/fairmq/test/helper/devices/TestPairLeft.cxx index d2f24b81..680a164a 100644 --- a/fairmq/test/helper/devices/TestPairLeft.cxx +++ b/fairmq/test/helper/devices/TestPairLeft.cxx @@ -34,19 +34,23 @@ class PairLeft : public FairMQDevice // Simple empty message ping pong auto msg1{NewMessageFor("data", 0)}; - auto msg2{NewMessageFor("data", 0)}; - auto msg3{NewMessageFor("data", 0)}; if (Send(msg1, "data") >= 0) counter++; + auto msg2{NewMessageFor("data", 0)}; if (Receive(msg2, "data") >= 0) counter++; - if (Send(msg2, "data") >= 0) counter++; - if (Receive(msg3, "data") >= 0) counter++; + auto msg3{NewMessageFor("data", 0)}; + if (Send(msg3, "data") >= 0) counter++; + auto msg4{NewMessageFor("data", 0)}; + if (Receive(msg4, "data") >= 0) counter++; if (counter == 4) LOG(info) << "Simple empty message ping pong successfull"; // Simple message with short text data - auto msg4{NewSimpleMessageFor("data", 0, "testdata1234")}; - if (Send(msg4, "data") >= 0) counter++; + auto msg5{NewSimpleMessageFor("data", 0, "testdata1234")}; + LOG(info) << "Will send msg5"; + if (Send(msg5, "data") >= 0) counter++; + LOG(info) << "Sent msg5"; if (counter == 5) LOG(info) << "Simple message with short text data successfull"; + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); assert(counter == 5); }; }; diff --git a/fairmq/test/helper/devices/TestPairRight.cxx b/fairmq/test/helper/devices/TestPairRight.cxx index e657476d..121d5ef9 100644 --- a/fairmq/test/helper/devices/TestPairRight.cxx +++ b/fairmq/test/helper/devices/TestPairRight.cxx @@ -36,19 +36,21 @@ class PairRight : public FairMQDevice // Simple empty message ping pong auto msg1{NewMessageFor("data", 0)}; if (Receive(msg1, "data") >= 0) counter++; - if (Send(msg1, "data") >= 0) counter++; auto msg2{NewMessageFor("data", 0)}; - if (Receive(msg2, "data") >= 0) counter++; if (Send(msg2, "data") >= 0) counter++; + auto msg3{NewMessageFor("data", 0)}; + if (Receive(msg3, "data") >= 0) counter++; + auto msg4{NewMessageFor("data", 0)}; + if (Send(msg4, "data") >= 0) counter++; if (counter == 4) LOG(info) << "Simple empty message ping pong successfull"; // Simple message with short text data - auto msg3{NewMessageFor("data", 0)}; - auto ret = Receive(msg3, "data"); + auto msg5{NewMessageFor("data", 0)}; + auto ret = Receive(msg5, "data"); if (ret > 0) { - auto content = std::string{static_cast(msg3->GetData()), msg3->GetSize()}; - LOG(info) << ret << ", " << msg3->GetSize() << ", '" << content << "'"; - if (msg3->GetSize() == ret && content == "testdata1234") counter++; + auto content = std::string{static_cast(msg5->GetData()), msg5->GetSize()}; + LOG(info) << ret << ", " << msg5->GetSize() << ", '" << content << "'"; + if (msg5->GetSize() == ret && content == "testdata1234") counter++; } if (counter == 5) LOG(info) << "Simple message with short text data successfull";