Implement ofi Send/Receive

This commit is contained in:
Dennis Klein 2018-11-01 20:44:45 +01:00 committed by Dennis Klein
parent 91025cbc88
commit b394feca18
7 changed files with 146 additions and 89 deletions

View File

@ -9,6 +9,8 @@
#ifndef FAIR_MQ_OFI_CONTEXT_H #ifndef FAIR_MQ_OFI_CONTEXT_H
#define FAIR_MQ_OFI_CONTEXT_H #define FAIR_MQ_OFI_CONTEXT_H
#include <FairMQLogger.h>
#include <asiofi/connected_endpoint.hpp> #include <asiofi/connected_endpoint.hpp>
#include <asiofi/domain.hpp> #include <asiofi/domain.hpp>
#include <asiofi/fabric.hpp> #include <asiofi/fabric.hpp>
@ -63,6 +65,9 @@ class Context
static auto ConvertAddress(Address address) -> sockaddr_in; static auto ConvertAddress(Address address) -> sockaddr_in;
static auto ConvertAddress(sockaddr_in address) -> Address; static auto ConvertAddress(sockaddr_in address) -> Address;
static auto VerifyAddress(const std::string& address) -> Address; static auto VerifyAddress(const std::string& address) -> Address;
auto GetDomain() const -> const asiofi::domain& { return *fOfiDomain; }
auto Interrupt() -> void { LOG(debug) << "OFI transport: Interrupted (NOOP - not implemented)."; }
auto Resume() -> void { LOG(debug) << "OFI transport: Resumed (NOOP - not implemented)."; }
private: private:
void* fZmqContext; void* fZmqContext;

View File

@ -56,6 +56,8 @@ auto MakeControlMessage(A* pmr, Args&& ... args) -> CtrlMsgPtr<T>
if (std::is_same<T, DataAddressAnnouncement>::value) { if (std::is_same<T, DataAddressAnnouncement>::value) {
raw_ptr->type = ControlMessageType::DataAddressAnnouncement; raw_ptr->type = ControlMessageType::DataAddressAnnouncement;
} else if (std::is_same<T, PostBuffer>::value) {
raw_ptr->type = ControlMessageType::PostBuffer;
} }
return {raw_ptr, [=](T* p) { pmr->deallocate(p, sizeof(T)); }}; return {raw_ptr, [=](T* p) { pmr->deallocate(p, sizeof(T)); }};

View File

