diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index f8aac600..6db399ee 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -227,7 +227,7 @@ if(BUILD_NANOMSG_TRANSPORT) set(NANOMSG_DEPS nanomsg msgpackc-cxx) endif() if(BUILD_OFI_TRANSPORT) - set(OFI_DEPS asiofi::asiofi msgpackc) + set(OFI_DEPS asiofi::asiofi msgpack::msgpack) endif() set(optional_deps ${NANOMSG_DEPS} ${OFI_DEPS}) if(optional_deps) diff --git a/fairmq/ofi/Context.cxx b/fairmq/ofi/Context.cxx index c894a7d3..6cbb3385 100644 --- a/fairmq/ofi/Context.cxx +++ b/fairmq/ofi/Context.cxx @@ -13,7 +13,6 @@ #include #include #include -#include #include #include #include @@ -47,8 +46,6 @@ Context::Context(int numberIoThreads) if (!fZmqContext) throw ContextError{tools::ToString("Failed creating zmq context, reason: ", zmq_strerror(errno))}; - GOOGLE_PROTOBUF_VERIFY_VERSION; - InitThreadPool(numberIoThreads); } @@ -114,11 +111,6 @@ auto Context::GetOfiApiVersion() const -> string return "unknown"; } -auto Context::GetPbVersion() const -> string -{ - return google::protobuf::internal::VersionString(GOOGLE_PROTOBUF_VERSION); -} - auto Context::GetBoostVersion() const -> std::string { return tools::ToString(BOOST_VERSION / 100000, ".", BOOST_VERSION / 100 % 1000, ".", BOOST_VERSION % 100); diff --git a/fairmq/ofi/Context.h b/fairmq/ofi/Context.h index a744c3c6..bc5db721 100644 --- a/fairmq/ofi/Context.h +++ b/fairmq/ofi/Context.h @@ -45,7 +45,6 @@ class Context auto CreateOfiCompletionQueue(Direction dir) -> fid_cq*; auto GetZmqVersion() const -> std::string; auto GetOfiApiVersion() const -> std::string; - auto GetPbVersion() const -> std::string; auto GetBoostVersion() const -> std::string; auto GetZmqContext() const -> void* { return fZmqContext; } auto GetIoContext() -> boost::asio::io_service& { return fIoContext; } diff --git a/fairmq/ofi/Socket.cxx b/fairmq/ofi/Socket.cxx index b00539a5..8cee2286 100644 --- a/fairmq/ofi/Socket.cxx +++ b/fairmq/ofi/Socket.cxx @@ -136,19 +136,19 @@ auto Socket::ConnectControlSocket(Context::Address address) -> void throw SocketError(tools::ToString("Failed connecting control socket ", fId, ", reason: ", zmq_strerror(errno))); } -auto Socket::ProcessDataAddressAnnouncement(std::unique_ptr ctrl) -> void -{ - assert(ctrl->has_data_address_announcement()); - auto daa = ctrl->data_address_announcement(); - - sockaddr_in remoteAddr; - remoteAddr.sin_family = AF_INET; - remoteAddr.sin_port = daa.port(); - remoteAddr.sin_addr.s_addr = daa.ipv4(); - - LOG(debug) << "Data address announcement of remote ofi endpoint received: " << Context::ConvertAddress(remoteAddr); - fRemoteDataAddr = fContext.InsertAddressVector(remoteAddr); -} +// auto Socket::ProcessDataAddressAnnouncement(std::unique_ptr ctrl) -> void +// { + // assert(ctrl->has_data_address_announcement()); + // auto daa = ctrl->data_address_announcement(); +// + // sockaddr_in remoteAddr; + // remoteAddr.sin_family = AF_INET; + // remoteAddr.sin_port = daa.port(); + // remoteAddr.sin_addr.s_addr = daa.ipv4(); +// + // LOG(debug) << "Data address announcement of remote ofi endpoint received: " << Context::ConvertAddress(remoteAddr); + // fRemoteDataAddr = fContext.InsertAddressVector(remoteAddr); +// } auto Socket::InitDataEndpoint() -> void { @@ -184,8 +184,6 @@ void free_string(void* /*data*/, void* hint) auto Socket::AnnounceDataAddress() -> void try { - using namespace google::protobuf; - size_t addrlen = sizeof(sockaddr_in); auto ret = fi_getname(&fDataEndpoint->fid, &fLocalDataAddr, &addrlen); if (ret != FI_SUCCESS) @@ -195,62 +193,62 @@ try { LOG(debug) << "Address of local ofi endpoint in socket " << fId << ": " << Context::ConvertAddress(fLocalDataAddr); // Create new control message - auto ctrl = tools::make_unique(); - auto daa = tools::make_unique(); + // auto ctrl = tools::make_unique(); + // auto daa = tools::make_unique(); // Fill data address announcement - daa->set_ipv4(fLocalDataAddr.sin_addr.s_addr); - daa->set_port(fLocalDataAddr.sin_port); + // daa->set_ipv4(fLocalDataAddr.sin_addr.s_addr); + // daa->set_port(fLocalDataAddr.sin_port); // Fill control message - ctrl->set_allocated_data_address_announcement(daa.release()); - assert(ctrl->IsInitialized()); + // ctrl->set_allocated_data_address_announcement(daa.release()); + // assert(ctrl->IsInitialized()); - SendControlMessage(move(ctrl)); + // SendControlMessage(move(ctrl)); } catch (const SocketError& e) { throw SocketError(tools::ToString("Failed to announce data address, reason: ", e.what())); } -auto Socket::SendControlMessage(unique_ptr ctrl) -> void -{ - assert(fControlSocket); +// auto Socket::SendControlMessage(unique_ptr ctrl) -> void +// { + // assert(fControlSocket); // LOG(debug) << "About to send control message: " << ctrl->DebugString(); - +// // Serialize - string* str = new string(); - ctrl->SerializeToString(str); - zmq_msg_t msg; - auto ret = zmq_msg_init_data(&msg, const_cast(str->c_str()), str->length(), free_string, str); - assert(ret == 0); - + // string* str = new string(); + // ctrl->SerializeToString(str); + // zmq_msg_t msg; + // auto ret = zmq_msg_init_data(&msg, const_cast(str->c_str()), str->length(), free_string, str); + // assert(ret == 0); +// // Send - if (zmq_msg_send(&msg, fControlSocket, 0) == -1) { - zmq_msg_close(&msg); - throw SocketError(tools::ToString("Failed to send control message, reason: ", zmq_strerror(errno))); - } -} - -auto Socket::ReceiveControlMessage() -> unique_ptr -{ - assert(fControlSocket); - - // Receive - zmq_msg_t msg; - auto ret = zmq_msg_init(&msg); - assert(ret == 0); - if (zmq_msg_recv(&msg, fControlSocket, 0) == -1) { - zmq_msg_close(&msg); - throw SocketError(tools::ToString("Failed to receive control message, reason: ", zmq_strerror(errno))); - } - + // if (zmq_msg_send(&msg, fControlSocket, 0) == -1) { + // zmq_msg_close(&msg); + // throw SocketError(tools::ToString("Failed to send control message, reason: ", zmq_strerror(errno))); + // } +// } +// +// auto Socket::ReceiveControlMessage() -> unique_ptr +// { + // assert(fControlSocket); +// + // Receive + // zmq_msg_t msg; + // auto ret = zmq_msg_init(&msg); + // assert(ret == 0); + // if (zmq_msg_recv(&msg, fControlSocket, 0) == -1) { + // zmq_msg_close(&msg); + // throw SocketError(tools::ToString("Failed to receive control message, reason: ", zmq_strerror(errno))); + // } +// // Deserialize - auto ctrl = tools::make_unique(); - ctrl->ParseFromArray(zmq_msg_data(&msg), zmq_msg_size(&msg)); - - zmq_msg_close(&msg); + // auto ctrl = tools::make_unique(); + // ctrl->ParseFromArray(zmq_msg_data(&msg), zmq_msg_size(&msg)); +// + // zmq_msg_close(&msg); // LOG(debug) << "Received control message: " << ctrl->DebugString(); - return ctrl; -} + // return ctrl; +// } auto Socket::WaitForControlPeer() -> void { @@ -305,18 +303,18 @@ try { if (fWaitingForControlPeer) { WaitForControlPeer(); AnnounceDataAddress(); - ProcessDataAddressAnnouncement(ReceiveControlMessage()); + // ProcessDataAddressAnnouncement(ReceiveControlMessage()); } auto size = msg->GetSize(); // Create and send control message - auto ctrl = tools::make_unique(); - auto buf = tools::make_unique(); - buf->set_size(size); - ctrl->set_allocated_post_buffer(buf.release()); - assert(ctrl->IsInitialized()); - SendControlMessage(move(ctrl)); + // auto ctrl = tools::make_unique(); + // auto buf = tools::make_unique(); + // buf->set_size(size); + // ctrl->set_allocated_post_buffer(buf.release()); + // assert(ctrl->IsInitialized()); + // SendControlMessage(move(ctrl)); if (size) { // Receive and process control message @@ -359,19 +357,19 @@ try { if (fWaitingForControlPeer) { WaitForControlPeer(); AnnounceDataAddress(); - ProcessDataAddressAnnouncement(ReceiveControlMessage()); + // ProcessDataAddressAnnouncement(ReceiveControlMessage()); } // Receive and process control message - auto ctrl = ReceiveControlMessage(); - assert(ctrl->has_post_buffer()); - auto postBuffer = ctrl->post_buffer(); - auto size = postBuffer.size(); + // auto ctrl = ReceiveControlMessage(); + // assert(ctrl->has_post_buffer()); + // auto postBuffer = ctrl->post_buffer(); + // auto size = postBuffer.size(); // Receive data - if (size) { + // if (size) { fi_context ctx; - msg->Rebuild(size); + // msg->Rebuild(size); auto buf = msg->GetData(); auto size2 = msg->GetSize(); auto ret = fi_recv(fDataEndpoint, buf, size2, nullptr, fRemoteDataAddr, &ctx); @@ -392,12 +390,13 @@ try { throw SocketError(tools::ToString("Failed reading ofi rx completion queue event, reason: ", fi_strerror(ret))); assert(cqEntry.len == size2); assert(cqEntry.buf == buf); - } + // } - fBytesRx += size; + // fBytesRx += size; fMessagesRx++; - return size; + // return size; + return 0; } catch (const SilentSocketError& e) { diff --git a/fairmq/ofi/Socket.h b/fairmq/ofi/Socket.h index 0ab202d2..46275705 100644 --- a/fairmq/ofi/Socket.h +++ b/fairmq/ofi/Socket.h @@ -12,7 +12,6 @@ #include #include #include -#include #include #include // unique_ptr @@ -110,9 +109,9 @@ class Socket final : public fair::mq::Socket auto InitDataEndpoint() -> void; auto WaitForControlPeer() -> void; auto AnnounceDataAddress() -> void; - auto SendControlMessage(std::unique_ptr ctrl) -> void; - auto ReceiveControlMessage() -> std::unique_ptr; - auto ProcessDataAddressAnnouncement(std::unique_ptr ctrl) -> void; + // auto SendControlMessage(std::unique_ptr ctrl) -> void; + // auto ReceiveControlMessage() -> std::unique_ptr; + // auto ProcessDataAddressAnnouncement(std::unique_ptr ctrl) -> void; auto ConnectControlSocket(Context::Address address) -> void; auto BindControlSocket(Context::Address address) -> void; }; /* class Socket */ diff --git a/fairmq/ofi/TransportFactory.cxx b/fairmq/ofi/TransportFactory.cxx index d71b3502..8a427b2c 100644 --- a/fairmq/ofi/TransportFactory.cxx +++ b/fairmq/ofi/TransportFactory.cxx @@ -28,7 +28,6 @@ try : FairMQTransportFactory{id} { LOG(debug) << "Transport: Using ZeroMQ (" << fContext.GetZmqVersion() << ") & " << "OFI libfabric (API " << fContext.GetOfiApiVersion() << ") & " - << "Google Protobuf (" << fContext.GetPbVersion() << ") & " << "Boost.Asio (" << fContext.GetBoostVersion() << ")"; } catch (ContextError& e)