diff --git a/examples/readout/CMakeLists.txt b/examples/readout/CMakeLists.txt index 5e309485..0e62e969 100644 --- a/examples/readout/CMakeLists.txt +++ b/examples/readout/CMakeLists.txt @@ -25,7 +25,7 @@ target_link_libraries(fairmq-ex-readout-builder PRIVATE ExampleReadoutLib) add_executable(fairmq-ex-readout-sink runSink.cxx) target_link_libraries(fairmq-ex-readout-sink PRIVATE ExampleReadoutLib) -add_custom_target(Examplereadout DEPENDS fairmq-ex-readout-sampler fairmq-ex-readout-sink) +add_custom_target(ExampleReadout DEPENDS fairmq-ex-readout-sampler fairmq-ex-readout-builder fairmq-ex-readout-sink) set(EX_BIN_DIR ${CMAKE_CURRENT_BINARY_DIR}) set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq) @@ -36,6 +36,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout.sh.in ${CMAKE install( TARGETS fairmq-ex-readout-sampler + fairmq-ex-readout-builder fairmq-ex-readout-sink LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR} diff --git a/fairmq/ofi/Socket.cxx b/fairmq/ofi/Socket.cxx index 0dacabc5..45ee1ce7 100644 --- a/fairmq/ofi/Socket.cxx +++ b/fairmq/ofi/Socket.cxx @@ -44,7 +44,6 @@ Socket::Socket(Context& context, const string& type, const string& name, const s , fOfiDomain(nullptr) , fPassiveEndpoint(nullptr) , fDataEndpoint(nullptr) - , fControlEndpoint(nullptr) , fId(id + "." + name + "." + type) , fBytesTx(0) , fBytesRx(0) @@ -72,7 +71,7 @@ Socket::Socket(Context& context, const string& type, const string& name, const s fRecvQueueWrite.set_option(recv_max); // Setup internal queue - auto hashed_id = std::hash()(fId); + auto hashed_id = hash()(fId); auto queue_id = tools::ToString("inproc://TXQUEUE", hashed_id); queue_id = tools::ToString("inproc://RXQUEUE", hashed_id); LOG(debug) << "OFI transport (" << fId << "): " << "Binding RQR: " << queue_id; @@ -95,9 +94,9 @@ auto Socket::InitOfi(Address addr) -> void hints.set_provider("verbs"); } if (fRemoteAddr == addr) { - fOfiInfo = tools::make_unique(addr.Ip.c_str(), std::to_string(addr.Port).c_str(), 0, hints); + fOfiInfo = tools::make_unique(addr.Ip.c_str(), to_string(addr.Port).c_str(), 0, hints); } else { - fOfiInfo = tools::make_unique(addr.Ip.c_str(), std::to_string(addr.Port).c_str(), FI_SOURCE, hints); + fOfiInfo = tools::make_unique(addr.Ip.c_str(), to_string(addr.Port).c_str(), FI_SOURCE, hints); } LOG(debug) << "OFI transport: " << *fOfiInfo; @@ -121,7 +120,21 @@ try { fPassiveEndpoint = tools::make_unique(fContext.GetIoContext(), *fOfiFabric); //fPassiveEndpoint->set_local_address(Context::ConvertAddress(fLocalAddr)); - BindControlEndpoint(); + assert(!fDataEndpoint); + + fPassiveEndpoint->listen([&](asiofi::info&& info) { + LOG(debug) << "OFI transport (" << fId << "): data band connection request received. Accepting ..."; + fDataEndpoint = tools::make_unique(fContext.GetIoContext(), *fOfiDomain, info); + fDataEndpoint->enable(); + fDataEndpoint->accept([&]() { + LOG(debug) << "OFI transport (" << fId << "): data band connection accepted."; + + boost::asio::post(fContext.GetIoContext(), bind(&Socket::RecvQueueReader, this)); + fBound = true; + }); + }); + + LOG(debug) << "OFI transport (" << fId << "): data band bound to " << fLocalAddr; while (!fBound) { this_thread::sleep_for(chrono::milliseconds(100)); @@ -137,43 +150,6 @@ try { return false; } -auto Socket::BindControlEndpoint() -> void -{ - assert(!fControlEndpoint); - - fPassiveEndpoint->listen([&](asiofi::info&& info) { - LOG(debug) << "OFI transport (" << fId << "): control band connection request received. Accepting ..."; - fControlEndpoint = tools::make_unique(fContext.GetIoContext(), *fOfiDomain, info); - fControlEndpoint->enable(); - fControlEndpoint->accept([&]() { - LOG(debug) << "OFI transport (" << fId << "): control band connection accepted."; - - BindDataEndpoint(); - }); - }); - - LOG(debug) << "OFI transport (" << fId << "): control band bound to " << fLocalAddr; -} - -auto Socket::BindDataEndpoint() -> void -{ - assert(!fDataEndpoint); - - fPassiveEndpoint->listen([&](asiofi::info&& info) { - LOG(debug) << "OFI transport (" << fId << "): data band connection request received. Accepting ..."; - fDataEndpoint = tools::make_unique(fContext.GetIoContext(), *fOfiDomain, info); - fDataEndpoint->enable(); - fDataEndpoint->accept([&]() { - LOG(debug) << "OFI transport (" << fId << "): data band connection accepted."; - - boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this)); - fBound = true; - }); - }); - - LOG(debug) << "OFI transport (" << fId << "): data band bound to " << fLocalAddr; -} - auto Socket::Connect(const string& address) -> bool try { fConnected = false; @@ -184,7 +160,23 @@ try { InitOfi(fRemoteAddr); - ConnectEndpoint(fControlEndpoint, Band::Control); + assert(!fDataEndpoint); + + fDataEndpoint = tools::make_unique(fContext.GetIoContext(), *fOfiDomain); + fDataEndpoint->enable(); + + LOG(debug) << "OFI transport (" << fId << "): Sending data band connection request to " << fRemoteAddr; + + fDataEndpoint->connect(Context::ConvertAddress(fRemoteAddr), [&](asiofi::eq::event event) { + LOG(debug) << "OFI transport (" << fId << "): data band conn event happened"; + if (event == asiofi::eq::event::connected) { + LOG(debug) << "OFI transport (" << fId << "): data band connected."; + boost::asio::post(fContext.GetIoContext(), bind(&Socket::RecvQueueReader, this)); + fConnected = true; + } else { + LOG(error) << "Could not connect on the first try"; + } + }); while (!fConnected) { this_thread::sleep_for(chrono::milliseconds(100)); @@ -194,74 +186,11 @@ try { } catch (const SilentSocketError& e) { // do not print error in this case, this is handled by FairMQDevice return false; -} catch (const std::exception& e) { +} catch (const exception& e) { LOG(error) << "OFI transport: " << e.what(); return false; } -auto Socket::ConnectEndpoint(std::unique_ptr& endpoint, Band type) -> void -{ - assert(!endpoint); - - std::string band(type == Band::Control ? "control" : "data"); - - endpoint = tools::make_unique(fContext.GetIoContext(), *fOfiDomain); - endpoint->enable(); - - LOG(debug) << "OFI transport (" << fId << "): Sending " << band << " band connection request to " << fRemoteAddr; - - endpoint->connect(Context::ConvertAddress(fRemoteAddr), [&, band, type](asiofi::eq::event event) { - LOG(debug) << "OFI transport (" << fId << "): " << band << " band conn event happened"; - if (event == asiofi::eq::event::connected) { - LOG(debug) << "OFI transport (" << fId << "): " << band << " band connected."; - if (type == Band::Control) { - ConnectEndpoint(fDataEndpoint, Band::Data); - } else { - boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this)); - fConnected = true; - } - } else { - LOG(error) << "Could not connect on the first try"; - } - }); -} - -// auto Socket::ReceiveDataAddressAnnouncement() -> void -// { - // azmq::message ctrl; - // auto recv = fControlEndpoint.receive(ctrl); - // assert(recv == sizeof(DataAddressAnnouncement)); (void)recv; - // auto daa(static_cast(ctrl.data())); - // assert(daa->type == ControlMessageType::DataAddressAnnouncement); -// - // sockaddr_in remoteAddr; - // remoteAddr.sin_family = AF_INET; - // remoteAddr.sin_port = daa->port; - // remoteAddr.sin_addr.s_addr = daa->ipv4; -// - // auto addr = Context::ConvertAddress(remoteAddr); - // addr.Protocol = fRemoteDataAddr.Protocol; - // LOG(debug) << "OFI transport (" << fId << "): Data address announcement of remote endpoint received: " << addr; - // fRemoteDataAddr = addr; -// } -// -// auto Socket::AnnounceDataAddress() -> void -// { - // fLocalDataAddr = fDataEndpoint->get_local_address(); - // LOG(debug) << "Address of local ofi endpoint in socket " << fId << ": " << Context::ConvertAddress(fLocalDataAddr); -// - // Create new data address announcement message - // auto daa = MakeControlMessage(); - // auto addr = Context::ConvertAddress(fLocalDataAddr); - // daa.ipv4 = addr.sin_addr.s_addr; - // daa.port = addr.sin_port; -// - // auto sent = fControlEndpoint.send(boost::asio::buffer(daa)); - // assert(sent == sizeof(addr)); (void)sent; -// - // LOG(debug) << "OFI transport (" << fId << "): data band address " << fLocalDataAddr << " announced."; -// } - auto Socket::Send(MessagePtr& msg, const int /*timeout*/) -> int { // LOG(debug) << "OFI transport (" << fId << "): ENTER Send: data=" << msg->GetData() << ",size=" << msg->GetSize(); @@ -271,7 +200,7 @@ auto Socket::Send(MessagePtr& msg, const int /*timeout*/) -> int size_t size = msg->GetSize(); OnSend(msg); return size; - } catch (const std::exception& e) { + } catch (const exception& e) { LOG(error) << e.what(); return -1; } catch (const boost::system::error_code& e) { @@ -284,68 +213,36 @@ auto Socket::OnSend(MessagePtr& msg) -> void { // LOG(debug) << "OFI transport (" << fId << "): ENTER OnSend"; - auto size = msg->GetSize(); + auto size = 2000000; // LOG(debug) << "OFI transport (" << fId << "): OnSend: data=" << msg->GetData() << ",size=" << msg->GetSize(); - // Create and send control message - auto ctrl = MakeControlMessageWithPmr(&fControlMemPool); - ctrl->size = size; - auto ctrl_msg = boost::asio::mutable_buffer(ctrl.get(), sizeof(PostBuffer)); + boost::asio::mutable_buffer buffer(msg->GetData(), size); + if (fNeedOfiMemoryRegistration) { - asiofi::memory_region mr(*fOfiDomain, ctrl_msg, asiofi::mr::access::send); + asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::send); auto desc = mr.desc(); - fControlEndpoint->send( - ctrl_msg, desc, [&, ctrl2 = std::move(ctrl), mr2 = std::move(mr)](boost::asio::mutable_buffer) mutable { - // LOG(debug) << "OFI transport (" << fId << "): >>>>> Control message sent"; + + fDataEndpoint->send(buffer, desc, [&, size, msg2 = move(msg), mr2 = move(mr)](boost::asio::mutable_buffer) mutable { + // LOG(debug) << "OFI transport (" << fId << "): >>>>> Data buffer sent"; + fBytesTx += size; + fMessagesTx++; + fSendSem.async_signal([&](const boost::system::error_code& ec) { + if (!ec) { + // LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" << fSendSem.get_value(); + } }); + }); } else { - fControlEndpoint->send(ctrl_msg, - [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable { - // LOG(debug) << "OFI transport (" << fId << "): >>>>> Control message sent"; - }); - } - - if (size) { - boost::asio::mutable_buffer buffer(msg->GetData(), size); - - if (fNeedOfiMemoryRegistration) { - asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::send); - auto desc = mr.desc(); - - fDataEndpoint->send(buffer, - desc, - [&, size, msg2 = std::move(msg), mr2 = std::move(mr)]( - boost::asio::mutable_buffer) mutable { - // LOG(debug) << "OFI transport (" << fId << "): >>>>> Data buffer sent"; - fBytesTx += size; - fMessagesTx++; - fSendSem.async_signal([&](const boost::system::error_code& ec) { - if (!ec) { - // LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" << fSendSem.get_value(); - } - }); - }); - - } else { - fDataEndpoint->send( - buffer, [&, size, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable { - // LOG(debug) << "OFI transport (" << fId << "): >>>>> Data buffer sent"; - fBytesTx += size; - fMessagesTx++; - fSendSem.async_signal([&](const boost::system::error_code& ec) { - if (!ec) { - // LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" << fSendSem.get_value(); - } - }); - }); - } - } else { - ++fMessagesTx; - fSendSem.async_signal([&](const boost::system::error_code& ec) { - if (!ec) { - // LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" << fSendSem.get_value(); - } + fDataEndpoint->send(buffer, [&, size, msg2 = move(msg)](boost::asio::mutable_buffer) mutable { + // LOG(debug) << "OFI transport (" << fId << "): >>>>> Data buffer sent"; + fBytesTx += size; + fMessagesTx++; + fSendSem.async_signal([&](const boost::system::error_code& ec) { + if (!ec) { + // LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" << fSendSem.get_value(); + } + }); }); } @@ -358,9 +255,9 @@ try { azmq::message zmsg; auto recv = fRecvQueueRead.receive(zmsg); - size_t size(0); + size_t size = 0; if (recv > 0) { - msg = std::move(*(static_cast(zmsg.buffer().data()))); + msg = move(*(static_cast(zmsg.buffer().data()))); size = msg->GetSize(); } @@ -369,7 +266,7 @@ try { // LOG(debug) << "OFI transport (" << fId << "): LEAVE Receive"; return size; -} catch (const std::exception& e) { +} catch (const exception& e) { LOG(error) << e.what(); return -1; } catch (const boost::system::error_code& e) { @@ -377,111 +274,59 @@ try { return -1; } -auto Socket::Receive(std::vector& msgVec, const int timeout) -> int64_t +auto Socket::Receive(vector& msgVec, const int timeout) -> int64_t { return ReceiveImpl(msgVec, 0, timeout); } -auto Socket::RecvControlQueueReader() -> void +auto Socket::RecvQueueReader() -> void { fRecvSem.async_wait([&](const boost::system::error_code& ec) { if (!ec) { - auto ctrl = MakeControlMessageWithPmr(&fControlMemPool); - auto ctrl_msg = boost::asio::mutable_buffer(ctrl.get(), sizeof(PostBuffer)); + auto size = 2000000; + + auto msg = fContext.MakeReceiveMessage(size); + boost::asio::mutable_buffer buffer(msg->GetData(), size); if (fNeedOfiMemoryRegistration) { - asiofi::memory_region mr(*fOfiDomain, ctrl_msg, asiofi::mr::access::recv); + asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::recv); auto desc = mr.desc(); - fControlEndpoint->recv( - ctrl_msg, desc, [&, ctrl2 = std::move(ctrl), mr2 = std::move(mr)](boost::asio::mutable_buffer) mutable { - OnRecvControl(std::move(ctrl2)); + fDataEndpoint->recv(buffer, desc, [&, msg2 = move(msg), mr2 = move(mr)](boost::asio::mutable_buffer) mutable { + MessagePtr* msgptr(new std::unique_ptr(move(msg2))); + fRecvQueueWrite.async_send(azmq::message(boost::asio::const_buffer(msgptr, sizeof(MessagePtr))), [&](const boost::system::error_code& ec2, size_t /*bytes_transferred2*/) { + if (!ec2) { + // LOG(debug) << "OFI transport (" << fId << "): <<<<< Data buffer received, bytes_transferred2=" << bytes_transferred2; + fRecvSem.async_signal([&](const boost::system::error_code& ec3) { + if (!ec3) { + // LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem"; + } + }); + } }); + }); } else { - fControlEndpoint->recv( - ctrl_msg, [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable { - OnRecvControl(std::move(ctrl2)); + fDataEndpoint->recv(buffer, [&, msg2 = move(msg)](boost::asio::mutable_buffer) mutable { + MessagePtr* msgptr(new std::unique_ptr(move(msg2))); + fRecvQueueWrite.async_send(azmq::message(boost::asio::const_buffer(msgptr, sizeof(MessagePtr))), [&](const boost::system::error_code& ec2, size_t /*bytes_transferred2*/) { + if (!ec2) { + // LOG(debug) << "OFI transport (" << fId << "): <<<<< Data buffer received, bytes_transferred2=" << bytes_transferred2; + fRecvSem.async_signal([&](const boost::system::error_code& ec3) { + if (!ec3) { + // LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem"; + } + }); + } }); + }); } + + boost::asio::post(fContext.GetIoContext(), bind(&Socket::RecvQueueReader, this)); } }); } -auto Socket::OnRecvControl(ofi::unique_ptr ctrl) -> void -{ - // LOG(debug) << "OFI transport (" << fId << "): ENTER OnRecvControl"; - - auto size = ctrl->size; - // LOG(debug) << "OFI transport (" << fId << "): OnRecvControl: PostBuffer.size=" << size; - - // Receive data - if (size) { - auto msg = fContext.MakeReceiveMessage(size); - boost::asio::mutable_buffer buffer(msg->GetData(), size); - - if (fNeedOfiMemoryRegistration) { - asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::recv); - auto desc = mr.desc(); - - fDataEndpoint->recv( - buffer, - desc, - [&, msg2 = std::move(msg), mr2 = std::move(mr)]( - boost::asio::mutable_buffer) mutable { - MessagePtr* msgptr(new std::unique_ptr(std::move(msg2))); - fRecvQueueWrite.async_send( - azmq::message(boost::asio::const_buffer(msgptr, sizeof(MessagePtr))), - [&](const boost::system::error_code& ec, size_t /*bytes_transferred2*/) { - if (!ec) { - // LOG(debug) << "OFI transport (" << fId << "): <<<<< Data buffer received, bytes_transferred2=" << bytes_transferred2; - fRecvSem.async_signal([&](const boost::system::error_code& ec2) { - if (!ec2) { - //LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem"; - } - }); - } - }); - }); - - } else { - fDataEndpoint->recv( - buffer, [&, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable { - MessagePtr* msgptr(new std::unique_ptr(std::move(msg2))); - fRecvQueueWrite.async_send( - azmq::message(boost::asio::const_buffer(msgptr, sizeof(MessagePtr))), - [&](const boost::system::error_code& ec, size_t /*bytes_transferred2*/) { - if (!ec) { - // LOG(debug) << "OFI transport (" << fId << "): <<<<< Data buffer received, bytes_transferred2=" << bytes_transferred2; - fRecvSem.async_signal([&](const boost::system::error_code& ec2) { - if (!ec2) { - // LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem"; - } - }); - } - }); - }); - } - } else { - fRecvQueueWrite.async_send( - azmq::message(boost::asio::const_buffer(nullptr, 0)), - [&](const boost::system::error_code& ec, size_t /*bytes_transferred2*/) { - if (!ec) { - // LOG(debug) << "OFI transport (" << fId << "): <<<<< Data buffer received, bytes_transferred2=" << bytes_transferred2; - fRecvSem.async_signal([&](const boost::system::error_code& ec2) { - if (!ec2) { - // LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem"; - } - }); - } - }); - } - - boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this)); - - // LOG(debug) << "OFI transport (" << fId << "): LEAVE OnRecvControl"; -} - -auto Socket::Send(std::vector& msgVec, const int timeout) -> int64_t +auto Socket::Send(vector& msgVec, const int timeout) -> int64_t { return SendImpl(msgVec, 0, timeout); } @@ -489,172 +334,11 @@ auto Socket::Send(std::vector& msgVec, const int timeout) -> int64_t auto Socket::SendImpl(vector& /*msgVec*/, const int /*flags*/, const int /*timeout*/) -> int64_t { throw SocketError{"Not yet implemented."}; - // const unsigned int vecSize = msgVec.size(); - // int elapsed = 0; - // - // // Sending vector typicaly handles more then one part - // if (vecSize > 1) - // { - // int64_t totalSize = 0; - // int nbytes = -1; - // bool repeat = false; - // - // while (true && !fInterrupted) - // { - // for (unsigned int i = 0; i < vecSize; ++i) - // { - // nbytes = zmq_msg_send(static_cast(msgVec[i].get())->GetMessage(), - // fSocket, - // (i < vecSize - 1) ? ZMQ_SNDMORE|flags : flags); - // if (nbytes >= 0) - // { - // static_cast(msgVec[i].get())->fQueued = true; - // size_t size = msgVec[i]->GetSize(); - // - // totalSize += size; - // } - // else - // { - // // according to ZMQ docs, this can only occur for the first part - // if (zmq_errno() == EAGAIN) - // { - // if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) - // { - // if (timeout) - // { - // elapsed += fSndTimeout; - // if (elapsed >= timeout) - // { - // return -2; - // } - // } - // repeat = true; - // break; - // } - // else - // { - // return -2; - // } - // } - // if (zmq_errno() == ETERM) - // { - // LOG(info) << "terminating socket " << fId; - // return -1; - // } - // LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); - // return nbytes; - // } - // } - // - // if (repeat) - // { - // continue; - // } - // - // // store statistics on how many messages have been sent (handle all parts as a single message) - // ++fMessagesTx; - // fBytesTx += totalSize; - // return totalSize; - // } - // - // return -1; - // } // If there's only one part, send it as a regular message - // else if (vecSize == 1) - // { - // return Send(msgVec.back(), flags); - // } - // else // if the vector is empty, something might be wrong - // { - // LOG(warn) << "Will not send empty vector"; - // return -1; - // } } auto Socket::ReceiveImpl(vector& /*msgVec*/, const int /*flags*/, const int /*timeout*/) -> int64_t { throw SocketError{"Not yet implemented."}; - // int64_t totalSize = 0; - // int64_t more = 0; - // bool repeat = false; - // int elapsed = 0; - // - // while (true) - // { - // // Warn if the vector is filled before Receive() and empty it. - // // if (msgVec.size() > 0) - // // { - // // LOG(warn) << "Message vector contains elements before Receive(), they will be deleted!"; - // // msgVec.clear(); - // // } - // - // totalSize = 0; - // more = 0; - // repeat = false; - // - // do - // { - // FairMQMessagePtr part(new FairMQMessageSHM(fManager, GetTransport())); - // zmq_msg_t* msgPtr = static_cast(part.get())->GetMessage(); - // - // int nbytes = zmq_msg_recv(msgPtr, fSocket, flags); - // if (nbytes == 0) - // { - // msgVec.push_back(move(part)); - // } - // else if (nbytes > 0) - // { - // MetaHeader* hdr = static_cast(zmq_msg_data(msgPtr)); - // size_t size = 0; - // static_cast(part.get())->fHandle = hdr->fHandle; - // static_cast(part.get())->fSize = hdr->fSize; - // static_cast(part.get())->fRegionId = hdr->fRegionId; - // static_cast(part.get())->fHint = hdr->fHint; - // size = part->GetSize(); - // - // msgVec.push_back(move(part)); - // - // totalSize += size; - // } - // else if (zmq_errno() == EAGAIN) - // { - // if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) - // { - // if (timeout) - // { - // elapsed += fSndTimeout; - // if (elapsed >= timeout) - // { - // return -2; - // } - // } - // repeat = true; - // break; - // } - // else - // { - // return -2; - // } - // } - // else - // { - // return nbytes; - // } - // - // size_t more_size = sizeof(more); - // zmq_getsockopt(fSocket, ZMQ_RCVMORE, &more, &more_size); - // } - // while (more); - // - // if (repeat) - // { - // continue; - // } - // - // // store statistics on how many messages have been received (handle all parts as a single message) - // ++fMessagesRx; - // fBytesRx += totalSize; - // return totalSize; - // } } auto Socket::Close() -> void {} diff --git a/fairmq/ofi/Socket.h b/fairmq/ofi/Socket.h index a9d6516a..e9929599 100644 --- a/fairmq/ofi/Socket.h +++ b/fairmq/ofi/Socket.h @@ -82,12 +82,11 @@ class Socket final : public fair::mq::Socket private: Context& fContext; - asiofi::allocated_pool_resource fControlMemPool; std::unique_ptr fOfiInfo; std::unique_ptr fOfiFabric; std::unique_ptr fOfiDomain; std::unique_ptr fPassiveEndpoint; - std::unique_ptr fDataEndpoint, fControlEndpoint; + std::unique_ptr fDataEndpoint; std::string fId; std::atomic fBytesTx; std::atomic fBytesRx; @@ -104,9 +103,7 @@ class Socket final : public fair::mq::Socket std::atomic fConnected; auto OnSend(MessagePtr& msg) -> void; - auto RecvControlQueueReader() -> void; - auto OnRecvControl(ofi::unique_ptr ctrl) -> void; - auto OnReceive() -> void; + auto RecvQueueReader() -> void; auto ReceiveImpl(MessagePtr& msg, const int flags, const int timeout) -> int; auto SendImpl(std::vector& msgVec, const int flags, const int timeout) -> int64_t; auto ReceiveImpl(std::vector& msgVec, const int flags, const int timeout) -> int64_t; @@ -114,10 +111,6 @@ class Socket final : public fair::mq::Socket // auto WaitForControlPeer() -> void; // auto AnnounceDataAddress() -> void; auto InitOfi(Address addr) -> void; - auto BindControlEndpoint() -> void; - auto BindDataEndpoint() -> void; - enum class Band { Control, Data }; - auto ConnectEndpoint(std::unique_ptr& endpoint, Band type) -> void; // auto ReceiveDataAddressAnnouncement() -> void; }; /* class Socket */