diff --git a/fairmq/ofi/Socket.cxx b/fairmq/ofi/Socket.cxx index 76bfca93..806c0195 100644 --- a/fairmq/ofi/Socket.cxx +++ b/fairmq/ofi/Socket.cxx @@ -17,12 +17,14 @@ #include #include #include +#include #include #include #include #include #include #include +#include #include #include @@ -149,7 +151,7 @@ auto Socket::BindControlEndpoint() -> void { assert(!fControlEndpoint); - fPassiveEndpoint->listen([&](fid_t /*handle*/, asiofi::info&& info) { + fPassiveEndpoint->listen([&](asiofi::info&& info) { LOG(debug) << "OFI transport (" << fId << "): control band connection request received. Accepting ..."; fControlEndpoint = tools::make_unique( @@ -169,7 +171,7 @@ auto Socket::BindDataEndpoint() -> void { assert(!fDataEndpoint); - fPassiveEndpoint->listen([&](fid_t /*handle*/, asiofi::info&& info) { + fPassiveEndpoint->listen([&](asiofi::info&& info) { LOG(debug) << "OFI transport (" << fId << "): data band connection request received. Accepting ..."; fDataEndpoint = tools::make_unique( @@ -195,86 +197,65 @@ try { InitOfi(fRemoteAddr); - ConnectControlEndpoint(); + ConnectEndpoint(fControlEndpoint, Band::Control); - ConnectDataEndpoint(); + ConnectEndpoint(fDataEndpoint, Band::Data); boost::asio::post(fIoStrand, std::bind(&Socket::SendQueueReader, this)); boost::asio::post(fIoStrand, std::bind(&Socket::RecvControlQueueReader, this)); return true; } -// TODO catch the correct ofi error catch (const SilentSocketError& e) { // do not print error in this case, this is handled by FairMQDevice // in case no connection could be established after trying a number of random ports from a range. return false; } -catch (const SocketError& e) +catch (const std::exception& e) { LOG(error) << "OFI transport: " << e.what(); return false; } -auto Socket::ConnectControlEndpoint() -> void +auto Socket::ConnectEndpoint(std::unique_ptr& endpoint, Band type) -> void { - assert(!fControlEndpoint); + assert(!endpoint); std::mutex m; std::condition_variable cv; - bool completed(false); + asiofi::eq::event status(asiofi::eq::event::connrefused); + std::string band(type == Band::Control ? "control" : "data"); - fControlEndpoint = - tools::make_unique(fIoStrand.context(), *fOfiDomain); - fControlEndpoint->enable(); + endpoint = tools::make_unique(fIoStrand.context(), *fOfiDomain); + endpoint->enable(); - fControlEndpoint->connect(Context::ConvertAddress(fRemoteAddr), [&]() { - { - std::unique_lock lk(m); - completed = true; - } - cv.notify_one(); - }); - - LOG(debug) << "OFI transport (" << fId << "): control band connection request sent to " + LOG(debug) << "OFI transport (" << fId << "): Sending " << band << " band connection request to " << fRemoteAddr; - { - std::unique_lock lk(m); - cv.wait(lk, [&]() { return completed; }); - } - LOG(debug) << "OFI transport (" << fId << "): control band connected."; -} + while (true) { + endpoint->connect(Context::ConvertAddress(fRemoteAddr), [&](asiofi::eq::event event) { + { + std::unique_lock lk(m); + status = event; + } + cv.notify_one(); + }); -auto Socket::ConnectDataEndpoint() -> void -{ - assert(!fDataEndpoint); - - std::mutex m; - std::condition_variable cv; - bool completed(false); - - fDataEndpoint = - tools::make_unique(fIoStrand.context(), *fOfiDomain); - fDataEndpoint->enable(); - - fDataEndpoint->connect(Context::ConvertAddress(fRemoteAddr), [&]() { { std::unique_lock lk(m); - completed = true; + cv.wait(lk); + + if (status == asiofi::eq::event::connected) { + break; + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + continue; + } } - cv.notify_one(); - }); - - LOG(debug) << "OFI transport (" << fId << "): data band connection request sent to " - << fRemoteAddr; - - { - std::unique_lock lk(m); - cv.wait(lk, [&]() { return completed; }); } - LOG(debug) << "OFI transport (" << fId << "): data band connected."; + + LOG(debug) << "OFI transport (" << fId << "): " << band << " band connected."; } // auto Socket::ReceiveDataAddressAnnouncement() -> void @@ -362,15 +343,22 @@ auto Socket::Receive(MessagePtr& msg, const int /*timeout*/) -> int } } -auto Socket::Send(std::vector& msgVec, const int timeout) -> int64_t { return SendImpl(msgVec, 0, timeout); } -auto Socket::Receive(std::vector& msgVec, const int timeout) -> int64_t { return ReceiveImpl(msgVec, 0, timeout); } +auto Socket::Send(std::vector& msgVec, const int timeout) -> int64_t +{ + return SendImpl(msgVec, 0, timeout); +} +auto Socket::Receive(std::vector& msgVec, const int timeout) -> int64_t +{ + return ReceiveImpl(msgVec, 0, timeout); +} auto Socket::SendQueueReader() -> void { fSendSem.async_wait( boost::asio::bind_executor(fIoStrand, [&](const boost::system::error_code& ec) { if (!ec) { - // LOG(debug) << "OFI transport (" << fId << "): < Wait fSendSem=" << fSendSem.get_value(); + // LOG(debug) << "OFI transport (" << fId << "): < Wait fSendSem=" << + // fSendSem.get_value(); fSendQueueRead.async_receive([&](const boost::system::error_code& ec2, azmq::message& zmsg, size_t bytes_transferred) { diff --git a/fairmq/ofi/Socket.h b/fairmq/ofi/Socket.h index 549d5f50..9fd12085 100644 --- a/fairmq/ofi/Socket.h +++ b/fairmq/ofi/Socket.h @@ -117,8 +117,8 @@ class Socket final : public fair::mq::Socket auto InitOfi(Address addr) -> void; auto BindControlEndpoint() -> void; auto BindDataEndpoint() -> void; - auto ConnectControlEndpoint() -> void; - auto ConnectDataEndpoint() -> void; + enum class Band { Control, Data }; + auto ConnectEndpoint(std::unique_ptr& endpoint, Band type) -> void; // auto ReceiveDataAddressAnnouncement() -> void; }; /* class Socket */