Unify implementation of multi part and single part message interfaces

This commit is contained in:
Dennis Klein 2019-03-19 18:29:12 +01:00 committed by Dennis Klein
parent 6ee7e5fbf0
commit a1b7efa2f4
3 changed files with 295 additions and 509 deletions

View File

@ -35,59 +35,76 @@ namespace ofi {
enum class ControlMessageType enum class ControlMessageType
{ {
DataAddressAnnouncement = 1, Empty = 1,
PostBuffer, PostBuffer,
PostBufferAcknowledgement PostMultiPartStartBuffer
};
struct Empty
{};
struct PostBuffer
{
uint64_t size; // buffer size (size_t)
};
struct PostMultiPartStartBuffer
{
uint32_t numParts; // buffer size (size_t)
uint64_t size; // buffer size (size_t)
};
union ControlMessageContent
{
PostBuffer postBuffer;
PostMultiPartStartBuffer postMultiPartStartBuffer;
}; };
struct ControlMessage struct ControlMessage
{ {
ControlMessageType type; ControlMessageType type;
}; ControlMessageContent msg;
struct DataAddressAnnouncement : ControlMessage
{
uint32_t ipv4; // in_addr_t from <netinet/in.h>
uint32_t port; // in_port_t from <netinet/in.h>
};
struct PostBuffer : ControlMessage
{
uint64_t size; // buffer size (size_t)
}; };
template<typename T> template<typename T>
using unique_ptr = std::unique_ptr<T, std::function<void(T*)>>; using unique_ptr = std::unique_ptr<T, std::function<void(T*)>>;
template<typename T, typename... Args> template<typename T, typename... Args>
auto MakeControlMessageWithPmr(boost::container::pmr::memory_resource* pmr, Args&&... args) auto MakeControlMessageWithPmr(boost::container::pmr::memory_resource& pmr, Args&&... args)
-> ofi::unique_ptr<T> -> ofi::unique_ptr<ControlMessage>
{ {
void* mem = pmr->allocate(sizeof(T)); void* mem = pmr.allocate(sizeof(ControlMessage));
T* ctrl = new (mem) T(std::forward<Args>(args)...); ControlMessage* ctrl = new (mem) ControlMessage();
if (std::is_same<T, DataAddressAnnouncement>::value) { if (std::is_same<T, PostBuffer>::value) {
ctrl->type = ControlMessageType::DataAddressAnnouncement;
} else if (std::is_same<T, PostBuffer>::value) {
ctrl->type = ControlMessageType::PostBuffer; ctrl->type = ControlMessageType::PostBuffer;
ctrl->msg.postBuffer = PostBuffer(std::forward<Args>(args)...);
} else if (std::is_same<T, PostMultiPartStartBuffer>::value) {
ctrl->type = ControlMessageType::PostMultiPartStartBuffer;
ctrl->msg.postMultiPartStartBuffer = PostMultiPartStartBuffer(std::forward<Args>(args)...);
} else if (std::is_same<T, Empty>::value) {
ctrl->type = ControlMessageType::Empty;
} }
return ofi::unique_ptr<T>(ctrl, [=](T* p) { return ofi::unique_ptr<ControlMessage>(ctrl, [&pmr](ControlMessage* p) {
p->~T(); p->~ControlMessage();
pmr->deallocate(p, sizeof(T)); pmr.deallocate(p, sizeof(T));
}); });
} }
template<typename T, typename... Args> template<typename T, typename... Args>
auto MakeControlMessage(Args&&... args) -> T auto MakeControlMessage(Args&&... args) -> ControlMessage
{ {
T ctrl = T(std::forward<Args>(args)...); ControlMessage ctrl;
if (std::is_same<T, DataAddressAnnouncement>::value) { if (std::is_same<T, PostBuffer>::value) {
ctrl.type = ControlMessageType::DataAddressAnnouncement;
} else if (std::is_same<T, PostBuffer>::value) {
ctrl.type = ControlMessageType::PostBuffer; ctrl.type = ControlMessageType::PostBuffer;
} else if (std::is_same<T, PostMultiPartStartBuffer>::value) {
ctrl.type = ControlMessageType::PostMultiPartStartBuffer;
} else if (std::is_same<T, Empty>::value) {
ctrl.type = ControlMessageType::Empty;
} }
ctrl.msg = T(std::forward<Args>(args)...);
return ctrl; return ctrl;
} }

View File

@ -13,19 +13,17 @@
#include <FairMQLogger.h> #include <FairMQLogger.h>
#include <asiofi.hpp> #include <asiofi.hpp>
#include <azmq/message.hpp>
#include <boost/asio/buffer.hpp> #include <boost/asio/buffer.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/post.hpp> #include <boost/asio/post.hpp>
#include <chrono> #include <chrono>
#include <cstring> #include <cstring>
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <sstream> #include <sstream>
#include <string.h>
#include <sys/socket.h>
#include <thread> #include <thread>
#include <mutex> #include <mutex>
#include <condition_variable> #include <queue>
namespace fair namespace fair
{ {
@ -51,41 +49,15 @@ Socket::Socket(Context& context, const string& type, const string& name, const s
, fMessagesRx(0) , fMessagesRx(0)
, fSndTimeout(100) , fSndTimeout(100)
, fRcvTimeout(100) , fRcvTimeout(100)
, fSendQueueWrite(fContext.GetIoContext(), ZMQ_PUSH) , fMultiPartRecvCounter(-1)
, fSendQueueRead(fContext.GetIoContext(), ZMQ_PULL) , fSendPushSem(fContext.GetIoContext(), 384)
, fRecvQueueWrite(fContext.GetIoContext(), ZMQ_PUSH) , fSendPopSem(fContext.GetIoContext(), 0)
, fRecvQueueRead(fContext.GetIoContext(), ZMQ_PULL) , fRecvPushSem(fContext.GetIoContext(), 384)
, fSendSem(fContext.GetIoContext(), 300) , fRecvPopSem(fContext.GetIoContext(), 0)
, fRecvSem(fContext.GetIoContext(), 300)
, fNeedOfiMemoryRegistration(false) , fNeedOfiMemoryRegistration(false)
{ {
if (type != "pair") { if (type != "pair") {
throw SocketError{tools::ToString("Socket type '", type, "' not implemented for ofi transport.")}; throw SocketError{tools::ToString("Socket type '", type, "' not implemented for ofi transport.")};
} else {
// TODO wire this up with config
azmq::socket::snd_hwm send_max(300);
azmq::socket::rcv_hwm recv_max(300);
fSendQueueRead.set_option(send_max);
fSendQueueRead.set_option(recv_max);
fSendQueueWrite.set_option(send_max);
fSendQueueWrite.set_option(recv_max);
fRecvQueueRead.set_option(send_max);
fRecvQueueRead.set_option(recv_max);
fRecvQueueWrite.set_option(send_max);
fRecvQueueWrite.set_option(recv_max);
// Setup internal queue
auto hashed_id = std::hash<std::string>()(fId);
auto queue_id = tools::ToString("inproc://TXQUEUE", hashed_id);
LOG(debug) << "OFI transport (" << fId << "): " << "Binding SQR: " << queue_id;
fSendQueueRead.bind(queue_id);
LOG(debug) << "OFI transport (" << fId << "): " << "Connecting SQW: " << queue_id;
fSendQueueWrite.connect(queue_id);
queue_id = tools::ToString("inproc://RXQUEUE", hashed_id);
LOG(debug) << "OFI transport (" << fId << "): " << "Binding RQR: " << queue_id;
fRecvQueueRead.bind(queue_id);
LOG(debug) << "OFI transport (" << fId << "): " << "Connecting RQW: " << queue_id;
fRecvQueueWrite.connect(queue_id);
} }
} }
@ -107,7 +79,7 @@ auto Socket::InitOfi(Address addr) -> void
fOfiInfo = tools::make_unique<asiofi::info>(addr.Ip.c_str(), std::to_string(addr.Port).c_str(), FI_SOURCE, hints); fOfiInfo = tools::make_unique<asiofi::info>(addr.Ip.c_str(), std::to_string(addr.Port).c_str(), FI_SOURCE, hints);
} }
LOG(debug) << "OFI transport: " << *fOfiInfo; LOG(debug) << "OFI transport (" << fId << "): " << *fOfiInfo;
fOfiFabric = tools::make_unique<asiofi::fabric>(*fOfiInfo); fOfiFabric = tools::make_unique<asiofi::fabric>(*fOfiInfo);
@ -268,222 +240,213 @@ auto Socket::ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoi
} }
} }
// auto Socket::ReceiveDataAddressAnnouncement() -> void
// {
// azmq::message ctrl;
// auto recv = fControlEndpoint.receive(ctrl);
// assert(recv == sizeof(DataAddressAnnouncement)); (void)recv;
// auto daa(static_cast<const DataAddressAnnouncement*>(ctrl.data()));
// assert(daa->type == ControlMessageType::DataAddressAnnouncement);
//
// sockaddr_in remoteAddr;
// remoteAddr.sin_family = AF_INET;
// remoteAddr.sin_port = daa->port;
// remoteAddr.sin_addr.s_addr = daa->ipv4;
//
// auto addr = Context::ConvertAddress(remoteAddr);
// addr.Protocol = fRemoteDataAddr.Protocol;
// LOG(debug) << "OFI transport (" << fId << "): Data address announcement of remote endpoint received: " << addr;
// fRemoteDataAddr = addr;
// }
//
// auto Socket::AnnounceDataAddress() -> void
// {
// fLocalDataAddr = fDataEndpoint->get_local_address();
// LOG(debug) << "Address of local ofi endpoint in socket " << fId << ": " << Context::ConvertAddress(fLocalDataAddr);
//
// Create new data address announcement message
// auto daa = MakeControlMessage<DataAddressAnnouncement>();
// auto addr = Context::ConvertAddress(fLocalDataAddr);
// daa.ipv4 = addr.sin_addr.s_addr;
// daa.port = addr.sin_port;
//
// auto sent = fControlEndpoint.send(boost::asio::buffer(daa));
// assert(sent == sizeof(addr)); (void)sent;
//
// LOG(debug) << "OFI transport (" << fId << "): data band address " << fLocalDataAddr << " announced.";
// }
auto Socket::Send(MessagePtr& msg, const int /*timeout*/) -> int auto Socket::Send(MessagePtr& msg, const int /*timeout*/) -> int
{ {
// LOG(debug) << "OFI transport (" << fId << "): ENTER Send: data=" << msg->GetData() << ",size=" << msg->GetSize(); // timeout argument not yet implemented
MessagePtr* msgptr(new std::unique_ptr<Message>(std::move(msg))); std::vector<MessagePtr> msgVec;
try { msgVec.reserve(1);
auto res = fSendQueueWrite.send(boost::asio::const_buffer(msgptr, sizeof(MessagePtr)), 0); msgVec.emplace_back(std::move(msg));
// LOG(debug) << "OFI transport (" << fId << "): LEAVE Send"; return Send(msgVec);
return res; }
} catch (const std::exception& e) {
msg = std::move(*msgptr); auto Socket::Send(std::vector<MessagePtr>& msgVec, const int /*timeout*/) -> int64_t
LOG(error) << e.what(); try {
return -1; // timeout argument not yet implemented
} catch (const boost::system::error_code& e) {
msg = std::move(*msgptr); int size(0);
LOG(error) << e; for (auto& msg : msgVec) {
return -1; size += msg->GetSize();
} }
}
auto Socket::Receive(MessagePtr& msg, const int /*timeout*/) -> int fSendPushSem.wait();
{ {
// LOG(debug) << "OFI transport (" << fId << "): ENTER Receive"; std::lock_guard<std::mutex> lk(fSendQueueMutex);
fSendQueue.emplace(std::move(msgVec));
try {
azmq::message zmsg;
auto recv = fRecvQueueRead.receive(zmsg);
size_t size(0);
if (recv > 0) {
msg = std::move(*(static_cast<MessagePtr*>(zmsg.buffer().data())));
size = msg->GetSize();
}
fBytesRx += size;
fMessagesRx++;
// LOG(debug) << "OFI transport (" << fId << "): LEAVE Receive";
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return -1;
} catch (const boost::system::error_code& e) {
LOG(error) << e;
return -1;
} }
} fSendPopSem.signal();
auto Socket::Send(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t return size;
{ } catch (const std::exception& e) {
return SendImpl(msgVec, 0, timeout); LOG(error) << e.what();
} return -1;
auto Socket::Receive(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t
{
return ReceiveImpl(msgVec, 0, timeout);
} }
auto Socket::SendQueueReader() -> void auto Socket::SendQueueReader() -> void
{ {
fSendSem.async_wait([&] { fSendPopSem.async_wait([&] {
// LOG(debug) << "OFI transport (" << fId << "): < Wait fSendSem=" << // Read msg from send queue
// fSendSem.get_value(); std::unique_lock<std::mutex> lk(fSendQueueMutex);
fSendQueueRead.async_receive([&](const boost::system::error_code& ec2, std::vector<MessagePtr> msgVec(std::move(fSendQueue.front()));
azmq::message& zmsg, fSendQueue.pop();
size_t bytes_transferred) { lk.unlock();
if (!ec2) {
OnSend(zmsg, bytes_transferred); bool postMultiPartStartBuffer = msgVec.size() > 1;
for (auto& msg : msgVec) {
// Create control message
ofi::unique_ptr<ControlMessage> ctrl(nullptr);
if (postMultiPartStartBuffer) {
postMultiPartStartBuffer = false;
ctrl = MakeControlMessageWithPmr<PostMultiPartStartBuffer>(fControlMemPool);
ctrl->msg.postMultiPartStartBuffer.numParts = msgVec.size();
ctrl->msg.postMultiPartStartBuffer.size = msg->GetSize();
} else {
ctrl = MakeControlMessageWithPmr<PostBuffer>(fControlMemPool);
ctrl->msg.postBuffer.size = msg->GetSize();
} }
});
// Send control message
boost::asio::mutable_buffer ctrlMsg(ctrl.get(), sizeof(ControlMessage));
if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, ctrlMsg, asiofi::mr::access::send);
auto desc = mr.desc();
fControlEndpoint->send(ctrlMsg,
desc,
[&, ctrl2 = std::move(ctrlMsg), mr2 = std::move(mr)](
boost::asio::mutable_buffer) mutable {});
} else {
fControlEndpoint->send(
ctrlMsg, [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable {});
}
// Send data message
const auto size = msg->GetSize();
if (size) {
boost::asio::mutable_buffer buffer(msg->GetData(), size);
if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::send);
auto desc = mr.desc();
fDataEndpoint->send(buffer,
desc,
[&, size, msg2 = std::move(msg), mr2 = std::move(mr)](
boost::asio::mutable_buffer) mutable {
fBytesTx += size;
fMessagesTx++;
fSendPushSem.signal();
});
} else {
fDataEndpoint->send(
buffer, [&, size, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable {
fBytesTx += size;
fMessagesTx++;
fSendPushSem.signal();
});
}
} else {
++fMessagesTx;
fSendPushSem.signal();
}
}
boost::asio::dispatch(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this));
}); });
} }
auto Socket::OnSend(azmq::message& zmsg, size_t /*bytes_transferred*/) -> void auto Socket::Receive(MessagePtr& msg, const int /*timeout*/) -> int
{ try {
// LOG(debug) << "OFI transport (" << fId << "): ENTER OnSend: bytes_transferred=" << bytes_transferred; // timeout argument not yet implemented
MessagePtr msg(std::move(*(static_cast<MessagePtr*>(zmsg.buffer().data())))); fRecvPopSem.wait();
auto size = msg->GetSize(); {
std::lock_guard<std::mutex> lk(fRecvQueueMutex);
// LOG(debug) << "OFI transport (" << fId << "): OnSend: data=" << msg->GetData() << ",size=" << msg->GetSize(); msg = std::move(fRecvQueue.front().front());
fRecvQueue.pop();
// Create and send control message
auto ctrl = MakeControlMessageWithPmr<PostBuffer>(&fControlMemPool);
ctrl->size = size;
auto ctrl_msg = boost::asio::mutable_buffer(ctrl.get(), sizeof(PostBuffer));
if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, ctrl_msg, asiofi::mr::access::send);
auto desc = mr.desc();
fControlEndpoint->send(
ctrl_msg, desc, [&, ctrl2 = std::move(ctrl), mr2 = std::move(mr)](boost::asio::mutable_buffer) mutable {
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Control message sent";
});
} else {
fControlEndpoint->send(ctrl_msg,
[&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable {
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Control
// message sent";
});
} }
fRecvPushSem.signal();
if (size) { int size(msg->GetSize());
boost::asio::mutable_buffer buffer(msg->GetData(), size); fBytesRx += size;
++fMessagesRx;
if (fNeedOfiMemoryRegistration) { return size;
asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::send); } catch (const std::exception& e) {
auto desc = mr.desc(); LOG(error) << e.what();
return -1;
}
fDataEndpoint->send(buffer, auto Socket::Receive(std::vector<MessagePtr>& msgVec, const int /*timeout*/) -> int64_t
desc, try {
[&, size, msg2 = std::move(msg), mr2 = std::move(mr)]( // timeout argument not yet implemented
boost::asio::mutable_buffer) mutable {
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Data
// buffer sent";
fBytesTx += size;
fMessagesTx++;
fSendSem.async_signal([&] {
// LOG(debug) << "OFI transport (" << fId << "): >
// Signal fSendSem=" << fSendSem.get_value();
});
});
} else { fRecvPopSem.wait();
fDataEndpoint->send( {
buffer, [&, size, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable { std::lock_guard<std::mutex> lk(fRecvQueueMutex);
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Data buffer sent"; msgVec = std::move(fRecvQueue.front());
fBytesTx += size; fRecvQueue.pop();
fMessagesTx++;
fSendSem.async_signal([&] {
// LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem="
// << fSendSem.get_value();
});
});
}
} else {
++fMessagesTx;
fSendSem.async_signal([&] {
// LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" << fSendSem.get_value();
});
} }
fRecvPushSem.signal();
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this)); int64_t size(0);
// LOG(debug) << "OFI transport (" << fId << "): LEAVE OnSend"; for (auto& msg : msgVec) {
size += msg->GetSize();
}
fBytesRx += size;
++fMessagesRx;
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return -1;
} }
auto Socket::RecvControlQueueReader() -> void auto Socket::RecvControlQueueReader() -> void
{ {
fRecvSem.async_wait([&] { fRecvPushSem.async_wait([&] {
auto ctrl = MakeControlMessageWithPmr<PostBuffer>(&fControlMemPool); // Receive control message
auto ctrl_msg = boost::asio::mutable_buffer(ctrl.get(), sizeof(PostBuffer)); ofi::unique_ptr<ControlMessage> ctrl(MakeControlMessageWithPmr<Empty>(fControlMemPool));
boost::asio::mutable_buffer ctrlMsg(ctrl.get(), sizeof(ControlMessage));
if (fNeedOfiMemoryRegistration) { if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, ctrl_msg, asiofi::mr::access::recv); asiofi::memory_region mr(*fOfiDomain, ctrlMsg, asiofi::mr::access::recv);
auto desc = mr.desc(); auto desc = mr.desc();
fControlEndpoint->recv( fControlEndpoint->recv(
ctrl_msg, ctrlMsg,
desc, desc,
[&, ctrl2 = std::move(ctrl), mr2 = std::move(mr)]( [&, ctrl2 = std::move(ctrl), mr2 = std::move(mr)](
boost::asio::mutable_buffer) mutable { OnRecvControl(std::move(ctrl2)); }); boost::asio::mutable_buffer) mutable { OnRecvControl(std::move(ctrl2)); });
} else { } else {
fControlEndpoint->recv( fControlEndpoint->recv(
ctrl_msg, [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable { ctrlMsg, [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable {
OnRecvControl(std::move(ctrl2)); OnRecvControl(std::move(ctrl2));
}); });
} }
}); });
} }
auto Socket::OnRecvControl(ofi::unique_ptr<PostBuffer> ctrl) -> void auto Socket::OnRecvControl(ofi::unique_ptr<ControlMessage> ctrl) -> void
{ {
// LOG(debug) << "OFI transport (" << fId << "): ENTER OnRecvControl"; // Check control message type
auto size(0);
auto size = ctrl->size; if (ctrl->type == ControlMessageType::PostMultiPartStartBuffer) {
// LOG(debug) << "OFI transport (" << fId << "): OnRecvControl: PostBuffer.size=" << size; size = ctrl->msg.postMultiPartStartBuffer.size;
if (fMultiPartRecvCounter == -1) {
fMultiPartRecvCounter = ctrl->msg.postMultiPartStartBuffer.numParts;
assert(fInflightMultiPartMessage.empty());
fInflightMultiPartMessage.reserve(ctrl->msg.postMultiPartStartBuffer.numParts);
} else {
throw SocketError{tools::ToString(
"OFI transport: Received control start of new multi part message without completed "
"reception of previous multi part message. Number of parts missing: ",
fMultiPartRecvCounter)};
}
} else if (ctrl->type == ControlMessageType::PostBuffer) {
size = ctrl->msg.postBuffer.size;
} else {
throw SocketError{tools::ToString("OFI transport: Unknown control message type: '",
static_cast<int>(ctrl->type))};
}
// Receive data // Receive data
auto msg = fContext.MakeReceiveMessage(size);
if (size) { if (size) {
auto msg = fContext.MakeReceiveMessage(size);
boost::asio::mutable_buffer buffer(msg->GetData(), size); boost::asio::mutable_buffer buffer(msg->GetData(), size);
if (fNeedOfiMemoryRegistration) { if (fNeedOfiMemoryRegistration) {
@ -494,229 +457,41 @@ auto Socket::OnRecvControl(ofi::unique_ptr<PostBuffer> ctrl) -> void
buffer, buffer,
desc, desc,
[&, msg2 = std::move(msg), mr2 = std::move(mr)]( [&, msg2 = std::move(msg), mr2 = std::move(mr)](
boost::asio::mutable_buffer) mutable { boost::asio::mutable_buffer) mutable { DataMessageReceived(std::move(msg2)); });
MessagePtr* msgptr(new std::unique_ptr<Message>(std::move(msg2)));
fRecvQueueWrite.async_send(
azmq::message(boost::asio::const_buffer(msgptr, sizeof(MessagePtr))),
[&](const boost::system::error_code& ec, size_t /*bytes_transferred2*/) {
if (!ec) {
// LOG(debug) << "OFI transport (" << fId
// << "): <<<<< Data buffer received, bytes_transferred2="
// << bytes_transferred2;
fRecvSem.async_signal([&] {
//LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
});
}
});
});
} else { } else {
fDataEndpoint->recv( fDataEndpoint->recv(buffer,
buffer, [&, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable { [&, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable {
MessagePtr* msgptr(new std::unique_ptr<Message>(std::move(msg2))); DataMessageReceived(std::move(msg2));
fRecvQueueWrite.async_send(
azmq::message(boost::asio::const_buffer(msgptr, sizeof(MessagePtr))),
[&](const boost::system::error_code& ec, size_t /*bytes_transferred2*/) {
if (!ec) {
// LOG(debug) << "OFI transport (" << fId
// << "): <<<<< Data buffer received, bytes_transferred2="
// << bytes_transferred2;
fRecvSem.async_signal([&] {
// LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
}); });
}
});
});
} }
} else { } else {
fRecvQueueWrite.async_send( DataMessageReceived(std::move(msg));
azmq::message(boost::asio::const_buffer(nullptr, 0)),
[&](const boost::system::error_code& ec, size_t /*bytes_transferred2*/) {
if (!ec) {
// LOG(debug) << "OFI transport (" << fId
// << "): <<<<< Data buffer received, bytes_transferred2="
// << bytes_transferred2;
fRecvSem.async_signal([&] {
// LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
});
}
});
} }
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this)); boost::asio::dispatch(fContext.GetIoContext(),
std::bind(&Socket::RecvControlQueueReader, this));
// LOG(debug) << "OFI transport (" << fId << "): LEAVE OnRecvControl";
} }
auto Socket::SendImpl(vector<FairMQMessagePtr>& /*msgVec*/, const int /*flags*/, const int /*timeout*/) -> int64_t auto Socket::DataMessageReceived(MessagePtr msg) -> void
{ {
throw SocketError{"Not yet implemented."}; if (fMultiPartRecvCounter > 0) {
// const unsigned int vecSize = msgVec.size(); --fMultiPartRecvCounter;
// int elapsed = 0; fInflightMultiPartMessage.emplace_back(std::move(msg));
// }
// // Sending vector typicaly handles more then one part
// if (vecSize > 1)
// {
// int64_t totalSize = 0;
// int nbytes = -1;
// bool repeat = false;
//
// while (true && !fInterrupted)
// {
// for (unsigned int i = 0; i < vecSize; ++i)
// {
// nbytes = zmq_msg_send(static_cast<FairMQMessageSHM*>(msgVec[i].get())->GetMessage(),
// fSocket,
// (i < vecSize - 1) ? ZMQ_SNDMORE|flags : flags);
// if (nbytes >= 0)
// {
// static_cast<FairMQMessageSHM*>(msgVec[i].get())->fQueued = true;
// size_t size = msgVec[i]->GetSize();
//
// totalSize += size;
// }
// else
// {
// // according to ZMQ docs, this can only occur for the first part
// if (zmq_errno() == EAGAIN)
// {
// if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0))
// {
// if (timeout)
// {
// elapsed += fSndTimeout;
// if (elapsed >= timeout)
// {
// return -2;
// }
// }
// repeat = true;
// break;
// }
// else
// {
// return -2;
// }
// }
// if (zmq_errno() == ETERM)
// {
// LOG(info) << "terminating socket " << fId;
// return -1;
// }
// LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
// return nbytes;
// }
// }
//
// if (repeat)
// {
// continue;
// }
//
// // store statistics on how many messages have been sent (handle all parts as a single message)
// ++fMessagesTx;
// fBytesTx += totalSize;
// return totalSize;
// }
//
// return -1;
// } // If there's only one part, send it as a regular message
// else if (vecSize == 1)
// {
// return Send(msgVec.back(), flags);
// }
// else // if the vector is empty, something might be wrong
// {
// LOG(warn) << "Will not send empty vector";
// return -1;
// }
}
auto Socket::ReceiveImpl(vector<FairMQMessagePtr>& /*msgVec*/, const int /*flags*/, const int /*timeout*/) -> int64_t std::unique_lock<std::mutex> lk(fRecvQueueMutex);
{ if (fMultiPartRecvCounter == 0) {
throw SocketError{"Not yet implemented."}; fRecvQueue.push(std::move(fInflightMultiPartMessage));
// int64_t totalSize = 0; fMultiPartRecvCounter = -1;
// int64_t more = 0; } else {
// bool repeat = false; std::vector<MessagePtr> msgVec;
// int elapsed = 0; msgVec.push_back(std::move(msg));
// fRecvQueue.push(std::move(msgVec));
// while (true) }
// { lk.unlock();
// // Warn if the vector is filled before Receive() and empty it.
// // if (msgVec.size() > 0) fRecvPopSem.signal();
// // {
// // LOG(warn) << "Message vector contains elements before Receive(), they will be deleted!";
// // msgVec.clear();
// // }
//
// totalSize = 0;
// more = 0;
// repeat = false;
//
// do
// {
// FairMQMessagePtr part(new FairMQMessageSHM(fManager, GetTransport()));
// zmq_msg_t* msgPtr = static_cast<FairMQMessageSHM*>(part.get())->GetMessage();
//
// int nbytes = zmq_msg_recv(msgPtr, fSocket, flags);
// if (nbytes == 0)
// {
// msgVec.push_back(move(part));
// }
// else if (nbytes > 0)
// {
// MetaHeader* hdr = static_cast<MetaHeader*>(zmq_msg_data(msgPtr));
// size_t size = 0;
// static_cast<FairMQMessageSHM*>(part.get())->fHandle = hdr->fHandle;
// static_cast<FairMQMessageSHM*>(part.get())->fSize = hdr->fSize;
// static_cast<FairMQMessageSHM*>(part.get())->fRegionId = hdr->fRegionId;
// static_cast<FairMQMessageSHM*>(part.get())->fHint = hdr->fHint;
// size = part->GetSize();
//
// msgVec.push_back(move(part));
//
// totalSize += size;
// }
// else if (zmq_errno() == EAGAIN)
// {
// if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0))
// {
// if (timeout)
// {
// elapsed += fSndTimeout;
// if (elapsed >= timeout)
// {
// return -2;
// }
// }
// repeat = true;
// break;
// }
// else
// {
// return -2;
// }
// }
// else
// {
// return nbytes;
// }
//
// size_t more_size = sizeof(more);
// zmq_getsockopt(fSocket, ZMQ_RCVMORE, &more, &more_size);
// }
// while (more);
//
// if (repeat)
// {
// continue;
// }
//
// // store statistics on how many messages have been received (handle all parts as a single message)
// ++fMessagesRx;
// fBytesRx += totalSize;
// return totalSize;
// }
} }
auto Socket::Close() -> void {} auto Socket::Close() -> void {}
@ -805,53 +580,53 @@ int Socket::GetRcvKernelSize() const
return 0; return 0;
} }
auto Socket::GetConstant(const string& constant) -> int auto Socket::GetConstant(const string& /*constant*/) -> int
{ {
if (constant == "") // if (constant == "")
return 0; // return 0;
if (constant == "sub") // if (constant == "sub")
return ZMQ_SUB; // return ZMQ_SUB;
if (constant == "pub") // if (constant == "pub")
return ZMQ_PUB; // return ZMQ_PUB;
if (constant == "xsub") // if (constant == "xsub")
return ZMQ_XSUB; // return ZMQ_XSUB;
if (constant == "xpub") // if (constant == "xpub")
return ZMQ_XPUB; // return ZMQ_XPUB;
if (constant == "push") // if (constant == "push")
return ZMQ_PUSH; // return ZMQ_PUSH;
if (constant == "pull") // if (constant == "pull")
return ZMQ_PULL; // return ZMQ_PULL;
if (constant == "req") // if (constant == "req")
return ZMQ_REQ; // return ZMQ_REQ;
if (constant == "rep") // if (constant == "rep")
return ZMQ_REP; // return ZMQ_REP;
if (constant == "dealer") // if (constant == "dealer")
return ZMQ_DEALER; // return ZMQ_DEALER;
if (constant == "router") // if (constant == "router")
return ZMQ_ROUTER; // return ZMQ_ROUTER;
if (constant == "pair") // if (constant == "pair")
return ZMQ_PAIR; // return ZMQ_PAIR;
//
if (constant == "snd-hwm") // if (constant == "snd-hwm")
return ZMQ_SNDHWM; // return ZMQ_SNDHWM;
if (constant == "rcv-hwm") // if (constant == "rcv-hwm")
return ZMQ_RCVHWM; // return ZMQ_RCVHWM;
if (constant == "snd-size") // if (constant == "snd-size")
return ZMQ_SNDBUF; // return ZMQ_SNDBUF;
if (constant == "rcv-size") // if (constant == "rcv-size")
return ZMQ_RCVBUF; // return ZMQ_RCVBUF;
if (constant == "snd-more") // if (constant == "snd-more")
return ZMQ_SNDMORE; // return ZMQ_SNDMORE;
if (constant == "rcv-more") // if (constant == "rcv-more")
return ZMQ_RCVMORE; // return ZMQ_RCVMORE;
//
if (constant == "linger") // if (constant == "linger")
return ZMQ_LINGER; // return ZMQ_LINGER;
if (constant == "no-block") // if (constant == "no-block")
return ZMQ_DONTWAIT; // return ZMQ_DONTWAIT;
if (constant == "snd-more no-block") // if (constant == "snd-more no-block")
return ZMQ_DONTWAIT|ZMQ_SNDMORE; // return ZMQ_DONTWAIT|ZMQ_SNDMORE;
//
return -1; return -1;
} }