@ -10,6 +10,7 @@
#include <fairmq/Tools.h> #include <fairmq/Tools.h>
#include <FairMQLogger.h> #include <FairMQLogger.h>
#include <asiofi.hpp>
#include <cassert> #include <cassert>
#include <cstdlib> #include <cstdlib>
#include <zmq.h> #include <zmq.h>
@ -23,38 +24,48 @@ namespace ofi
using namespace std; using namespace std;
Message::Message() Message::Message(boost::container::pmr::memory_resource* pmr)
: fInitialSize(0) : fInitialSize(0)
, fSize(0) , fSize(0)
, fData(nullptr) , fData(nullptr)
, fFreeFunction(nullptr) , fFreeFunction(nullptr)
, fHint(nullptr) , fHint(nullptr)
, fPmr(pmr)
{ {
} }
Message::Message(const size_t size) Message::Message(boost::container::pmr::memory_resource* pmr, const size_t size)
: fInitialSize(size) : fInitialSize(size)
, fSize(size) , fSize(size)
, fData(nullptr) , fData(nullptr)
, fFreeFunction(nullptr) , fFreeFunction(nullptr)
, fHint(nullptr) , fHint(nullptr)
, fPmr(pmr)
{ {
if (size) { if (size) {
fData = malloc(size); fData = fPmr->allocate(size);
assert(fData); assert(fData);
} }
} }
Message::Message(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) Message::Message(boost::container::pmr::memory_resource* pmr,
void* data,
const size_t size,
fairmq_free_fn* ffn,
void* hint)
: fInitialSize(size) : fInitialSize(size)
, fSize(size) , fSize(size)
, fData(data) , fData(data)
, fFreeFunction(ffn) , fFreeFunction(ffn)
, fHint(hint) , fHint(hint)
{ , fPmr(pmr)
} {}
Message::Message(FairMQUnmanagedRegionPtr& /*region*/, void* /*data*/, const size_t /*size*/, void* /*hint*/) Message::Message(boost::container::pmr::memory_resource* /*pmr*/,
FairMQUnmanagedRegionPtr& /*region*/,
void* /*data*/,
const size_t /*size*/,
void* /*hint*/)
{ {
throw MessageError{"Not yet implemented."}; throw MessageError{"Not yet implemented."};
} }
@ -62,9 +73,9 @@ Message::Message(FairMQUnmanagedRegionPtr& /*region*/, void* /*data*/, const siz
auto Message::Rebuild() -> void auto Message::Rebuild() -> void
{ {
if (fFreeFunction) { if (fFreeFunction) {
fFreeFunction(fData, fHint); fFreeFunction(fData, fHint);
} else { } else {
free(fData); fPmr->deallocate(fData, fSize);
} }
fData = nullptr; fData = nullptr;
fInitialSize = 0; fInitialSize = 0;
@ -78,10 +89,10 @@ auto Message::Rebuild(const size_t size) -> void
if (fFreeFunction) { if (fFreeFunction) {
fFreeFunction(fData, fHint); fFreeFunction(fData, fHint);
} else { } else {
free(fData); fPmr->deallocate(fData, fSize);
} }
if (size) { if (size) {
fData = malloc(size); fData = fPmr->allocate(size);
assert(fData); assert(fData);
} else { } else {
fData = nullptr; fData = nullptr;
@ -97,10 +108,10 @@ auto Message::Rebuild(void* /*data*/, const size_t size, fairmq_free_fn* ffn, vo
if (fFreeFunction) { if (fFreeFunction) {
fFreeFunction(fData, fHint); fFreeFunction(fData, fHint);
} else { } else {
free(fData); fPmr->deallocate(fData, fSize);
} }
if (size) { if (size) {
fData = malloc(size); fData = fPmr->allocate(size);
assert(fData); assert(fData);
} else { } else {
fData = nullptr; fData = nullptr;
@ -143,7 +154,7 @@ Message::~Message()
if (fFreeFunction) { if (fFreeFunction) {
fFreeFunction(fData, fHint); fFreeFunction(fData, fHint);
} else { } else {
free(fData); fPmr->deallocate(fData, fSize);
} }
} }

View File

@ -12,10 +12,10 @@
#include <FairMQMessage.h> #include <FairMQMessage.h>
#include <FairMQUnmanagedRegion.h> #include <FairMQUnmanagedRegion.h>
#include <zmq.h> #include <asiofi.hpp>
#include <cstddef> // size_t
#include <atomic> #include <atomic>
#include <cstddef> // size_t
#include <zmq.h>
namespace fair namespace fair
{ {
@ -33,10 +33,18 @@ namespace ofi
class Message final : public fair::mq::Message class Message final : public fair::mq::Message
{ {
public: public:
Message(); Message(boost::container::pmr::memory_resource* pmr);
Message(const size_t size); Message(boost::container::pmr::memory_resource* pmr, const size_t size);
Message(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); Message(boost::container::pmr::memory_resource* pmr,
Message(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0); void* data,
const size_t size,
fairmq_free_fn* ffn,
void* hint = nullptr);
Message(boost::container::pmr::memory_resource* pmr,
FairMQUnmanagedRegionPtr& region,
void* data,
const size_t size,
void* hint = 0);
Message(const Message&) = delete; Message(const Message&) = delete;
Message operator=(const Message&) = delete; Message operator=(const Message&) = delete;
@ -62,6 +70,7 @@ class Message final : public fair::mq::Message
void* fData; void* fData;
fairmq_free_fn* fFreeFunction; fairmq_free_fn* fFreeFunction;
void* fHint; void* fHint;
boost::container::pmr::memory_resource* fPmr;
}; /* class Message */ }; /* class Message */
} /* namespace ofi */ } /* namespace ofi */

View File

@ -13,6 +13,8 @@
#include <FairMQLogger.h> #include <FairMQLogger.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <asiofi.hpp>
#include <boost/asio/buffer.hpp>
#include <cstring> #include <cstring>
#include <netinet/in.h> #include <netinet/in.h>
#include <rdma/fabric.h> #include <rdma/fabric.h>
@ -43,7 +45,6 @@ Socket::Socket(Context& context, const string& type, const string& name, const s
, fMessagesTx(0) , fMessagesTx(0)
, fMessagesRx(0) , fMessagesRx(0)
, fContext(context) , fContext(context)
, fWaitingForControlPeer(false)
, fIoStrand(fContext.GetIoContext()) , fIoStrand(fContext.GetIoContext())
, fSndTimeout(100) , fSndTimeout(100)
, fRcvTimeout(100) , fRcvTimeout(100)
@ -230,6 +231,14 @@ auto Socket::SendControlMessage(CtrlMsgPtr<ControlMessage> ctrl) -> void
std::memcpy(zmq_msg_data(msg), ctrl.get(), sizeof(DataAddressAnnouncement)); std::memcpy(zmq_msg_data(msg), ctrl.get(), sizeof(DataAddressAnnouncement));
} }
break; break;
case ControlMessageType::PostBuffer:
{
auto ret = zmq_msg_init_size(msg, sizeof(PostBuffer));
(void)ret;
assert(ret == 0);
std::memcpy(zmq_msg_data(msg), ctrl.get(), sizeof(PostBuffer));
}
break;
default: default:
throw SocketError(tools::ToString("Cannot send control message of unknown type.")); throw SocketError(tools::ToString("Cannot send control message of unknown type."));
} }
@ -274,6 +283,13 @@ auto Socket::ReceiveControlMessage() -> CtrlMsgPtr<ControlMessage>
// LOG(debug) << "Received control message: " << ctrl->DebugString(); // LOG(debug) << "Received control message: " << ctrl->DebugString();
return StaticUniquePtrUpcast<ControlMessage>(std::move(daa)); return StaticUniquePtrUpcast<ControlMessage>(std::move(daa));
} }
case ControlMessageType::PostBuffer: {
assert(msg_size == sizeof(PostBuffer));
auto pb = MakeControlMessage<PostBuffer>(&fCtrlMemPool);
std::memcpy(pb.get(), msg_data, sizeof(PostBuffer));
// LOG(debug) << "Received control message: " << ctrl->DebugString();
return StaticUniquePtrUpcast<ControlMessage>(std::move(pb));
}
default: default:
throw SocketError(tools::ToString("Received control message of unknown type.")); throw SocketError(tools::ToString("Received control message of unknown type."));
} }
@ -327,43 +343,53 @@ auto Socket::TryReceive(MessagePtr& msg) -> int { return ReceiveImpl(msg, ZMQ_DO
auto Socket::TrySend(std::vector<MessagePtr>& msgVec) -> int64_t { return SendImpl(msgVec, ZMQ_DONTWAIT, 0); } auto Socket::TrySend(std::vector<MessagePtr>& msgVec) -> int64_t { return SendImpl(msgVec, ZMQ_DONTWAIT, 0); }
auto Socket::TryReceive(std::vector<MessagePtr>& msgVec) -> int64_t { return ReceiveImpl(msgVec, ZMQ_DONTWAIT, 0); } auto Socket::TryReceive(std::vector<MessagePtr>& msgVec) -> int64_t { return ReceiveImpl(msgVec, ZMQ_DONTWAIT, 0); }
#include <mutex>
#include <condition_variable>
auto Socket::SendImpl(FairMQMessagePtr& msg, const int /*flags*/, const int /*timeout*/) -> int auto Socket::SendImpl(FairMQMessagePtr& msg, const int /*flags*/, const int /*timeout*/) -> int
try { try {
auto size = msg->GetSize(); auto size = msg->GetSize();
LOG(debug) << "OFI transport (" << fId << "): ENTER SendImpl";
this_thread::sleep_for(std::chrono::seconds(10));
// Create and send control message // Create and send control message
// auto ctrl = tools::make_unique<ControlMessage>(); auto pb = MakeControlMessage<PostBuffer>(&fCtrlMemPool);
// auto buf = tools::make_unique<PostBuffer>(); pb->size = size;
// buf->set_size(size); SendControlMessage(StaticUniquePtrUpcast<ControlMessage>(std::move(pb)));
// ctrl->set_allocated_post_buffer(buf.release()); LOG(debug) << "OFI transport (" << fId << "): >>>>> SendImpl: Control message sent, size=" << size;
// assert(ctrl->IsInitialized());
// SendControlMessage(move(ctrl));
if (size) { if (size) {
// Receive and process control message boost::asio::mutable_buffer buffer(msg->GetData(), size);
// auto ctrl2 = ReceiveControlMessage(); asiofi::memory_region mr(fContext.GetDomain(), buffer, asiofi::mr::access::send);
// assert(ctrl2->has_post_buffer_acknowledgement());
// assert(ctrl2->post_buffer_acknowledgement().size() == size);
// Send data std::mutex m;
// fi_context ctx; std::condition_variable cv;
// auto ret = fi_send(fDataEndpoint, msg->GetData(), size, nullptr, fRemoteDataAddr, &ctx); bool completed(false);
// if (ret < 0)
// throw SocketError(tools::ToString("Failed posting ofi send buffer, reason: ", fi_strerror(ret)));
}
if (size) { fDataEndpoint->send(
// fi_cq_err_entry cqEntry; buffer,
// auto ret = fi_cq_sread(fDataCompletionQueueTx, &cqEntry, 1, nullptr, -1); mr.desc(),
// if (ret != 1) [&](boost::asio::mutable_buffer) {
// throw SocketError(tools::ToString("Failed reading ofi tx completion queue event, reason: ", fi_strerror(ret))); {
std::unique_lock<std::mutex> lk(m);
completed = true;
}
cv.notify_one();
LOG(debug) << "OFI transport (" << fId << "): > SendImpl: Data buffer sent";
}
);
{
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [&](){ return completed; });
}
LOG(debug) << "OFI transport (" << fId << "): >>>>> SendImpl: Data send buffer posted";
} }
msg.reset(nullptr); msg.reset(nullptr);
fBytesTx += size; fBytesTx += size;
fMessagesTx++; fMessagesTx++;
LOG(debug) << "OFI transport (" << fId << "): LEAVE SendImpl";
return size; return size;
} }
catch (const SilentSocketError& e) catch (const SilentSocketError& e)
@ -376,52 +402,47 @@ catch (const std::exception& e)
return -1; return -1;
} }
auto Socket::ReceiveImpl(FairMQMessagePtr& /*msg*/, const int /*flags*/, const int /*timeout*/) -> int auto Socket::ReceiveImpl(FairMQMessagePtr& msg, const int /*flags*/, const int /*timeout*/) -> int
try { try {
this_thread::sleep_for(std::chrono::seconds(10)); LOG(debug) << "OFI transport (" << fId << "): ENTER ReceiveImpl";
if (fWaitingForControlPeer) {
WaitForControlPeer();
// AnnounceDataAddress();
// ProcessDataAddressAnnouncement(ReceiveControlMessage());
}
// Receive and process control message // Receive and process control message
// auto ctrl = ReceiveControlMessage(); auto pb = StaticUniquePtrDowncast<PostBuffer>(ReceiveControlMessage());
// assert(ctrl->has_post_buffer()); assert(pb.get());
// auto postBuffer = ctrl->post_buffer(); auto size = pb->size;
// auto size = postBuffer.size(); LOG(debug) << "OFI transport (" << fId << "): <<<<< ReceiveImpl: Control message received, size=" << size;
// Receive data // Receive data
// if (size) { if (size) {
// fi_context ctx; msg->Rebuild(size);
// msg->Rebuild(size); boost::asio::mutable_buffer buffer(msg->GetData(), size);
// auto buf = msg->GetData(); asiofi::memory_region mr(fContext.GetDomain(), buffer, asiofi::mr::access::recv);
// auto size2 = msg->GetSize();
// auto ret = fi_recv(fDataEndpoint, buf, size2, nullptr, fRemoteDataAddr, &ctx);
// if (ret < 0)
// throw SocketError(tools::ToString("Failed posting ofi receive buffer, reason: ", fi_strerror(ret)));
// Create and send control message std::mutex m;
// auto ctrl2 = tools::make_unique<ControlMessage>(); std::condition_variable cv;
// auto ack = tools::make_unique<PostBufferAcknowledgement>(); bool completed(false);
// ack->set_size(msg->GetSize());
// ctrl2->set_allocated_post_buffer_acknowledgement(ack.release());
// assert(ctrl2->IsInitialized());
// SendControlMessage(move(ctrl2));
// fi_cq_err_entry cqEntry; fDataEndpoint->recv(buffer, mr.desc(), [&](boost::asio::mutable_buffer) {
// ret = fi_cq_sread(fDataCompletionQueueRx, &cqEntry, 1, nullptr, -1); {
// if (ret != 1) std::unique_lock<std::mutex> lk(m);
// throw SocketError(tools::ToString("Failed reading ofi rx completion queue event, reason: ", fi_strerror(ret))); completed = true;
// assert(cqEntry.len == size2); }
// assert(cqEntry.buf == buf); cv.notify_one();
// } }
);
LOG(debug) << "OFI transport (" << fId << "): <<<<< ReceiveImpl: Data buffer posted";
// fBytesRx += size; {
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [&](){ return completed; });
}
LOG(debug) << "OFI transport (" << fId << "): <<<<< ReceiveImpl: Data received";
}
fBytesRx += size;
fMessagesRx++; fMessagesRx++;
// return size; LOG(debug) << "OFI transport (" << fId << "): EXIT ReceiveImpl";
return 0; return size;
} }
catch (const SilentSocketError& e) catch (const SilentSocketError& e)
{ {

View File

@ -36,22 +36,28 @@ catch (ContextError& e)
auto TransportFactory::CreateMessage() const -> MessagePtr auto TransportFactory::CreateMessage() const -> MessagePtr
{ {
return MessagePtr{new Message()}; return MessagePtr{new Message(&fMemoryResource)};
} }
auto TransportFactory::CreateMessage(const size_t size) const -> MessagePtr auto TransportFactory::CreateMessage(const size_t size) const -> MessagePtr
{ {
return MessagePtr{new Message(size)}; return MessagePtr{new Message(&fMemoryResource, size)};
} }
auto TransportFactory::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) const -> MessagePtr auto TransportFactory::CreateMessage(void* data,
const size_t size,
fairmq_free_fn* ffn,
void* hint) const -> MessagePtr
{ {
return MessagePtr{new Message(data, size, ffn, hint)}; return MessagePtr{new Message(&fMemoryResource, data, size, ffn, hint)};
} }
auto TransportFactory::CreateMessage(UnmanagedRegionPtr& region, void* data, const size_t size, void* hint) const -> MessagePtr auto TransportFactory::CreateMessage(UnmanagedRegionPtr& region,
void* data,
const size_t size,
void* hint) const -> MessagePtr
{ {
return MessagePtr{new Message(region, data, size, hint)}; return MessagePtr{new Message(&fMemoryResource, region, data, size, hint)};
} }
auto TransportFactory::CreateSocket(const string& type, const string& name) -> SocketPtr auto TransportFactory::CreateSocket(const string& type, const string& name) -> SocketPtr

View File

@ -13,6 +13,8 @@
#include <options/FairMQProgOptions.h> #include <options/FairMQProgOptions.h>
#include <fairmq/ofi/Context.h> #include <fairmq/ofi/Context.h>
#include <asiofi.hpp>
namespace fair namespace fair
{ {
namespace mq namespace mq
@ -48,12 +50,13 @@ class TransportFactory final : public FairMQTransportFactory
auto GetType() const -> Transport override; auto GetType() const -> Transport override;
void Interrupt() override {} void Interrupt() override { fContext.Interrupt(); }
void Resume() override {} void Resume() override { fContext.Resume(); }
void Reset() override {} void Reset() override {}
private: private:
mutable Context fContext; mutable Context fContext;
asiofi::allocated_pool_resource fMemoryResource;
}; /* class TransportFactory */ }; /* class TransportFactory */
} /* namespace ofi */ } /* namespace ofi */