mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
Temporary remove the OFI control band
This commit is contained in:
parent
763c21ffdd
commit
e1b1e5e21b
|
@ -25,7 +25,7 @@ target_link_libraries(fairmq-ex-readout-builder PRIVATE ExampleReadoutLib)
|
|||
add_executable(fairmq-ex-readout-sink runSink.cxx)
|
||||
target_link_libraries(fairmq-ex-readout-sink PRIVATE ExampleReadoutLib)
|
||||
|
||||
add_custom_target(Examplereadout DEPENDS fairmq-ex-readout-sampler fairmq-ex-readout-sink)
|
||||
add_custom_target(ExampleReadout DEPENDS fairmq-ex-readout-sampler fairmq-ex-readout-builder fairmq-ex-readout-sink)
|
||||
|
||||
set(EX_BIN_DIR ${CMAKE_CURRENT_BINARY_DIR})
|
||||
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
|
||||
|
@ -36,6 +36,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout.sh.in ${CMAKE
|
|||
install(
|
||||
TARGETS
|
||||
fairmq-ex-readout-sampler
|
||||
fairmq-ex-readout-builder
|
||||
fairmq-ex-readout-sink
|
||||
|
||||
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
|
||||
|
|
|
@ -44,7 +44,6 @@ Socket::Socket(Context& context, const string& type, const string& name, const s
|
|||
, fOfiDomain(nullptr)
|
||||
, fPassiveEndpoint(nullptr)
|
||||
, fDataEndpoint(nullptr)
|
||||
, fControlEndpoint(nullptr)
|
||||
, fId(id + "." + name + "." + type)
|
||||
, fBytesTx(0)
|
||||
, fBytesRx(0)
|
||||
|
@ -72,7 +71,7 @@ Socket::Socket(Context& context, const string& type, const string& name, const s
|
|||
fRecvQueueWrite.set_option(recv_max);
|
||||
|
||||
// Setup internal queue
|
||||
auto hashed_id = std::hash<std::string>()(fId);
|
||||
auto hashed_id = hash<string>()(fId);
|
||||
auto queue_id = tools::ToString("inproc://TXQUEUE", hashed_id);
|
||||
queue_id = tools::ToString("inproc://RXQUEUE", hashed_id);
|
||||
LOG(debug) << "OFI transport (" << fId << "): " << "Binding RQR: " << queue_id;
|
||||
|
@ -95,9 +94,9 @@ auto Socket::InitOfi(Address addr) -> void
|
|||
hints.set_provider("verbs");
|
||||
}
|
||||
if (fRemoteAddr == addr) {
|
||||
fOfiInfo = tools::make_unique<asiofi::info>(addr.Ip.c_str(), std::to_string(addr.Port).c_str(), 0, hints);
|
||||
fOfiInfo = tools::make_unique<asiofi::info>(addr.Ip.c_str(), to_string(addr.Port).c_str(), 0, hints);
|
||||
} else {
|
||||
fOfiInfo = tools::make_unique<asiofi::info>(addr.Ip.c_str(), std::to_string(addr.Port).c_str(), FI_SOURCE, hints);
|
||||
fOfiInfo = tools::make_unique<asiofi::info>(addr.Ip.c_str(), to_string(addr.Port).c_str(), FI_SOURCE, hints);
|
||||
}
|
||||
|
||||
LOG(debug) << "OFI transport: " << *fOfiInfo;
|
||||
|
@ -121,7 +120,21 @@ try {
|
|||
fPassiveEndpoint = tools::make_unique<asiofi::passive_endpoint>(fContext.GetIoContext(), *fOfiFabric);
|
||||
//fPassiveEndpoint->set_local_address(Context::ConvertAddress(fLocalAddr));
|
||||
|
||||
BindControlEndpoint();
|
||||
assert(!fDataEndpoint);
|
||||
|
||||
fPassiveEndpoint->listen([&](asiofi::info&& info) {
|
||||
LOG(debug) << "OFI transport (" << fId << "): data band connection request received. Accepting ...";
|
||||
fDataEndpoint = tools::make_unique<asiofi::connected_endpoint>(fContext.GetIoContext(), *fOfiDomain, info);
|
||||
fDataEndpoint->enable();
|
||||
fDataEndpoint->accept([&]() {
|
||||
LOG(debug) << "OFI transport (" << fId << "): data band connection accepted.";
|
||||
|
||||
boost::asio::post(fContext.GetIoContext(), bind(&Socket::RecvQueueReader, this));
|
||||
fBound = true;
|
||||
});
|
||||
});
|
||||
|
||||
LOG(debug) << "OFI transport (" << fId << "): data band bound to " << fLocalAddr;
|
||||
|
||||
while (!fBound) {
|
||||
this_thread::sleep_for(chrono::milliseconds(100));
|
||||
|
@ -137,43 +150,6 @@ try {
|
|||
return false;
|
||||
}
|
||||
|
||||
auto Socket::BindControlEndpoint() -> void
|
||||
{
|
||||
assert(!fControlEndpoint);
|
||||
|
||||
fPassiveEndpoint->listen([&](asiofi::info&& info) {
|
||||
LOG(debug) << "OFI transport (" << fId << "): control band connection request received. Accepting ...";
|
||||
fControlEndpoint = tools::make_unique<asiofi::connected_endpoint>(fContext.GetIoContext(), *fOfiDomain, info);
|
||||
fControlEndpoint->enable();
|
||||
fControlEndpoint->accept([&]() {
|
||||
LOG(debug) << "OFI transport (" << fId << "): control band connection accepted.";
|
||||
|
||||
BindDataEndpoint();
|
||||
});
|
||||
});
|
||||
|
||||
LOG(debug) << "OFI transport (" << fId << "): control band bound to " << fLocalAddr;
|
||||
}
|
||||
|
||||
auto Socket::BindDataEndpoint() -> void
|
||||
{
|
||||
assert(!fDataEndpoint);
|
||||
|
||||
fPassiveEndpoint->listen([&](asiofi::info&& info) {
|
||||
LOG(debug) << "OFI transport (" << fId << "): data band connection request received. Accepting ...";
|
||||
fDataEndpoint = tools::make_unique<asiofi::connected_endpoint>(fContext.GetIoContext(), *fOfiDomain, info);
|
||||
fDataEndpoint->enable();
|
||||
fDataEndpoint->accept([&]() {
|
||||
LOG(debug) << "OFI transport (" << fId << "): data band connection accepted.";
|
||||
|
||||
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this));
|
||||
fBound = true;
|
||||
});
|
||||
});
|
||||
|
||||
LOG(debug) << "OFI transport (" << fId << "): data band bound to " << fLocalAddr;
|
||||
}
|
||||
|
||||
auto Socket::Connect(const string& address) -> bool
|
||||
try {
|
||||
fConnected = false;
|
||||
|
@ -184,7 +160,23 @@ try {
|
|||
|
||||
InitOfi(fRemoteAddr);
|
||||
|
||||
ConnectEndpoint(fControlEndpoint, Band::Control);
|
||||
assert(!fDataEndpoint);
|
||||
|
||||
fDataEndpoint = tools::make_unique<asiofi::connected_endpoint>(fContext.GetIoContext(), *fOfiDomain);
|
||||
fDataEndpoint->enable();
|
||||
|
||||
LOG(debug) << "OFI transport (" << fId << "): Sending data band connection request to " << fRemoteAddr;
|
||||
|
||||
fDataEndpoint->connect(Context::ConvertAddress(fRemoteAddr), [&](asiofi::eq::event event) {
|
||||
LOG(debug) << "OFI transport (" << fId << "): data band conn event happened";
|
||||
if (event == asiofi::eq::event::connected) {
|
||||
LOG(debug) << "OFI transport (" << fId << "): data band connected.";
|
||||
boost::asio::post(fContext.GetIoContext(), bind(&Socket::RecvQueueReader, this));
|
||||
fConnected = true;
|
||||
} else {
|
||||
LOG(error) << "Could not connect on the first try";
|
||||
}
|
||||
});
|
||||
|
||||
while (!fConnected) {
|
||||
this_thread::sleep_for(chrono::milliseconds(100));
|
||||
|
@ -194,74 +186,11 @@ try {
|
|||
} catch (const SilentSocketError& e) {
|
||||
// do not print error in this case, this is handled by FairMQDevice
|
||||
return false;
|
||||
} catch (const std::exception& e) {
|
||||
} catch (const exception& e) {
|
||||
LOG(error) << "OFI transport: " << e.what();
|
||||
return false;
|
||||
}
|
||||
|
||||
auto Socket::ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoint, Band type) -> void
|
||||
{
|
||||
assert(!endpoint);
|
||||
|
||||
std::string band(type == Band::Control ? "control" : "data");
|
||||
|
||||
endpoint = tools::make_unique<asiofi::connected_endpoint>(fContext.GetIoContext(), *fOfiDomain);
|
||||
endpoint->enable();
|
||||
|
||||
LOG(debug) << "OFI transport (" << fId << "): Sending " << band << " band connection request to " << fRemoteAddr;
|
||||
|
||||
endpoint->connect(Context::ConvertAddress(fRemoteAddr), [&, band, type](asiofi::eq::event event) {
|
||||
LOG(debug) << "OFI transport (" << fId << "): " << band << " band conn event happened";
|
||||
if (event == asiofi::eq::event::connected) {
|
||||
LOG(debug) << "OFI transport (" << fId << "): " << band << " band connected.";
|
||||
if (type == Band::Control) {
|
||||
ConnectEndpoint(fDataEndpoint, Band::Data);
|
||||
} else {
|
||||
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this));
|
||||
fConnected = true;
|
||||
}
|
||||
} else {
|
||||
LOG(error) << "Could not connect on the first try";
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// auto Socket::ReceiveDataAddressAnnouncement() -> void
|
||||
// {
|
||||
// azmq::message ctrl;
|
||||
// auto recv = fControlEndpoint.receive(ctrl);
|
||||
// assert(recv == sizeof(DataAddressAnnouncement)); (void)recv;
|
||||
// auto daa(static_cast<const DataAddressAnnouncement*>(ctrl.data()));
|
||||
// assert(daa->type == ControlMessageType::DataAddressAnnouncement);
|
||||
//
|
||||
// sockaddr_in remoteAddr;
|
||||
// remoteAddr.sin_family = AF_INET;
|
||||
// remoteAddr.sin_port = daa->port;
|
||||
// remoteAddr.sin_addr.s_addr = daa->ipv4;
|
||||
//
|
||||
// auto addr = Context::ConvertAddress(remoteAddr);
|
||||
// addr.Protocol = fRemoteDataAddr.Protocol;
|
||||
// LOG(debug) << "OFI transport (" << fId << "): Data address announcement of remote endpoint received: " << addr;
|
||||
// fRemoteDataAddr = addr;
|
||||
// }
|
||||
//
|
||||
// auto Socket::AnnounceDataAddress() -> void
|
||||
// {
|
||||
// fLocalDataAddr = fDataEndpoint->get_local_address();
|
||||
// LOG(debug) << "Address of local ofi endpoint in socket " << fId << ": " << Context::ConvertAddress(fLocalDataAddr);
|
||||
//
|
||||
// Create new data address announcement message
|
||||
// auto daa = MakeControlMessage<DataAddressAnnouncement>();
|
||||
// auto addr = Context::ConvertAddress(fLocalDataAddr);
|
||||
// daa.ipv4 = addr.sin_addr.s_addr;
|
||||
// daa.port = addr.sin_port;
|
||||
//
|
||||
// auto sent = fControlEndpoint.send(boost::asio::buffer(daa));
|
||||
// assert(sent == sizeof(addr)); (void)sent;
|
||||
//
|
||||
// LOG(debug) << "OFI transport (" << fId << "): data band address " << fLocalDataAddr << " announced.";
|
||||
// }
|
||||
|
||||
auto Socket::Send(MessagePtr& msg, const int /*timeout*/) -> int
|
||||
{
|
||||
// LOG(debug) << "OFI transport (" << fId << "): ENTER Send: data=" << msg->GetData() << ",size=" << msg->GetSize();
|
||||
|
@ -271,7 +200,7 @@ auto Socket::Send(MessagePtr& msg, const int /*timeout*/) -> int
|
|||
size_t size = msg->GetSize();
|
||||
OnSend(msg);
|
||||
return size;
|
||||
} catch (const std::exception& e) {
|
||||
} catch (const exception& e) {
|
||||
LOG(error) << e.what();
|
||||
return -1;
|
||||
} catch (const boost::system::error_code& e) {
|
||||
|
@ -284,68 +213,36 @@ auto Socket::OnSend(MessagePtr& msg) -> void
|
|||
{
|
||||
// LOG(debug) << "OFI transport (" << fId << "): ENTER OnSend";
|
||||
|
||||
auto size = msg->GetSize();
|
||||
auto size = 2000000;
|
||||
|
||||
// LOG(debug) << "OFI transport (" << fId << "): OnSend: data=" << msg->GetData() << ",size=" << msg->GetSize();
|
||||
|
||||
// Create and send control message
|
||||
auto ctrl = MakeControlMessageWithPmr<PostBuffer>(&fControlMemPool);
|
||||
ctrl->size = size;
|
||||
auto ctrl_msg = boost::asio::mutable_buffer(ctrl.get(), sizeof(PostBuffer));
|
||||
boost::asio::mutable_buffer buffer(msg->GetData(), size);
|
||||
|
||||
if (fNeedOfiMemoryRegistration) {
|
||||
asiofi::memory_region mr(*fOfiDomain, ctrl_msg, asiofi::mr::access::send);
|
||||
asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::send);
|
||||
auto desc = mr.desc();
|
||||
fControlEndpoint->send(
|
||||
ctrl_msg, desc, [&, ctrl2 = std::move(ctrl), mr2 = std::move(mr)](boost::asio::mutable_buffer) mutable {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Control message sent";
|
||||
|
||||
fDataEndpoint->send(buffer, desc, [&, size, msg2 = move(msg), mr2 = move(mr)](boost::asio::mutable_buffer) mutable {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Data buffer sent";
|
||||
fBytesTx += size;
|
||||
fMessagesTx++;
|
||||
fSendSem.async_signal([&](const boost::system::error_code& ec) {
|
||||
if (!ec) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" << fSendSem.get_value();
|
||||
}
|
||||
});
|
||||
});
|
||||
} else {
|
||||
fControlEndpoint->send(ctrl_msg,
|
||||
[&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Control message sent";
|
||||
});
|
||||
}
|
||||
|
||||
if (size) {
|
||||
boost::asio::mutable_buffer buffer(msg->GetData(), size);
|
||||
|
||||
if (fNeedOfiMemoryRegistration) {
|
||||
asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::send);
|
||||
auto desc = mr.desc();
|
||||
|
||||
fDataEndpoint->send(buffer,
|
||||
desc,
|
||||
[&, size, msg2 = std::move(msg), mr2 = std::move(mr)](
|
||||
boost::asio::mutable_buffer) mutable {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Data buffer sent";
|
||||
fBytesTx += size;
|
||||
fMessagesTx++;
|
||||
fSendSem.async_signal([&](const boost::system::error_code& ec) {
|
||||
if (!ec) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" << fSendSem.get_value();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
} else {
|
||||
fDataEndpoint->send(
|
||||
buffer, [&, size, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Data buffer sent";
|
||||
fBytesTx += size;
|
||||
fMessagesTx++;
|
||||
fSendSem.async_signal([&](const boost::system::error_code& ec) {
|
||||
if (!ec) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" << fSendSem.get_value();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
} else {
|
||||
++fMessagesTx;
|
||||
fSendSem.async_signal([&](const boost::system::error_code& ec) {
|
||||
if (!ec) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" << fSendSem.get_value();
|
||||
}
|
||||
fDataEndpoint->send(buffer, [&, size, msg2 = move(msg)](boost::asio::mutable_buffer) mutable {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Data buffer sent";
|
||||
fBytesTx += size;
|
||||
fMessagesTx++;
|
||||
fSendSem.async_signal([&](const boost::system::error_code& ec) {
|
||||
if (!ec) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" << fSendSem.get_value();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -358,9 +255,9 @@ try {
|
|||
azmq::message zmsg;
|
||||
auto recv = fRecvQueueRead.receive(zmsg);
|
||||
|
||||
size_t size(0);
|
||||
size_t size = 0;
|
||||
if (recv > 0) {
|
||||
msg = std::move(*(static_cast<MessagePtr*>(zmsg.buffer().data())));
|
||||
msg = move(*(static_cast<MessagePtr*>(zmsg.buffer().data())));
|
||||
size = msg->GetSize();
|
||||
}
|
||||
|
||||
|
@ -369,7 +266,7 @@ try {
|
|||
|
||||
// LOG(debug) << "OFI transport (" << fId << "): LEAVE Receive";
|
||||
return size;
|
||||
} catch (const std::exception& e) {
|
||||
} catch (const exception& e) {
|
||||
LOG(error) << e.what();
|
||||
return -1;
|
||||
} catch (const boost::system::error_code& e) {
|
||||
|
@ -377,111 +274,59 @@ try {
|
|||
return -1;
|
||||
}
|
||||
|
||||
auto Socket::Receive(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t
|
||||
auto Socket::Receive(vector<MessagePtr>& msgVec, const int timeout) -> int64_t
|
||||
{
|
||||
return ReceiveImpl(msgVec, 0, timeout);
|
||||
}
|
||||
|
||||
auto Socket::RecvControlQueueReader() -> void
|
||||
auto Socket::RecvQueueReader() -> void
|
||||
{
|
||||
fRecvSem.async_wait([&](const boost::system::error_code& ec) {
|
||||
if (!ec) {
|
||||
auto ctrl = MakeControlMessageWithPmr<PostBuffer>(&fControlMemPool);
|
||||
auto ctrl_msg = boost::asio::mutable_buffer(ctrl.get(), sizeof(PostBuffer));
|
||||
auto size = 2000000;
|
||||
|
||||
auto msg = fContext.MakeReceiveMessage(size);
|
||||
boost::asio::mutable_buffer buffer(msg->GetData(), size);
|
||||
|
||||
if (fNeedOfiMemoryRegistration) {
|
||||
asiofi::memory_region mr(*fOfiDomain, ctrl_msg, asiofi::mr::access::recv);
|
||||
asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::recv);
|
||||
auto desc = mr.desc();
|
||||
|
||||
fControlEndpoint->recv(
|
||||
ctrl_msg, desc, [&, ctrl2 = std::move(ctrl), mr2 = std::move(mr)](boost::asio::mutable_buffer) mutable {
|
||||
OnRecvControl(std::move(ctrl2));
|
||||
fDataEndpoint->recv(buffer, desc, [&, msg2 = move(msg), mr2 = move(mr)](boost::asio::mutable_buffer) mutable {
|
||||
MessagePtr* msgptr(new std::unique_ptr<Message>(move(msg2)));
|
||||
fRecvQueueWrite.async_send(azmq::message(boost::asio::const_buffer(msgptr, sizeof(MessagePtr))), [&](const boost::system::error_code& ec2, size_t /*bytes_transferred2*/) {
|
||||
if (!ec2) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): <<<<< Data buffer received, bytes_transferred2=" << bytes_transferred2;
|
||||
fRecvSem.async_signal([&](const boost::system::error_code& ec3) {
|
||||
if (!ec3) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
} else {
|
||||
fControlEndpoint->recv(
|
||||
ctrl_msg, [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable {
|
||||
OnRecvControl(std::move(ctrl2));
|
||||
fDataEndpoint->recv(buffer, [&, msg2 = move(msg)](boost::asio::mutable_buffer) mutable {
|
||||
MessagePtr* msgptr(new std::unique_ptr<Message>(move(msg2)));
|
||||
fRecvQueueWrite.async_send(azmq::message(boost::asio::const_buffer(msgptr, sizeof(MessagePtr))), [&](const boost::system::error_code& ec2, size_t /*bytes_transferred2*/) {
|
||||
if (!ec2) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): <<<<< Data buffer received, bytes_transferred2=" << bytes_transferred2;
|
||||
fRecvSem.async_signal([&](const boost::system::error_code& ec3) {
|
||||
if (!ec3) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
boost::asio::post(fContext.GetIoContext(), bind(&Socket::RecvQueueReader, this));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
auto Socket::OnRecvControl(ofi::unique_ptr<PostBuffer> ctrl) -> void
|
||||
{
|
||||
// LOG(debug) << "OFI transport (" << fId << "): ENTER OnRecvControl";
|
||||
|
||||
auto size = ctrl->size;
|
||||
// LOG(debug) << "OFI transport (" << fId << "): OnRecvControl: PostBuffer.size=" << size;
|
||||
|
||||
// Receive data
|
||||
if (size) {
|
||||
auto msg = fContext.MakeReceiveMessage(size);
|
||||
boost::asio::mutable_buffer buffer(msg->GetData(), size);
|
||||
|
||||
if (fNeedOfiMemoryRegistration) {
|
||||
asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::recv);
|
||||
auto desc = mr.desc();
|
||||
|
||||
fDataEndpoint->recv(
|
||||
buffer,
|
||||
desc,
|
||||
[&, msg2 = std::move(msg), mr2 = std::move(mr)](
|
||||
boost::asio::mutable_buffer) mutable {
|
||||
MessagePtr* msgptr(new std::unique_ptr<Message>(std::move(msg2)));
|
||||
fRecvQueueWrite.async_send(
|
||||
azmq::message(boost::asio::const_buffer(msgptr, sizeof(MessagePtr))),
|
||||
[&](const boost::system::error_code& ec, size_t /*bytes_transferred2*/) {
|
||||
if (!ec) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): <<<<< Data buffer received, bytes_transferred2=" << bytes_transferred2;
|
||||
fRecvSem.async_signal([&](const boost::system::error_code& ec2) {
|
||||
if (!ec2) {
|
||||
//LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
} else {
|
||||
fDataEndpoint->recv(
|
||||
buffer, [&, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable {
|
||||
MessagePtr* msgptr(new std::unique_ptr<Message>(std::move(msg2)));
|
||||
fRecvQueueWrite.async_send(
|
||||
azmq::message(boost::asio::const_buffer(msgptr, sizeof(MessagePtr))),
|
||||
[&](const boost::system::error_code& ec, size_t /*bytes_transferred2*/) {
|
||||
if (!ec) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): <<<<< Data buffer received, bytes_transferred2=" << bytes_transferred2;
|
||||
fRecvSem.async_signal([&](const boost::system::error_code& ec2) {
|
||||
if (!ec2) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
} else {
|
||||
fRecvQueueWrite.async_send(
|
||||
azmq::message(boost::asio::const_buffer(nullptr, 0)),
|
||||
[&](const boost::system::error_code& ec, size_t /*bytes_transferred2*/) {
|
||||
if (!ec) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): <<<<< Data buffer received, bytes_transferred2=" << bytes_transferred2;
|
||||
fRecvSem.async_signal([&](const boost::system::error_code& ec2) {
|
||||
if (!ec2) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this));
|
||||
|
||||
// LOG(debug) << "OFI transport (" << fId << "): LEAVE OnRecvControl";
|
||||
}
|
||||
|
||||
auto Socket::Send(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t
|
||||
auto Socket::Send(vector<MessagePtr>& msgVec, const int timeout) -> int64_t
|
||||
{
|
||||
return SendImpl(msgVec, 0, timeout);
|
||||
}
|
||||
|
@ -489,172 +334,11 @@ auto Socket::Send(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t
|
|||
auto Socket::SendImpl(vector<FairMQMessagePtr>& /*msgVec*/, const int /*flags*/, const int /*timeout*/) -> int64_t
|
||||
{
|
||||
throw SocketError{"Not yet implemented."};
|
||||
// const unsigned int vecSize = msgVec.size();
|
||||
// int elapsed = 0;
|
||||
//
|
||||
// // Sending vector typicaly handles more then one part
|
||||
// if (vecSize > 1)
|
||||
// {
|
||||
// int64_t totalSize = 0;
|
||||
// int nbytes = -1;
|
||||
// bool repeat = false;
|
||||
//
|
||||
// while (true && !fInterrupted)
|
||||
// {
|
||||
// for (unsigned int i = 0; i < vecSize; ++i)
|
||||
// {
|
||||
// nbytes = zmq_msg_send(static_cast<FairMQMessageSHM*>(msgVec[i].get())->GetMessage(),
|
||||
// fSocket,
|
||||
// (i < vecSize - 1) ? ZMQ_SNDMORE|flags : flags);
|
||||
// if (nbytes >= 0)
|
||||
// {
|
||||
// static_cast<FairMQMessageSHM*>(msgVec[i].get())->fQueued = true;
|
||||
// size_t size = msgVec[i]->GetSize();
|
||||
//
|
||||
// totalSize += size;
|
||||
// }
|
||||
// else
|
||||
// {
|
||||
// // according to ZMQ docs, this can only occur for the first part
|
||||
// if (zmq_errno() == EAGAIN)
|
||||
// {
|
||||
// if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0))
|
||||
// {
|
||||
// if (timeout)
|
||||
// {
|
||||
// elapsed += fSndTimeout;
|
||||
// if (elapsed >= timeout)
|
||||
// {
|
||||
// return -2;
|
||||
// }
|
||||
// }
|
||||
// repeat = true;
|
||||
// break;
|
||||
// }
|
||||
// else
|
||||
// {
|
||||
// return -2;
|
||||
// }
|
||||
// }
|
||||
// if (zmq_errno() == ETERM)
|
||||
// {
|
||||
// LOG(info) << "terminating socket " << fId;
|
||||
// return -1;
|
||||
// }
|
||||
// LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
|
||||
// return nbytes;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// if (repeat)
|
||||
// {
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// // store statistics on how many messages have been sent (handle all parts as a single message)
|
||||
// ++fMessagesTx;
|
||||
// fBytesTx += totalSize;
|
||||
// return totalSize;
|
||||
// }
|
||||
//
|
||||
// return -1;
|
||||
// } // If there's only one part, send it as a regular message
|
||||
// else if (vecSize == 1)
|
||||
// {
|
||||
// return Send(msgVec.back(), flags);
|
||||
// }
|
||||
// else // if the vector is empty, something might be wrong
|
||||
// {
|
||||
// LOG(warn) << "Will not send empty vector";
|
||||
// return -1;
|
||||
// }
|
||||
}
|
||||
|
||||
auto Socket::ReceiveImpl(vector<FairMQMessagePtr>& /*msgVec*/, const int /*flags*/, const int /*timeout*/) -> int64_t
|
||||
{
|
||||
throw SocketError{"Not yet implemented."};
|
||||
// int64_t totalSize = 0;
|
||||
// int64_t more = 0;
|
||||
// bool repeat = false;
|
||||
// int elapsed = 0;
|
||||
//
|
||||
// while (true)
|
||||
// {
|
||||
// // Warn if the vector is filled before Receive() and empty it.
|
||||
// // if (msgVec.size() > 0)
|
||||
// // {
|
||||
// // LOG(warn) << "Message vector contains elements before Receive(), they will be deleted!";
|
||||
// // msgVec.clear();
|
||||
// // }
|
||||
//
|
||||
// totalSize = 0;
|
||||
// more = 0;
|
||||
// repeat = false;
|
||||
//
|
||||
// do
|
||||
// {
|
||||
// FairMQMessagePtr part(new FairMQMessageSHM(fManager, GetTransport()));
|
||||
// zmq_msg_t* msgPtr = static_cast<FairMQMessageSHM*>(part.get())->GetMessage();
|
||||
//
|
||||
// int nbytes = zmq_msg_recv(msgPtr, fSocket, flags);
|
||||
// if (nbytes == 0)
|
||||
// {
|
||||
// msgVec.push_back(move(part));
|
||||
// }
|
||||
// else if (nbytes > 0)
|
||||
// {
|
||||
// MetaHeader* hdr = static_cast<MetaHeader*>(zmq_msg_data(msgPtr));
|
||||
// size_t size = 0;
|
||||
// static_cast<FairMQMessageSHM*>(part.get())->fHandle = hdr->fHandle;
|
||||
// static_cast<FairMQMessageSHM*>(part.get())->fSize = hdr->fSize;
|
||||
// static_cast<FairMQMessageSHM*>(part.get())->fRegionId = hdr->fRegionId;
|
||||
// static_cast<FairMQMessageSHM*>(part.get())->fHint = hdr->fHint;
|
||||
// size = part->GetSize();
|
||||
//
|
||||
// msgVec.push_back(move(part));
|
||||
//
|
||||
// totalSize += size;
|
||||
// }
|
||||
// else if (zmq_errno() == EAGAIN)
|
||||
// {
|
||||
// if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0))
|
||||
// {
|
||||
// if (timeout)
|
||||
// {
|
||||
// elapsed += fSndTimeout;
|
||||
// if (elapsed >= timeout)
|
||||
// {
|
||||
// return -2;
|
||||
// }
|
||||
// }
|
||||
// repeat = true;
|
||||
// break;
|
||||
// }
|
||||
// else
|
||||
// {
|
||||
// return -2;
|
||||
// }
|
||||
// }
|
||||
// else
|
||||
// {
|
||||
// return nbytes;
|
||||
// }
|
||||
//
|
||||
// size_t more_size = sizeof(more);
|
||||
// zmq_getsockopt(fSocket, ZMQ_RCVMORE, &more, &more_size);
|
||||
// }
|
||||
// while (more);
|
||||
//
|
||||
// if (repeat)
|
||||
// {
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// // store statistics on how many messages have been received (handle all parts as a single message)
|
||||
// ++fMessagesRx;
|
||||
// fBytesRx += totalSize;
|
||||
// return totalSize;
|
||||
// }
|
||||
}
|
||||
|
||||
auto Socket::Close() -> void {}
|
||||
|
|
|
@ -82,12 +82,11 @@ class Socket final : public fair::mq::Socket
|
|||
|
||||
private:
|
||||
Context& fContext;
|
||||
asiofi::allocated_pool_resource fControlMemPool;
|
||||
std::unique_ptr<asiofi::info> fOfiInfo;
|
||||
std::unique_ptr<asiofi::fabric> fOfiFabric;
|
||||
std::unique_ptr<asiofi::domain> fOfiDomain;
|
||||
std::unique_ptr<asiofi::passive_endpoint> fPassiveEndpoint;
|
||||
std::unique_ptr<asiofi::connected_endpoint> fDataEndpoint, fControlEndpoint;
|
||||
std::unique_ptr<asiofi::connected_endpoint> fDataEndpoint;
|
||||
std::string fId;
|
||||
std::atomic<unsigned long> fBytesTx;
|
||||
std::atomic<unsigned long> fBytesRx;
|
||||
|
@ -104,9 +103,7 @@ class Socket final : public fair::mq::Socket
|
|||
std::atomic<bool> fConnected;
|
||||
|
||||
auto OnSend(MessagePtr& msg) -> void;
|
||||
auto RecvControlQueueReader() -> void;
|
||||
auto OnRecvControl(ofi::unique_ptr<PostBuffer> ctrl) -> void;
|
||||
auto OnReceive() -> void;
|
||||
auto RecvQueueReader() -> void;
|
||||
auto ReceiveImpl(MessagePtr& msg, const int flags, const int timeout) -> int;
|
||||
auto SendImpl(std::vector<MessagePtr>& msgVec, const int flags, const int timeout) -> int64_t;
|
||||
auto ReceiveImpl(std::vector<MessagePtr>& msgVec, const int flags, const int timeout) -> int64_t;
|
||||
|
@ -114,10 +111,6 @@ class Socket final : public fair::mq::Socket
|
|||
// auto WaitForControlPeer() -> void;
|
||||
// auto AnnounceDataAddress() -> void;
|
||||
auto InitOfi(Address addr) -> void;
|
||||
auto BindControlEndpoint() -> void;
|
||||
auto BindDataEndpoint() -> void;
|
||||
enum class Band { Control, Data };
|
||||
auto ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoint, Band type) -> void;
|
||||
// auto ReceiveDataAddressAnnouncement() -> void;
|
||||
}; /* class Socket */
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user