View File

@ -18,10 +18,10 @@
#include <asiofi/memory_resources.hpp> #include <asiofi/memory_resources.hpp>
#include <asiofi/passive_endpoint.hpp> #include <asiofi/passive_endpoint.hpp>
#include <asiofi/semaphore.hpp> #include <asiofi/semaphore.hpp>
#include <azmq/socket.hpp>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <memory> // unique_ptr #include <memory> // unique_ptr
#include <netinet/in.h> #include <mutex>
namespace fair namespace fair
{ {
@ -97,28 +97,22 @@ class Socket final : public fair::mq::Socket
Address fLocalAddr; Address fLocalAddr;
int fSndTimeout; int fSndTimeout;
int fRcvTimeout; int fRcvTimeout;
azmq::socket fSendQueueWrite, fSendQueueRead; std::mutex fSendQueueMutex, fRecvQueueMutex;
azmq::socket fRecvQueueWrite, fRecvQueueRead; std::queue<std::vector<MessagePtr>> fSendQueue, fRecvQueue;
asiofi::synchronized_semaphore fSendSem, fRecvSem; std::vector<MessagePtr> fInflightMultiPartMessage;
int64_t fMultiPartRecvCounter;
asiofi::synchronized_semaphore fSendPushSem, fSendPopSem, fRecvPushSem, fRecvPopSem;
std::atomic<bool> fNeedOfiMemoryRegistration; std::atomic<bool> fNeedOfiMemoryRegistration;
auto SendQueueReader() -> void;
auto OnSend(azmq::message& msg, size_t bytes_transferred) -> void;
auto RecvControlQueueReader() -> void;
auto OnRecvControl(ofi::unique_ptr<PostBuffer> ctrl) -> void;
auto OnReceive() -> void;
auto ReceiveImpl(MessagePtr& msg, const int flags, const int timeout) -> int;
auto SendImpl(std::vector<MessagePtr>& msgVec, const int flags, const int timeout) -> int64_t;
auto ReceiveImpl(std::vector<MessagePtr>& msgVec, const int flags, const int timeout) -> int64_t;
// auto WaitForControlPeer() -> void;
// auto AnnounceDataAddress() -> void;
auto InitOfi(Address addr) -> void; auto InitOfi(Address addr) -> void;
auto BindControlEndpoint() -> void; auto BindControlEndpoint() -> void;
auto BindDataEndpoint() -> void; auto BindDataEndpoint() -> void;
enum class Band { Control, Data }; enum class Band { Control, Data };
auto ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoint, Band type) -> void; auto ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoint, Band type) -> void;
// auto ReceiveDataAddressAnnouncement() -> void; auto SendQueueReader() -> void;
auto RecvControlQueueReader() -> void;
auto OnRecvControl(ofi::unique_ptr<ControlMessage> ctrl) -> void;
auto DataMessageReceived(MessagePtr msg) -> void;
}; /* class Socket */ }; /* class Socket */
struct SilentSocketError : SocketError { using SocketError::SocketError; }; struct SilentSocketError : SocketError { using SocketError::SocketError; };