mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 09:31:45 +00:00
FairMQ: Implement blocking ofi::Socket Send/Receive with FI_MSG
Completion events are not yet working.
This commit is contained in:
committed by
Mohammad Al-Turany
parent
8f5b888314
commit
144aa912d7
@@ -42,6 +42,7 @@ Socket::Socket(Context& context, const string& type, const string& name, const s
|
||||
, fRcvTimeout(100)
|
||||
, fContext(context)
|
||||
, fWaitingForControlPeer(false)
|
||||
, fIoStrand(fContext.GetIoContext())
|
||||
{
|
||||
if (type != "pair") {
|
||||
throw SocketError{tools::ToString("Socket type '", type, "' not implemented for ofi transport.")};
|
||||
@@ -206,6 +207,7 @@ try {
|
||||
auto Socket::SendControlMessage(unique_ptr<ControlMessage> ctrl) -> void
|
||||
{
|
||||
assert(fControlSocket);
|
||||
LOG(debug) << "About to send control message: " << ctrl->DebugString();
|
||||
|
||||
// Serialize
|
||||
string* str = new string();
|
||||
@@ -234,6 +236,7 @@ auto Socket::ReceiveControlMessage() -> unique_ptr<ControlMessage>
|
||||
auto ctrl = tools::make_unique<ControlMessage>();
|
||||
ctrl->ParseFromArray(zmq_msg_data(&msg), zmq_msg_size(&msg));
|
||||
|
||||
LOG(debug) << "Received control message: " << ctrl->DebugString();
|
||||
return ctrl;
|
||||
}
|
||||
|
||||
@@ -293,11 +296,36 @@ try {
|
||||
ProcessDataAddressAnnouncement(ReceiveControlMessage());
|
||||
}
|
||||
|
||||
auto ret = zmq_send(fControlSocket, nullptr, 0, flags);
|
||||
if (ret == EAGAIN) throw SilentSocketError("EAGAIN");
|
||||
if (ret == -1) throw SocketError(tools::ToString("Failed sending control message on socket ", fId, ", reason: ", zmq_strerror(errno)));
|
||||
auto size = msg->GetSize();
|
||||
|
||||
return ret;
|
||||
// Create and send control message
|
||||
auto ctrl = tools::make_unique<ControlMessage>();
|
||||
auto buf = tools::make_unique<PostBuffer>();
|
||||
buf->set_size(size);
|
||||
ctrl->set_allocated_post_buffer(buf.release());
|
||||
assert(ctrl->IsInitialized());
|
||||
SendControlMessage(move(ctrl));
|
||||
|
||||
if (size) {
|
||||
// Receive and process control message
|
||||
auto ctrl2 = ReceiveControlMessage();
|
||||
assert(ctrl2->has_post_buffer_acknowledgement());
|
||||
assert(ctrl2->post_buffer_acknowledgement().size() == size);
|
||||
|
||||
// Send data
|
||||
auto ret = fi_send(fDataEndpoint, msg->GetData(), size, nullptr, fRemoteDataAddr, nullptr);
|
||||
if (ret != FI_SUCCESS)
|
||||
throw SocketError(tools::ToString("Failed posting ofi send buffer, reason: ", fi_strerror(ret)));
|
||||
|
||||
fi_cq_err_entry cqEntry;
|
||||
ret = fi_cq_sread(fDataCompletionQueueTx, &cqEntry, 1, nullptr, 1000);
|
||||
if (ret != 1)
|
||||
throw SocketError(tools::ToString("Failed reading ofi tx completion queue event, reason: ", fi_strerror(ret)));
|
||||
}
|
||||
|
||||
// TODO free msg on tx completion?
|
||||
|
||||
return size;
|
||||
}
|
||||
catch (const SilentSocketError& e)
|
||||
{
|
||||
@@ -317,11 +345,35 @@ try {
|
||||
ProcessDataAddressAnnouncement(ReceiveControlMessage());
|
||||
}
|
||||
|
||||
auto ret = zmq_recv(fControlSocket, nullptr, 0, flags);
|
||||
if (ret == EAGAIN) throw SilentSocketError("EAGAIN");
|
||||
if (ret == -1) throw SocketError(tools::ToString("Failed sending control message on socket ", fId, ", reason: ", zmq_strerror(errno)));
|
||||
// Receive and process control message
|
||||
auto ctrl = ReceiveControlMessage();
|
||||
assert(ctrl->has_post_buffer());
|
||||
auto postBuffer = ctrl->post_buffer();
|
||||
auto size = postBuffer.size();
|
||||
LOG(debug) << "Received post buffer control message with size: " << size;
|
||||
|
||||
return ret;
|
||||
// Receive data
|
||||
if (size) {
|
||||
msg->Rebuild(size);
|
||||
auto ret = fi_recv(fDataEndpoint, msg->GetData(), msg->GetSize(), nullptr, fRemoteDataAddr, nullptr);
|
||||
if (ret != FI_SUCCESS)
|
||||
throw SocketError(tools::ToString("Failed posting ofi receive buffer, reason: ", fi_strerror(ret)));
|
||||
|
||||
// Create and send control message
|
||||
auto ctrl2 = tools::make_unique<ControlMessage>();
|
||||
auto ack = tools::make_unique<PostBufferAcknowledgement>();
|
||||
ack->set_size(msg->GetSize());
|
||||
ctrl2->set_allocated_post_buffer_acknowledgement(ack.release());
|
||||
assert(ctrl2->IsInitialized());
|
||||
SendControlMessage(move(ctrl2));
|
||||
|
||||
fi_cq_err_entry cqEntry;
|
||||
ret = fi_cq_sread(fDataCompletionQueueRx, &cqEntry, 1, nullptr, 1000);
|
||||
if (ret != 1)
|
||||
throw SocketError(tools::ToString("Failed reading ofi rx completion queue event, reason: ", fi_strerror(ret)));
|
||||
}
|
||||
|
||||
return size;
|
||||
}
|
||||
catch (const SilentSocketError& e)
|
||||
{
|
||||
|
Reference in New Issue
Block a user