mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
Adapt to new asiofi release
This commit is contained in:
parent
1a00f3edbd
commit
fb42b1e2f0
|
@ -61,7 +61,7 @@ endif()
|
||||||
|
|
||||||
if(BUILD_OFI_TRANSPORT)
|
if(BUILD_OFI_TRANSPORT)
|
||||||
find_package2(PRIVATE asiofi REQUIRED
|
find_package2(PRIVATE asiofi REQUIRED
|
||||||
VERSION 0.2.0
|
VERSION 0.3.0
|
||||||
)
|
)
|
||||||
find_package2(PRIVATE OFI REQUIRED
|
find_package2(PRIVATE OFI REQUIRED
|
||||||
VERSION ${asiofi_OFI_VERSION}
|
VERSION ${asiofi_OFI_VERSION}
|
||||||
|
|
|
@ -334,18 +334,16 @@ auto Socket::Receive(std::vector<MessagePtr>& msgVec, const int timeout) -> int6
|
||||||
|
|
||||||
auto Socket::SendQueueReader() -> void
|
auto Socket::SendQueueReader() -> void
|
||||||
{
|
{
|
||||||
fSendSem.async_wait([&](const boost::system::error_code& ec) {
|
fSendSem.async_wait([&] {
|
||||||
if (!ec) {
|
// LOG(debug) << "OFI transport (" << fId << "): < Wait fSendSem=" <<
|
||||||
// LOG(debug) << "OFI transport (" << fId << "): < Wait fSendSem=" <<
|
// fSendSem.get_value();
|
||||||
// fSendSem.get_value();
|
fSendQueueRead.async_receive([&](const boost::system::error_code& ec2,
|
||||||
fSendQueueRead.async_receive([&](const boost::system::error_code& ec2,
|
azmq::message& zmsg,
|
||||||
azmq::message& zmsg,
|
size_t bytes_transferred) {
|
||||||
size_t bytes_transferred) {
|
if (!ec2) {
|
||||||
if (!ec2) {
|
OnSend(zmsg, bytes_transferred);
|
||||||
OnSend(zmsg, bytes_transferred);
|
}
|
||||||
}
|
});
|
||||||
});
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -392,11 +390,9 @@ auto Socket::OnSend(azmq::message& zmsg, size_t /*bytes_transferred*/) -> void
|
||||||
// buffer sent";
|
// buffer sent";
|
||||||
fBytesTx += size;
|
fBytesTx += size;
|
||||||
fMessagesTx++;
|
fMessagesTx++;
|
||||||
fSendSem.async_signal([&](const boost::system::error_code& ec) {
|
fSendSem.async_signal([&] {
|
||||||
if (!ec) {
|
// LOG(debug) << "OFI transport (" << fId << "): >
|
||||||
// LOG(debug) << "OFI transport (" << fId << "): >
|
// Signal fSendSem=" << fSendSem.get_value();
|
||||||
// Signal fSendSem=" << fSendSem.get_value();
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -406,20 +402,16 @@ auto Socket::OnSend(azmq::message& zmsg, size_t /*bytes_transferred*/) -> void
|
||||||
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Data buffer sent";
|
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Data buffer sent";
|
||||||
fBytesTx += size;
|
fBytesTx += size;
|
||||||
fMessagesTx++;
|
fMessagesTx++;
|
||||||
fSendSem.async_signal([&](const boost::system::error_code& ec) {
|
fSendSem.async_signal([&] {
|
||||||
if (!ec) {
|
// LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem="
|
||||||
// LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem="
|
// << fSendSem.get_value();
|
||||||
// << fSendSem.get_value();
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
++fMessagesTx;
|
++fMessagesTx;
|
||||||
fSendSem.async_signal([&](const boost::system::error_code& ec) {
|
fSendSem.async_signal([&] {
|
||||||
if (!ec) {
|
// LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" << fSendSem.get_value();
|
||||||
// LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" << fSendSem.get_value();
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -429,25 +421,24 @@ auto Socket::OnSend(azmq::message& zmsg, size_t /*bytes_transferred*/) -> void
|
||||||
|
|
||||||
auto Socket::RecvControlQueueReader() -> void
|
auto Socket::RecvControlQueueReader() -> void
|
||||||
{
|
{
|
||||||
fRecvSem.async_wait([&](const boost::system::error_code& ec) {
|
fRecvSem.async_wait([&] {
|
||||||
if (!ec) {
|
auto ctrl = MakeControlMessageWithPmr<PostBuffer>(&fControlMemPool);
|
||||||
auto ctrl = MakeControlMessageWithPmr<PostBuffer>(&fControlMemPool);
|
auto ctrl_msg = boost::asio::mutable_buffer(ctrl.get(), sizeof(PostBuffer));
|
||||||
auto ctrl_msg = boost::asio::mutable_buffer(ctrl.get(), sizeof(PostBuffer));
|
|
||||||
|
|
||||||
if (fNeedOfiMemoryRegistration) {
|
if (fNeedOfiMemoryRegistration) {
|
||||||
asiofi::memory_region mr(*fOfiDomain, ctrl_msg, asiofi::mr::access::recv);
|
asiofi::memory_region mr(*fOfiDomain, ctrl_msg, asiofi::mr::access::recv);
|
||||||
auto desc = mr.desc();
|
auto desc = mr.desc();
|
||||||
|
|
||||||
fControlEndpoint->recv(
|
fControlEndpoint->recv(
|
||||||
ctrl_msg, desc, [&, ctrl2 = std::move(ctrl), mr2 = std::move(mr)](boost::asio::mutable_buffer) mutable {
|
ctrl_msg,
|
||||||
OnRecvControl(std::move(ctrl2));
|
desc,
|
||||||
});
|
[&, ctrl2 = std::move(ctrl), mr2 = std::move(mr)](
|
||||||
} else {
|
boost::asio::mutable_buffer) mutable { OnRecvControl(std::move(ctrl2)); });
|
||||||
fControlEndpoint->recv(
|
} else {
|
||||||
ctrl_msg, [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable {
|
fControlEndpoint->recv(
|
||||||
OnRecvControl(std::move(ctrl2));
|
ctrl_msg, [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable {
|
||||||
});
|
OnRecvControl(std::move(ctrl2));
|
||||||
}
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -481,10 +472,8 @@ auto Socket::OnRecvControl(ofi::unique_ptr<PostBuffer> ctrl) -> void
|
||||||
// LOG(debug) << "OFI transport (" << fId
|
// LOG(debug) << "OFI transport (" << fId
|
||||||
// << "): <<<<< Data buffer received, bytes_transferred2="
|
// << "): <<<<< Data buffer received, bytes_transferred2="
|
||||||
// << bytes_transferred2;
|
// << bytes_transferred2;
|
||||||
fRecvSem.async_signal([&](const boost::system::error_code& ec2) {
|
fRecvSem.async_signal([&] {
|
||||||
if (!ec2) {
|
//LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
|
||||||
//LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -501,10 +490,8 @@ auto Socket::OnRecvControl(ofi::unique_ptr<PostBuffer> ctrl) -> void
|
||||||
// LOG(debug) << "OFI transport (" << fId
|
// LOG(debug) << "OFI transport (" << fId
|
||||||
// << "): <<<<< Data buffer received, bytes_transferred2="
|
// << "): <<<<< Data buffer received, bytes_transferred2="
|
||||||
// << bytes_transferred2;
|
// << bytes_transferred2;
|
||||||
fRecvSem.async_signal([&](const boost::system::error_code& ec2) {
|
fRecvSem.async_signal([&] {
|
||||||
if (!ec2) {
|
// LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
|
||||||
// LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -518,10 +505,8 @@ auto Socket::OnRecvControl(ofi::unique_ptr<PostBuffer> ctrl) -> void
|
||||||
// LOG(debug) << "OFI transport (" << fId
|
// LOG(debug) << "OFI transport (" << fId
|
||||||
// << "): <<<<< Data buffer received, bytes_transferred2="
|
// << "): <<<<< Data buffer received, bytes_transferred2="
|
||||||
// << bytes_transferred2;
|
// << bytes_transferred2;
|
||||||
fRecvSem.async_signal([&](const boost::system::error_code& ec2) {
|
fRecvSem.async_signal([&] {
|
||||||
if (!ec2) {
|
// LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
|
||||||
// LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -99,7 +99,7 @@ class Socket final : public fair::mq::Socket
|
||||||
int fRcvTimeout;
|
int fRcvTimeout;
|
||||||
azmq::socket fSendQueueWrite, fSendQueueRead;
|
azmq::socket fSendQueueWrite, fSendQueueRead;
|
||||||
azmq::socket fRecvQueueWrite, fRecvQueueRead;
|
azmq::socket fRecvQueueWrite, fRecvQueueRead;
|
||||||
asiofi::semaphore fSendSem, fRecvSem;
|
asiofi::synchronized_semaphore fSendSem, fRecvSem;
|
||||||
std::atomic<bool> fNeedOfiMemoryRegistration;
|
std::atomic<bool> fNeedOfiMemoryRegistration;
|
||||||
|
|
||||||
auto SendQueueReader() -> void;
|
auto SendQueueReader() -> void;
|
||||||
|
|
Loading…
Reference in New Issue
Block a user