Alignment part I - Interface and shmem send

This commit is contained in:
Alexey Rybalchenko 2020-05-19 20:52:21 +02:00
parent 20544e1f18
commit 53a4d17f8b
11 changed files with 161 additions and 7 deletions

View File

@ -17,11 +17,26 @@
using fairmq_free_fn = void(void* data, void* hint);
class FairMQTransportFactory;
namespace fair
{
namespace mq
{
struct Alignment
{
size_t alignment;
explicit operator size_t() const { return alignment; }
};
} /* namespace mq */
} /* namespace fair */
class FairMQMessage
{
public:
FairMQMessage() = default;
FairMQMessage(FairMQTransportFactory* factory) : fTransport(factory) {}
virtual void Rebuild() = 0;
virtual void Rebuild(const size_t size) = 0;
virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) = 0;

View File

@ -18,7 +18,7 @@
#include <fairmq/Transports.h>
#include <string>
#include <memory>
#include <memory> // shared_ptr
#include <vector>
#include <unordered_map>
#include <stdexcept>
@ -47,13 +47,22 @@ class FairMQTransportFactory
fair::mq::ChannelResource* GetMemoryResource() { return &fMemoryResource; }
operator fair::mq::ChannelResource*() { return &fMemoryResource; }
/// @brief Create empty FairMQMessage
/// @brief Create empty FairMQMessage (for receiving)
/// @return pointer to FairMQMessage
virtual FairMQMessagePtr CreateMessage() = 0;
/// @brief Create empty FairMQMessage (for receiving), align received buffer to specified alignment
/// @param alignment alignment to align received buffer to
/// @return pointer to FairMQMessage
virtual FairMQMessagePtr CreateMessage(fair::mq::Alignment alignment) = 0;
/// @brief Create new FairMQMessage of specified size
/// @param size message size
/// @return pointer to FairMQMessage
virtual FairMQMessagePtr CreateMessage(const size_t size) = 0;
/// @brief Create new FairMQMessage of specified size and alignment
/// @param size message size
/// @param alignment message alignment
/// @return pointer to FairMQMessage
virtual FairMQMessagePtr CreateMessage(const size_t size, fair::mq::Alignment alignment) = 0;
/// @brief Create new FairMQMessage with user provided buffer and size
/// @param data pointer to user provided buffer
/// @param size size of the user provided buffer

View File

