Remove azmq on send, make connect/bind blocking

This commit is contained in:
Alexey Rybalchenko 2019-03-04 17:06:56 +01:00
parent 95ec56dcf0
commit 763c21ffdd
3 changed files with 77 additions and 121 deletions

View File

@ -43,6 +43,8 @@ Message::Message(boost::container::pmr::memory_resource* pmr, const size_t size)
, fPmr(pmr)
{
if (size) {
// static void* buffer = fPmr->allocate(size);
// fData = buffer;
fData = fPmr->allocate(size);
assert(fData);
}

View File

@ -52,13 +52,13 @@ Socket::Socket(Context& context, const string& type, const string& name, const s
, fMessagesRx(0)
, fSndTimeout(100)
, fRcvTimeout(100)
, fSendQueueWrite(fContext.GetIoContext(), ZMQ_PUSH)
, fSendQueueRead(fContext.GetIoContext(), ZMQ_PULL)
, fRecvQueueWrite(fContext.GetIoContext(), ZMQ_PUSH)
, fRecvQueueRead(fContext.GetIoContext(), ZMQ_PULL)
, fSendSem(fContext.GetIoContext(), 300)
, fRecvSem(fContext.GetIoContext(), 300)
, fNeedOfiMemoryRegistration(false)
, fBound(false)
, fConnected(false)
{
if (type != "pair") {
throw SocketError{tools::ToString("Socket type '", type, "' not implemented for ofi transport.")};
@ -66,10 +66,6 @@ Socket::Socket(Context& context, const string& type, const string& name, const s
// TODO wire this up with config
azmq::socket::snd_hwm send_max(300);
azmq::socket::rcv_hwm recv_max(300);
fSendQueueRead.set_option(send_max);
fSendQueueRead.set_option(recv_max);
fSendQueueWrite.set_option(send_max);
fSendQueueWrite.set_option(recv_max);
fRecvQueueRead.set_option(send_max);
fRecvQueueRead.set_option(recv_max);
fRecvQueueWrite.set_option(send_max);
@ -78,10 +74,6 @@ Socket::Socket(Context& context, const string& type, const string& name, const s
// Setup internal queue
auto hashed_id = std::hash<std::string>()(fId);
auto queue_id = tools::ToString("inproc://TXQUEUE", hashed_id);
LOG(debug) << "OFI transport (" << fId << "): " << "Binding SQR: " << queue_id;
fSendQueueRead.bind(queue_id);
LOG(debug) << "OFI transport (" << fId << "): " << "Connecting SQW: " << queue_id;
fSendQueueWrite.connect(queue_id);
queue_id = tools::ToString("inproc://RXQUEUE", hashed_id);
LOG(debug) << "OFI transport (" << fId << "): " << "Binding RQR: " << queue_id;
fRecvQueueRead.bind(queue_id);
@ -118,6 +110,7 @@ auto Socket::InitOfi(Address addr) -> void
auto Socket::Bind(const string& addr) -> bool
try {
fBound = false;
fLocalAddr = Context::VerifyAddress(addr);
if (fLocalAddr.Protocol == "verbs") {
fNeedOfiMemoryRegistration = true;
@ -130,17 +123,16 @@ try {
BindControlEndpoint();
while (!fBound) {
this_thread::sleep_for(chrono::milliseconds(100));
}
return true;
}
// TODO catch the correct ofi error
catch (const SilentSocketError& e)
{
} catch (const SilentSocketError& e) {// TODO catch the correct ofi error
// do not print error in this case, this is handled by FairMQDevice
// in case no connection could be established after trying a number of random ports from a range.
return false;
}
catch (const SocketError& e)
{
} catch (const SocketError& e) {
LOG(error) << "OFI transport: " << e.what();
return false;
}
@ -150,10 +142,8 @@ auto Socket::BindControlEndpoint() -> void
assert(!fControlEndpoint);
fPassiveEndpoint->listen([&](asiofi::info&& info) {
LOG(debug) << "OFI transport (" << fId
<< "): control band connection request received. Accepting ...";
fControlEndpoint = tools::make_unique<asiofi::connected_endpoint>(
fContext.GetIoContext(), *fOfiDomain, info);
LOG(debug) << "OFI transport (" << fId << "): control band connection request received. Accepting ...";
fControlEndpoint = tools::make_unique<asiofi::connected_endpoint>(fContext.GetIoContext(), *fOfiDomain, info);
fControlEndpoint->enable();
fControlEndpoint->accept([&]() {
LOG(debug) << "OFI transport (" << fId << "): control band connection accepted.";
@ -170,16 +160,14 @@ auto Socket::BindDataEndpoint() -> void
assert(!fDataEndpoint);
fPassiveEndpoint->listen([&](asiofi::info&& info) {
LOG(debug) << "OFI transport (" << fId
<< "): data band connection request received. Accepting ...";
fDataEndpoint = tools::make_unique<asiofi::connected_endpoint>(
fContext.GetIoContext(), *fOfiDomain, info);
LOG(debug) << "OFI transport (" << fId << "): data band connection request received. Accepting ...";
fDataEndpoint = tools::make_unique<asiofi::connected_endpoint>(fContext.GetIoContext(), *fOfiDomain, info);
fDataEndpoint->enable();
fDataEndpoint->accept([&]() {
LOG(debug) << "OFI transport (" << fId << "): data band connection accepted.";
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this));
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this));
fBound = true;
});
});
@ -188,6 +176,7 @@ auto Socket::BindDataEndpoint() -> void
auto Socket::Connect(const string& address) -> bool
try {
fConnected = false;
fRemoteAddr = Context::VerifyAddress(address);
if (fRemoteAddr.Protocol == "verbs") {
fNeedOfiMemoryRegistration = true;
@ -197,15 +186,15 @@ try {
ConnectEndpoint(fControlEndpoint, Band::Control);
while (!fConnected) {
this_thread::sleep_for(chrono::milliseconds(100));
}
return true;
}
catch (const SilentSocketError& e)
{
} catch (const SilentSocketError& e) {
// do not print error in this case, this is handled by FairMQDevice
return false;
}
catch (const std::exception& e)
{
} catch (const std::exception& e) {
LOG(error) << "OFI transport: " << e.what();
return false;
}
@ -228,11 +217,11 @@ auto Socket::ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoi
if (type == Band::Control) {
ConnectEndpoint(fDataEndpoint, Band::Data);
} else {
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this));
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this));
fConnected = true;
}
} else {
LOG(error) << "asdf";
LOG(error) << "Could not connect on the first try";
}
});
}
@ -277,41 +266,10 @@ auto Socket::Send(MessagePtr& msg, const int /*timeout*/) -> int
{
// LOG(debug) << "OFI transport (" << fId << "): ENTER Send: data=" << msg->GetData() << ",size=" << msg->GetSize();
MessagePtr* msgptr(new std::unique_ptr<Message>(std::move(msg)));
try {
auto res = fSendQueueWrite.send(boost::asio::const_buffer(msgptr, sizeof(MessagePtr)), 0);
// LOG(debug) << "OFI transport (" << fId << "): LEAVE Send";
return res;
} catch (const std::exception& e) {
msg = std::move(*msgptr);
LOG(error) << e.what();
return -1;
} catch (const boost::system::error_code& e) {
msg = std::move(*msgptr);
LOG(error) << e;
return -1;
}
}
auto Socket::Receive(MessagePtr& msg, const int /*timeout*/) -> int
{
// LOG(debug) << "OFI transport (" << fId << "): ENTER Receive";
try {
azmq::message zmsg;
auto recv = fRecvQueueRead.receive(zmsg);
size_t size(0);
if (recv > 0) {
msg = std::move(*(static_cast<MessagePtr*>(zmsg.buffer().data())));
size = msg->GetSize();
}
fBytesRx += size;
fMessagesRx++;
// LOG(debug) << "OFI transport (" << fId << "): LEAVE Receive";
fSendSem.wait();
size_t size = msg->GetSize();
OnSend(msg);
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
@ -322,38 +280,10 @@ auto Socket::Receive(MessagePtr& msg, const int /*timeout*/) -> int
}
}
auto Socket::Send(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t
auto Socket::OnSend(MessagePtr& msg) -> void
{
return SendImpl(msgVec, 0, timeout);
}
// LOG(debug) << "OFI transport (" << fId << "): ENTER OnSend";
auto Socket::Receive(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t
{
return ReceiveImpl(msgVec, 0, timeout);
}
auto Socket::SendQueueReader() -> void
{
fSendSem.async_wait([&](const boost::system::error_code& ec) {
if (!ec) {
// LOG(debug) << "OFI transport (" << fId << "): < Wait fSendSem=" <<
// fSendSem.get_value();
fSendQueueRead.async_receive([&](const boost::system::error_code& ec2,
azmq::message& zmsg,
size_t bytes_transferred) {
if (!ec2) {
OnSend(zmsg, bytes_transferred);
}
});
}
});
}
auto Socket::OnSend(azmq::message& zmsg, size_t /*bytes_transferred*/) -> void
{
// LOG(debug) << "OFI transport (" << fId << "): ENTER OnSend: bytes_transferred=" << bytes_transferred;
MessagePtr msg(std::move(*(static_cast<MessagePtr*>(zmsg.buffer().data()))));
auto size = msg->GetSize();
// LOG(debug) << "OFI transport (" << fId << "): OnSend: data=" << msg->GetData() << ",size=" << msg->GetSize();
@ -372,8 +302,7 @@ auto Socket::OnSend(azmq::message& zmsg, size_t /*bytes_transferred*/) -> void
} else {
fControlEndpoint->send(ctrl_msg,
[&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable {
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Control
// message sent";
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Control message sent";
});
}
@ -388,14 +317,12 @@ auto Socket::OnSend(azmq::message& zmsg, size_t /*bytes_transferred*/) -> void
desc,
[&, size, msg2 = std::move(msg), mr2 = std::move(mr)](
boost::asio::mutable_buffer) mutable {
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Data
// buffer sent";
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Data buffer sent";
fBytesTx += size;
fMessagesTx++;
fSendSem.async_signal([&](const boost::system::error_code& ec) {
if (!ec) {
// LOG(debug) << "OFI transport (" << fId << "): >
// Signal fSendSem=" << fSendSem.get_value();
// LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" << fSendSem.get_value();
}
});
});
@ -408,8 +335,7 @@ auto Socket::OnSend(azmq::message& zmsg, size_t /*bytes_transferred*/) -> void
fMessagesTx++;
fSendSem.async_signal([&](const boost::system::error_code& ec) {
if (!ec) {
// LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem="
// << fSendSem.get_value();
// LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" << fSendSem.get_value();
}
});
});
@ -423,10 +349,39 @@ auto Socket::OnSend(azmq::message& zmsg, size_t /*bytes_transferred*/) -> void
});
}
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this));
// LOG(debug) << "OFI transport (" << fId << "): LEAVE OnSend";
}
auto Socket::Receive(MessagePtr& msg, const int /*timeout*/) -> int
try {
// LOG(debug) << "OFI transport (" << fId << "): ENTER Receive";
azmq::message zmsg;
auto recv = fRecvQueueRead.receive(zmsg);
size_t size(0);
if (recv > 0) {
msg = std::move(*(static_cast<MessagePtr*>(zmsg.buffer().data())));
size = msg->GetSize();
}
fBytesRx += size;
fMessagesRx++;
// LOG(debug) << "OFI transport (" << fId << "): LEAVE Receive";
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return -1;
} catch (const boost::system::error_code& e) {
LOG(error) << e;
return -1;
}
auto Socket::Receive(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t
{
return ReceiveImpl(msgVec, 0, timeout);
}
auto Socket::RecvControlQueueReader() -> void
{
fRecvSem.async_wait([&](const boost::system::error_code& ec) {
@ -478,9 +433,7 @@ auto Socket::OnRecvControl(ofi::unique_ptr<PostBuffer> ctrl) -> void
azmq::message(boost::asio::const_buffer(msgptr, sizeof(MessagePtr))),
[&](const boost::system::error_code& ec, size_t /*bytes_transferred2*/) {
if (!ec) {
// LOG(debug) << "OFI transport (" << fId
// << "): <<<<< Data buffer received, bytes_transferred2="
// << bytes_transferred2;
// LOG(debug) << "OFI transport (" << fId << "): <<<<< Data buffer received, bytes_transferred2=" << bytes_transferred2;
fRecvSem.async_signal([&](const boost::system::error_code& ec2) {
if (!ec2) {
//LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
@ -498,9 +451,7 @@ auto Socket::OnRecvControl(ofi::unique_ptr<PostBuffer> ctrl) -> void
azmq::message(boost::asio::const_buffer(msgptr, sizeof(MessagePtr))),
[&](const boost::system::error_code& ec, size_t /*bytes_transferred2*/) {
if (!ec) {
// LOG(debug) << "OFI transport (" << fId
// << "): <<<<< Data buffer received, bytes_transferred2="
// << bytes_transferred2;
// LOG(debug) << "OFI transport (" << fId << "): <<<<< Data buffer received, bytes_transferred2=" << bytes_transferred2;
fRecvSem.async_signal([&](const boost::system::error_code& ec2) {
if (!ec2) {
// LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
@ -515,9 +466,7 @@ auto Socket::OnRecvControl(ofi::unique_ptr<PostBuffer> ctrl) -> void
azmq::message(boost::asio::const_buffer(nullptr, 0)),
[&](const boost::system::error_code& ec, size_t /*bytes_transferred2*/) {
if (!ec) {
// LOG(debug) << "OFI transport (" << fId
// << "): <<<<< Data buffer received, bytes_transferred2="
// << bytes_transferred2;
// LOG(debug) << "OFI transport (" << fId << "): <<<<< Data buffer received, bytes_transferred2=" << bytes_transferred2;
fRecvSem.async_signal([&](const boost::system::error_code& ec2) {
if (!ec2) {
// LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
@ -532,6 +481,11 @@ auto Socket::OnRecvControl(ofi::unique_ptr<PostBuffer> ctrl) -> void
// LOG(debug) << "OFI transport (" << fId << "): LEAVE OnRecvControl";
}
auto Socket::Send(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t
{
return SendImpl(msgVec, 0, timeout);
}
auto Socket::SendImpl(vector<FairMQMessagePtr>& /*msgVec*/, const int /*flags*/, const int /*timeout*/) -> int64_t
{
throw SocketError{"Not yet implemented."};

View File

@ -97,13 +97,13 @@ class Socket final : public fair::mq::Socket
Address fLocalAddr;
int fSndTimeout;
int fRcvTimeout;
azmq::socket fSendQueueWrite, fSendQueueRead;
azmq::socket fRecvQueueWrite, fRecvQueueRead;
asiofi::semaphore fSendSem, fRecvSem;
std::atomic<bool> fNeedOfiMemoryRegistration;
std::atomic<bool> fBound;
std::atomic<bool> fConnected;
auto SendQueueReader() -> void;
auto OnSend(azmq::message& msg, size_t bytes_transferred) -> void;
auto OnSend(MessagePtr& msg) -> void;
auto RecvControlQueueReader() -> void;
auto OnRecvControl(ofi::unique_ptr<PostBuffer> ctrl) -> void;
auto OnReceive() -> void;