diff --git a/fairmq/ofi/Socket.cxx b/fairmq/ofi/Socket.cxx index cb33cb56..2229c81d 100644 --- a/fairmq/ofi/Socket.cxx +++ b/fairmq/ofi/Socket.cxx @@ -126,7 +126,7 @@ try { InitOfi(fLocalAddr); fPassiveEndpoint = tools::make_unique(fContext.GetIoContext(), *fOfiFabric); - fPassiveEndpoint->set_local_address(Context::ConvertAddress(fLocalAddr)); + //fPassiveEndpoint->set_local_address(Context::ConvertAddress(fLocalAddr)); BindControlEndpoint(); @@ -195,11 +195,6 @@ try { ConnectEndpoint(fControlEndpoint, Band::Control); - ConnectEndpoint(fDataEndpoint, Band::Data); - - boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this)); - boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this)); - return true; } catch (const SilentSocketError& e) @@ -217,42 +212,27 @@ auto Socket::ConnectEndpoint(std::unique_ptr& endpoi { assert(!endpoint); - std::mutex m; - std::condition_variable cv; - asiofi::eq::event status(asiofi::eq::event::connrefused); std::string band(type == Band::Control ? "control" : "data"); endpoint = tools::make_unique(fContext.GetIoContext(), *fOfiDomain); endpoint->enable(); - LOG(debug) << "OFI transport (" << fId << "): Sending " << band << " band connection request to " - << fRemoteAddr; + LOG(debug) << "OFI transport (" << fId << "): Sending " << band << " band connection request to " << fRemoteAddr; - while (true) { - endpoint->connect(Context::ConvertAddress(fRemoteAddr), [&](asiofi::eq::event event) { - LOG(debug) << "OFI transport (" << fId << "): conn event happened"; - { - std::unique_lock lk(m); - status = event; - } - cv.notify_one(); - }); - - LOG(debug) << "OFI transport (" << fId << "): endpoint->connect called."; - { - std::unique_lock lk(m); - cv.wait(lk); - - if (status == asiofi::eq::event::connected) { - break; + endpoint->connect(Context::ConvertAddress(fRemoteAddr), [&, band, type](asiofi::eq::event event) { + LOG(debug) << "OFI transport (" << fId << "): " << band << " band conn event happened"; + if (event == asiofi::eq::event::connected) { + LOG(debug) << "OFI transport (" << fId << "): " << band << " band connected."; + if (type == Band::Control) { + ConnectEndpoint(fDataEndpoint, Band::Data); } else { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - continue; + boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this)); + boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this)); } + } else { + LOG(error) << "asdf"; } - } - - LOG(debug) << "OFI transport (" << fId << "): " << band << " band connected."; + }); } // auto Socket::ReceiveDataAddressAnnouncement() -> void