@ -34,6 +34,16 @@ Message::Message(boost::container::pmr::memory_resource* pmr)
{
}
Message::Message(boost::container::pmr::memory_resource* pmr, Alignment /* alignment */)
: fInitialSize(0)
, fSize(0)
, fData(nullptr)
, fFreeFunction(nullptr)
, fHint(nullptr)
, fPmr(pmr)
{
}
Message::Message(boost::container::pmr::memory_resource* pmr, const size_t size)
: fInitialSize(size)
, fSize(size)
@ -48,6 +58,20 @@ Message::Message(boost::container::pmr::memory_resource* pmr, const size_t size)
}
}
Message::Message(boost::container::pmr::memory_resource* pmr, const size_t size, Alignment /* alignment */)
: fInitialSize(size)
, fSize(size)
, fData(nullptr)
, fFreeFunction(nullptr)
, fHint(nullptr)
, fPmr(pmr)
{
if (size) {
fData = fPmr->allocate(size);
assert(fData);
}
}
Message::Message(boost::container::pmr::memory_resource* pmr,
void* data,
const size_t size,

View File

@ -34,7 +34,9 @@ class Message final : public fair::mq::Message
{
public:
Message(boost::container::pmr::memory_resource* pmr);
Message(boost::container::pmr::memory_resource* pmr, Alignment alignment);
Message(boost::container::pmr::memory_resource* pmr, const size_t size);
Message(boost::container::pmr::memory_resource* pmr, const size_t size, Alignment alignment);
Message(boost::container::pmr::memory_resource* pmr,
void* data,
const size_t size,

View File

@ -41,11 +41,23 @@ auto TransportFactory::CreateMessage() -> MessagePtr
return MessagePtr{new Message(&fMemoryResource)};
}
auto TransportFactory::CreateMessage(Alignment /* alignment */) -> MessagePtr
{
// TODO Do not ignore alignment
return MessagePtr{new Message(&fMemoryResource)};
}
auto TransportFactory::CreateMessage(const size_t size) -> MessagePtr
{
return MessagePtr{new Message(&fMemoryResource, size)};
}
auto TransportFactory::CreateMessage(const size_t size, Alignment /* alignment */) -> MessagePtr
{
// TODO Do not ignore alignment
return MessagePtr{new Message(&fMemoryResource, size)};
}
auto TransportFactory::CreateMessage(void* data,
const size_t size,
fairmq_free_fn* ffn,

View File

@ -36,7 +36,9 @@ class TransportFactory final : public FairMQTransportFactory
TransportFactory operator=(const TransportFactory&) = delete;
auto CreateMessage() -> MessagePtr override;
auto CreateMessage(Alignment alignment) -> MessagePtr override;
auto CreateMessage(const std::size_t size) -> MessagePtr override;
auto CreateMessage(const std::size_t size, Alignment alignment) -> MessagePtr override;
auto CreateMessage(void* data, const std::size_t size, fairmq_free_fn* ffn, void* hint = nullptr) -> MessagePtr override;
auto CreateMessage(UnmanagedRegionPtr& region, void* data, const std::size_t size, void* hint = nullptr) -> MessagePtr override;

View File

@ -47,6 +47,17 @@ class Message final : public fair::mq::Message
fManager.IncrementMsgCounter();
}
Message(Manager& manager, Alignment /* alignment */, FairMQTransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fManager(manager)
, fQueued(false)
, fMeta{0, 0, 0, -1}
, fRegionPtr(nullptr)
, fLocalPtr(nullptr)
{
fManager.IncrementMsgCounter();
}
Message(Manager& manager, const size_t size, FairMQTransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fManager(manager)
@ -59,6 +70,18 @@ class Message final : public fair::mq::Message
fManager.IncrementMsgCounter();
}
Message(Manager& manager, const size_t size, Alignment alignment, FairMQTransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fManager(manager)
, fQueued(false)
, fMeta{0, 0, 0, -1}
, fRegionPtr(nullptr)
, fLocalPtr(nullptr)
{
InitializeChunk(size, static_cast<size_t>(alignment));
fManager.IncrementMsgCounter();
}
Message(Manager& manager, void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fManager(manager)
@ -219,19 +242,24 @@ class Message final : public fair::mq::Message
mutable Region* fRegionPtr;
mutable char* fLocalPtr;
bool InitializeChunk(const size_t size)
bool InitializeChunk(const size_t size, size_t alignment = 0)
{
tools::RateLimiter rateLimiter(20);
while (fMeta.fHandle < 0) {
try {
boost::interprocess::managed_shared_memory::size_type actualSize = size;
char* hint = 0; // unused for boost::interprocess::allocate_new
fLocalPtr = fManager.Segment().allocation_command<char>(boost::interprocess::allocate_new, size, actualSize, hint);
// boost::interprocess::managed_shared_memory::size_type actualSize = size;
// char* hint = 0; // unused for boost::interprocess::allocate_new
// fLocalPtr = fManager.Segment().allocation_command<char>(boost::interprocess::allocate_new, size, actualSize, hint);
if (alignment == 0) {
fLocalPtr = reinterpret_cast<char*>(fManager.Segment().allocate(size));
} else {
fLocalPtr = reinterpret_cast<char*>(fManager.Segment().allocate_aligned(size, alignment));
}
} catch (boost::interprocess::bad_alloc& ba) {
// LOG(warn) << "Shared memory full...";
if (fManager.ThrowingOnBadAlloc()) {
throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size));
throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default"));
}
rateLimiter.maybe_sleep();
if (fManager.Interrupted()) {

View File

@ -101,11 +101,21 @@ class TransportFactory final : public fair::mq::TransportFactory
return tools::make_unique<Message>(*fManager, this);
}
MessagePtr CreateMessage(Alignment alignment) override
{
return tools::make_unique<Message>(*fManager, alignment, this);
}
MessagePtr CreateMessage(const size_t size) override
{
return tools::make_unique<Message>(*fManager, size, this);
}
MessagePtr CreateMessage(const size_t size, Alignment alignment) override
{
return tools::make_unique<Message>(*fManager, size, alignment, this);
}
MessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override
{
return tools::make_unique<Message>(*fManager, data, size, ffn, hint, this);

View File

@ -47,6 +47,17 @@ class Message final : public fair::mq::Message
LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno);
}
}
Message(Alignment /* alignment */, FairMQTransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fUsedSizeModified(false)
, fUsedSize()
, fMsg(tools::make_unique<zmq_msg_t>())
, fViewMsg(nullptr)
{
if (zmq_msg_init(fMsg.get()) != 0) {
LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno);
}
}
Message(const size_t size, FairMQTransportFactory* factory = nullptr)
: fair::mq::Message(factory)
@ -60,6 +71,18 @@ class Message final : public fair::mq::Message
}
}
Message(const size_t size, Alignment /* alignment */, FairMQTransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fUsedSizeModified(false)
, fUsedSize(size)
, fMsg(tools::make_unique<zmq_msg_t>())
, fViewMsg(nullptr)
{
if (zmq_msg_init_size(fMsg.get(), size) != 0) {
LOG(error) << "failed initializing message with size, reason: " << zmq_strerror(errno);
}
}
Message(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fUsedSizeModified(false)

View File

@ -55,12 +55,22 @@ class TransportFactory final : public FairMQTransportFactory
{
return tools::make_unique<Message>(this);
}
MessagePtr CreateMessage(Alignment alignment) override
{
return tools::make_unique<Message>(alignment, this);
}
MessagePtr CreateMessage(const size_t size) override
{
return tools::make_unique<Message>(size, this);
}
MessagePtr CreateMessage(const size_t size, Alignment alignment) override
{
return tools::make_unique<Message>(size, alignment, this);
}
MessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override
{
return tools::make_unique<Message>(data, size, ffn, hint, this);

View File

@ -15,6 +15,7 @@
#include <gtest/gtest.h>
#include <string>
#include <cstdint>
namespace
{
@ -77,6 +78,19 @@ void RunMsgRebuild(const string& transport)
EXPECT_EQ(string(static_cast<char*>(msg->GetData()), msg->GetSize()), string("asdf"));
}
void Alignment(const string& transport)
{
size_t session{fair::mq::tools::UuidHash()};
fair::mq::ProgOptions config;
config.SetProperty<string>("session", to_string(session));
auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config);
FairMQMessagePtr msg(factory->CreateMessage(100, fair::mq::Alignment{64}));
ASSERT_EQ(reinterpret_cast<uintptr_t>(msg->GetData()) % 64, 0);
}
TEST(Resize, zeromq)
{
RunPushPullWithMsgResize("zeromq", "ipc://test_message_resize");
@ -97,4 +111,9 @@ TEST(Rebuild, shmem)
RunMsgRebuild("shmem");
}
TEST(Alignment, shmem) // TODO: add test for ZeroMQ once it is implemented
{
Alignment("shmem");
}
} // namespace