mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 09:31:45 +00:00
Reach compilable state with asiofi again
This commit is contained in:
committed by
Dennis Klein
parent
0ff8eaf84d
commit
1c5d7ca46a
@@ -39,15 +39,17 @@ Socket::Socket(Context& context, const string& type, const string& name, const s
|
||||
, fId(id + "." + name + "." + type)
|
||||
, fControlSocket(nullptr)
|
||||
, fMonitorSocket(nullptr)
|
||||
, fSndTimeout(100)
|
||||
, fRcvTimeout(100)
|
||||
, fContext(context)
|
||||
, fWaitingForControlPeer(false)
|
||||
, fIoStrand(fContext.GetIoContext())
|
||||
, fDataEndpoint(nullptr)
|
||||
, fId(id + "." + name + "." + type)
|
||||
, fBytesTx(0)
|
||||
, fBytesRx(0)
|
||||
, fMessagesTx(0)
|
||||
, fMessagesRx(0)
|
||||
, fContext(context)
|
||||
, fWaitingForControlPeer(false)
|
||||
, fIoStrand(fContext.GetIoContext())
|
||||
, fSndTimeout(100)
|
||||
, fRcvTimeout(100)
|
||||
{
|
||||
if (type != "pair") {
|
||||
throw SocketError{tools::ToString("Socket type '", type, "' not implemented for ofi transport.")};
|
||||
@@ -152,29 +154,29 @@ auto Socket::ConnectControlSocket(Context::Address address) -> void
|
||||
|
||||
auto Socket::InitDataEndpoint() -> void
|
||||
{
|
||||
if (!fDataEndpoint) {
|
||||
try {
|
||||
fDataEndpoint = fContext.CreateOfiEndpoint();
|
||||
} catch (ContextError& e) {
|
||||
throw SocketError(tools::ToString("Failed creating ofi endpoint, reason: ", e.what()));
|
||||
}
|
||||
assert(!fDataEndpoint);
|
||||
|
||||
if (!fDataCompletionQueueTx)
|
||||
fDataCompletionQueueTx = fContext.CreateOfiCompletionQueue(Direction::Transmit);
|
||||
auto ret = fi_ep_bind(fDataEndpoint, &fDataCompletionQueueTx->fid, FI_TRANSMIT);
|
||||
if (ret != FI_SUCCESS)
|
||||
throw SocketError(tools::ToString("Failed binding ofi transmit completion queue to endpoint, reason: ", fi_strerror(ret)));
|
||||
|
||||
if (!fDataCompletionQueueRx)
|
||||
fDataCompletionQueueRx = fContext.CreateOfiCompletionQueue(Direction::Receive);
|
||||
ret = fi_ep_bind(fDataEndpoint, &fDataCompletionQueueRx->fid, FI_RECV);
|
||||
if (ret != FI_SUCCESS)
|
||||
throw SocketError(tools::ToString("Failed binding ofi receive completion queue to endpoint, reason: ", fi_strerror(ret)));
|
||||
|
||||
ret = fi_enable(fDataEndpoint);
|
||||
if (ret != FI_SUCCESS)
|
||||
throw SocketError(tools::ToString("Failed enabling ofi endpoint, reason: ", fi_strerror(ret)));
|
||||
}
|
||||
// try {
|
||||
// fDataEndpoint = fContext.CreateOfiEndpoint();
|
||||
// } catch (ContextError& e) {
|
||||
// throw SocketError(tools::ToString("Failed creating ofi endpoint, reason: ", e.what()));
|
||||
// }
|
||||
//
|
||||
// if (!fDataCompletionQueueTx)
|
||||
// fDataCompletionQueueTx = fContext.CreateOfiCompletionQueue(Direction::Transmit);
|
||||
// auto ret = fi_ep_bind(fDataEndpoint, &fDataCompletionQueueTx->fid, FI_TRANSMIT);
|
||||
// if (ret != FI_SUCCESS)
|
||||
// throw SocketError(tools::ToString("Failed binding ofi transmit completion queue to endpoint, reason: ", fi_strerror(ret)));
|
||||
//
|
||||
// if (!fDataCompletionQueueRx)
|
||||
// fDataCompletionQueueRx = fContext.CreateOfiCompletionQueue(Direction::Receive);
|
||||
// ret = fi_ep_bind(fDataEndpoint, &fDataCompletionQueueRx->fid, FI_RECV);
|
||||
// if (ret != FI_SUCCESS)
|
||||
// throw SocketError(tools::ToString("Failed binding ofi receive completion queue to endpoint, reason: ", fi_strerror(ret)));
|
||||
//
|
||||
// ret = fi_enable(fDataEndpoint);
|
||||
// if (ret != FI_SUCCESS)
|
||||
// throw SocketError(tools::ToString("Failed enabling ofi endpoint, reason: ", fi_strerror(ret)));
|
||||
}
|
||||
|
||||
void free_string(void* /*data*/, void* hint)
|
||||
@@ -184,13 +186,13 @@ void free_string(void* /*data*/, void* hint)
|
||||
|
||||
auto Socket::AnnounceDataAddress() -> void
|
||||
try {
|
||||
size_t addrlen = sizeof(sockaddr_in);
|
||||
auto ret = fi_getname(&fDataEndpoint->fid, &fLocalDataAddr, &addrlen);
|
||||
if (ret != FI_SUCCESS)
|
||||
throw SocketError(tools::ToString("Failed retrieving native address from ofi endpoint, reason: ", fi_strerror(ret)));
|
||||
assert(addrlen == sizeof(sockaddr_in));
|
||||
|
||||
LOG(debug) << "Address of local ofi endpoint in socket " << fId << ": " << Context::ConvertAddress(fLocalDataAddr);
|
||||
// size_t addrlen = sizeof(sockaddr_in);
|
||||
// auto ret = fi_getname(&fDataEndpoint->fid, &fLocalDataAddr, &addrlen);
|
||||
// if (ret != FI_SUCCESS)
|
||||
// throw SocketError(tools::ToString("Failed retrieving native address from ofi endpoint, reason: ", fi_strerror(ret)));
|
||||
// assert(addrlen == sizeof(sockaddr_in));
|
||||
//
|
||||
// LOG(debug) << "Address of local ofi endpoint in socket " << fId << ": " << Context::ConvertAddress(fLocalDataAddr);
|
||||
|
||||
// Create new control message
|
||||
// auto ctrl = tools::make_unique<ControlMessage>();
|
||||
@@ -298,7 +300,7 @@ auto Socket::TryReceive(MessagePtr& msg) -> int { return ReceiveImpl(msg, ZMQ_DO
|
||||
auto Socket::TrySend(std::vector<MessagePtr>& msgVec) -> int64_t { return SendImpl(msgVec, ZMQ_DONTWAIT, 0); }
|
||||
auto Socket::TryReceive(std::vector<MessagePtr>& msgVec) -> int64_t { return ReceiveImpl(msgVec, ZMQ_DONTWAIT, 0); }
|
||||
|
||||
auto Socket::SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout) -> int
|
||||
auto Socket::SendImpl(FairMQMessagePtr& msg, const int /*flags*/, const int /*timeout*/) -> int
|
||||
try {
|
||||
if (fWaitingForControlPeer) {
|
||||
WaitForControlPeer();
|
||||
@@ -323,17 +325,17 @@ try {
|
||||
// assert(ctrl2->post_buffer_acknowledgement().size() == size);
|
||||
|
||||
// Send data
|
||||
fi_context ctx;
|
||||
auto ret = fi_send(fDataEndpoint, msg->GetData(), size, nullptr, fRemoteDataAddr, &ctx);
|
||||
if (ret < 0)
|
||||
throw SocketError(tools::ToString("Failed posting ofi send buffer, reason: ", fi_strerror(ret)));
|
||||
// fi_context ctx;
|
||||
// auto ret = fi_send(fDataEndpoint, msg->GetData(), size, nullptr, fRemoteDataAddr, &ctx);
|
||||
// if (ret < 0)
|
||||
// throw SocketError(tools::ToString("Failed posting ofi send buffer, reason: ", fi_strerror(ret)));
|
||||
}
|
||||
|
||||
if (size) {
|
||||
fi_cq_err_entry cqEntry;
|
||||
auto ret = fi_cq_sread(fDataCompletionQueueTx, &cqEntry, 1, nullptr, -1);
|
||||
if (ret != 1)
|
||||
throw SocketError(tools::ToString("Failed reading ofi tx completion queue event, reason: ", fi_strerror(ret)));
|
||||
// fi_cq_err_entry cqEntry;
|
||||
// auto ret = fi_cq_sread(fDataCompletionQueueTx, &cqEntry, 1, nullptr, -1);
|
||||
// if (ret != 1)
|
||||
// throw SocketError(tools::ToString("Failed reading ofi tx completion queue event, reason: ", fi_strerror(ret)));
|
||||
}
|
||||
|
||||
msg.reset(nullptr);
|
||||
@@ -352,7 +354,7 @@ catch (const std::exception& e)
|
||||
return -1;
|
||||
}
|
||||
|
||||
auto Socket::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout) -> int
|
||||
auto Socket::ReceiveImpl(FairMQMessagePtr& /*msg*/, const int /*flags*/, const int /*timeout*/) -> int
|
||||
try {
|
||||
if (fWaitingForControlPeer) {
|
||||
WaitForControlPeer();
|
||||
@@ -368,13 +370,13 @@ try {
|
||||
|
||||
// Receive data
|
||||
// if (size) {
|
||||
fi_context ctx;
|
||||
// fi_context ctx;
|
||||
// msg->Rebuild(size);
|
||||
auto buf = msg->GetData();
|
||||
auto size2 = msg->GetSize();
|
||||
auto ret = fi_recv(fDataEndpoint, buf, size2, nullptr, fRemoteDataAddr, &ctx);
|
||||
if (ret < 0)
|
||||
throw SocketError(tools::ToString("Failed posting ofi receive buffer, reason: ", fi_strerror(ret)));
|
||||
// auto buf = msg->GetData();
|
||||
// auto size2 = msg->GetSize();
|
||||
// auto ret = fi_recv(fDataEndpoint, buf, size2, nullptr, fRemoteDataAddr, &ctx);
|
||||
// if (ret < 0)
|
||||
// throw SocketError(tools::ToString("Failed posting ofi receive buffer, reason: ", fi_strerror(ret)));
|
||||
|
||||
// Create and send control message
|
||||
// auto ctrl2 = tools::make_unique<ControlMessage>();
|
||||
@@ -384,12 +386,12 @@ try {
|
||||
// assert(ctrl2->IsInitialized());
|
||||
// SendControlMessage(move(ctrl2));
|
||||
|
||||
fi_cq_err_entry cqEntry;
|
||||
ret = fi_cq_sread(fDataCompletionQueueRx, &cqEntry, 1, nullptr, -1);
|
||||
if (ret != 1)
|
||||
throw SocketError(tools::ToString("Failed reading ofi rx completion queue event, reason: ", fi_strerror(ret)));
|
||||
assert(cqEntry.len == size2);
|
||||
assert(cqEntry.buf == buf);
|
||||
// fi_cq_err_entry cqEntry;
|
||||
// ret = fi_cq_sread(fDataCompletionQueueRx, &cqEntry, 1, nullptr, -1);
|
||||
// if (ret != 1)
|
||||
// throw SocketError(tools::ToString("Failed reading ofi rx completion queue event, reason: ", fi_strerror(ret)));
|
||||
// assert(cqEntry.len == size2);
|
||||
// assert(cqEntry.buf == buf);
|
||||
// }
|
||||
|
||||
// fBytesRx += size;
|
||||
@@ -408,7 +410,7 @@ catch (const std::exception& e)
|
||||
return -1;
|
||||
}
|
||||
|
||||
auto Socket::SendImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int timeout) -> int64_t
|
||||
auto Socket::SendImpl(vector<FairMQMessagePtr>& /*msgVec*/, const int /*flags*/, const int /*timeout*/) -> int64_t
|
||||
{
|
||||
throw SocketError{"Not yet implemented."};
|
||||
// const unsigned int vecSize = msgVec.size();
|
||||
@@ -492,7 +494,7 @@ auto Socket::SendImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const i
|
||||
// }
|
||||
}
|
||||
|
||||
auto Socket::ReceiveImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int timeout) -> int64_t
|
||||
auto Socket::ReceiveImpl(vector<FairMQMessagePtr>& /*msgVec*/, const int /*flags*/, const int /*timeout*/) -> int64_t
|
||||
{
|
||||
throw SocketError{"Not yet implemented."};
|
||||
// int64_t totalSize = 0;
|
||||
@@ -586,24 +588,6 @@ auto Socket::Close() -> void
|
||||
|
||||
if (zmq_close(fMonitorSocket) != 0)
|
||||
throw SocketError(tools::ToString("Failed closing zmq monitor socket, reason: ", zmq_strerror(errno)));
|
||||
|
||||
if (fDataEndpoint) {
|
||||
auto ret = fi_close(&fDataEndpoint->fid);
|
||||
if (ret != FI_SUCCESS)
|
||||
throw SocketError(tools::ToString("Failed closing ofi endpoint, reason: ", fi_strerror(ret)));
|
||||
}
|
||||
|
||||
if (fDataCompletionQueueTx) {
|
||||
auto ret = fi_close(&fDataCompletionQueueTx->fid);
|
||||
if (ret != FI_SUCCESS)
|
||||
throw SocketError(tools::ToString("Failed closing ofi transmit completion queue, reason: ", fi_strerror(ret)));
|
||||
}
|
||||
|
||||
if (fDataCompletionQueueRx) {
|
||||
auto ret = fi_close(&fDataCompletionQueueRx->fid);
|
||||
if (ret != FI_SUCCESS)
|
||||
throw SocketError(tools::ToString("Failed closing ofi receive completion queue, reason: ", fi_strerror(ret)));
|
||||
}
|
||||
}
|
||||
|
||||
auto Socket::SetOption(const string& option, const void* value, size_t valueSize) -> void
|
||||
|
Reference in New Issue
Block a user