mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 09:31:45 +00:00
FairMQ: Fix missing ofi completion events
This commit is contained in:
committed by
Mohammad Al-Turany
parent
144aa912d7
commit
c5072ea425
@@ -85,7 +85,7 @@ auto Socket::Bind(const string& address) -> bool
|
||||
try {
|
||||
auto addr = Context::VerifyAddress(address);
|
||||
BindControlSocket(addr);
|
||||
fContext.InitOfi(ConnectionType::Bind, address);
|
||||
fContext.InitOfi(ConnectionType::Bind, addr);
|
||||
InitDataEndpoint();
|
||||
fWaitingForControlPeer = true;
|
||||
return true;
|
||||
@@ -106,7 +106,7 @@ auto Socket::Connect(const string& address) -> void
|
||||
{
|
||||
auto addr = Context::VerifyAddress(address);
|
||||
ConnectControlSocket(addr);
|
||||
fContext.InitOfi(ConnectionType::Connect, address);
|
||||
fContext.InitOfi(ConnectionType::Connect, addr);
|
||||
InitDataEndpoint();
|
||||
fWaitingForControlPeer = true;
|
||||
}
|
||||
@@ -207,7 +207,7 @@ try {
|
||||
auto Socket::SendControlMessage(unique_ptr<ControlMessage> ctrl) -> void
|
||||
{
|
||||
assert(fControlSocket);
|
||||
LOG(debug) << "About to send control message: " << ctrl->DebugString();
|
||||
// LOG(debug) << "About to send control message: " << ctrl->DebugString();
|
||||
|
||||
// Serialize
|
||||
string* str = new string();
|
||||
@@ -217,8 +217,10 @@ auto Socket::SendControlMessage(unique_ptr<ControlMessage> ctrl) -> void
|
||||
assert(ret == 0);
|
||||
|
||||
// Send
|
||||
if (zmq_msg_send(&msg, fControlSocket, 0) == -1)
|
||||
if (zmq_msg_send(&msg, fControlSocket, 0) == -1) {
|
||||
zmq_msg_close(&msg);
|
||||
throw SocketError(tools::ToString("Failed to send control message, reason: ", zmq_strerror(errno)));
|
||||
}
|
||||
}
|
||||
|
||||
auto Socket::ReceiveControlMessage() -> unique_ptr<ControlMessage>
|
||||
@@ -229,14 +231,17 @@ auto Socket::ReceiveControlMessage() -> unique_ptr<ControlMessage>
|
||||
zmq_msg_t msg;
|
||||
auto ret = zmq_msg_init(&msg);
|
||||
assert(ret == 0);
|
||||
if (zmq_msg_recv(&msg, fControlSocket, 0) == -1)
|
||||
if (zmq_msg_recv(&msg, fControlSocket, 0) == -1) {
|
||||
zmq_msg_close(&msg);
|
||||
throw SocketError(tools::ToString("Failed to receive control message, reason: ", zmq_strerror(errno)));
|
||||
}
|
||||
|
||||
// Deserialize
|
||||
auto ctrl = tools::make_unique<ControlMessage>();
|
||||
ctrl->ParseFromArray(zmq_msg_data(&msg), zmq_msg_size(&msg));
|
||||
|
||||
LOG(debug) << "Received control message: " << ctrl->DebugString();
|
||||
zmq_msg_close(&msg);
|
||||
// LOG(debug) << "Received control message: " << ctrl->DebugString();
|
||||
return ctrl;
|
||||
}
|
||||
|
||||
@@ -269,6 +274,9 @@ auto Socket::WaitForControlPeer() -> void
|
||||
string remoteIp(inet_ntoa(remoteAddr.sin_addr));
|
||||
int remotePort = ntohs(remoteAddr.sin_port);
|
||||
LOG(debug) << "Accepted control peer connection from " << remoteIp << ":" << remotePort;
|
||||
|
||||
// sucks, but the above event does not guarantee the socket is operational ...
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
||||
} else if (event == ZMQ_EVENT_CONNECTED) {
|
||||
LOG(debug) << "Connected successfully to control peer";
|
||||
} else {
|
||||
@@ -318,7 +326,7 @@ try {
|
||||
throw SocketError(tools::ToString("Failed posting ofi send buffer, reason: ", fi_strerror(ret)));
|
||||
|
||||
fi_cq_err_entry cqEntry;
|
||||
ret = fi_cq_sread(fDataCompletionQueueTx, &cqEntry, 1, nullptr, 1000);
|
||||
ret = fi_cq_sread(fDataCompletionQueueTx, &cqEntry, 1, nullptr, -1);
|
||||
if (ret != 1)
|
||||
throw SocketError(tools::ToString("Failed reading ofi tx completion queue event, reason: ", fi_strerror(ret)));
|
||||
}
|
||||
@@ -350,12 +358,13 @@ try {
|
||||
assert(ctrl->has_post_buffer());
|
||||
auto postBuffer = ctrl->post_buffer();
|
||||
auto size = postBuffer.size();
|
||||
LOG(debug) << "Received post buffer control message with size: " << size;
|
||||
|
||||
// Receive data
|
||||
if (size) {
|
||||
msg->Rebuild(size);
|
||||
auto ret = fi_recv(fDataEndpoint, msg->GetData(), msg->GetSize(), nullptr, fRemoteDataAddr, nullptr);
|
||||
auto buf = msg->GetData();
|
||||
auto size2 = msg->GetSize();
|
||||
auto ret = fi_recv(fDataEndpoint, buf, size2, nullptr, fRemoteDataAddr, nullptr);
|
||||
if (ret != FI_SUCCESS)
|
||||
throw SocketError(tools::ToString("Failed posting ofi receive buffer, reason: ", fi_strerror(ret)));
|
||||
|
||||
@@ -368,9 +377,11 @@ try {
|
||||
SendControlMessage(move(ctrl2));
|
||||
|
||||
fi_cq_err_entry cqEntry;
|
||||
ret = fi_cq_sread(fDataCompletionQueueRx, &cqEntry, 1, nullptr, 1000);
|
||||
ret = fi_cq_sread(fDataCompletionQueueRx, &cqEntry, 1, nullptr, -1);
|
||||
if (ret != 1)
|
||||
throw SocketError(tools::ToString("Failed reading ofi rx completion queue event, reason: ", fi_strerror(ret)));
|
||||
assert(cqEntry.len == size2);
|
||||
assert(cqEntry.buf == buf);
|
||||
}
|
||||
|
||||
return size;
|
||||
|
Reference in New Issue
Block a user