FairMQ: Add hint argument to the region callback, settable per message.

This commit is contained in:
Alexey Rybalchenko 2017-12-15 14:44:17 +01:00 committed by Mohammad Al-Turany
parent ba78964e29
commit b59f1b950d
18 changed files with 65 additions and 38 deletions

View File

@ -52,7 +52,7 @@ class FairMQTransportFactory
/// @return pointer to FairMQMessage
virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const = 0;
virtual FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& unmanagedRegion, void* data, const size_t size) const = 0;
virtual FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& unmanagedRegion, void* data, const size_t size, void* hint = 0) const = 0;
/// Create a socket
virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const = 0;

View File

@ -13,7 +13,7 @@
#include <memory> // std::unique_ptr
#include <functional> // std::function
using FairMQRegionCallback = std::function<void(void*, size_t)>;
using FairMQRegionCallback = std::function<void(void*, size_t, void*)>;
class FairMQUnmanagedRegion
{

View File

@ -27,6 +27,7 @@ FairMQ::Transport FairMQMessageNN::fTransportType = FairMQ::Transport::NN;
FairMQMessageNN::FairMQMessageNN()
: fMessage(nullptr)
, fSize(0)
, fHint(0)
, fReceiving(false)
, fRegionPtr(nullptr)
{
@ -40,6 +41,7 @@ FairMQMessageNN::FairMQMessageNN()
FairMQMessageNN::FairMQMessageNN(const size_t size)
: fMessage(nullptr)
, fSize(0)
, fHint(0)
, fReceiving(false)
, fRegionPtr(nullptr)
{
@ -60,6 +62,7 @@ FairMQMessageNN::FairMQMessageNN(const size_t size)
FairMQMessageNN::FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* ffn, void* hint)
: fMessage(nullptr)
, fSize(0)
, fHint(0)
, fReceiving(false)
, fRegionPtr(nullptr)
{
@ -83,9 +86,10 @@ FairMQMessageNN::FairMQMessageNN(void* data, const size_t size, fairmq_free_fn*
}
}
FairMQMessageNN::FairMQMessageNN(FairMQUnmanagedRegionPtr& region, void* data, const size_t size)
FairMQMessageNN::FairMQMessageNN(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint)
: fMessage(data)
, fSize(size)
, fHint(reinterpret_cast<size_t>(hint))
, fReceiving(false)
, fRegionPtr(region.get())
{

View File

@ -32,7 +32,7 @@ class FairMQMessageNN : public FairMQMessage
FairMQMessageNN();
FairMQMessageNN(const size_t size);
FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr);
FairMQMessageNN(FairMQUnmanagedRegionPtr& region, void* data, const size_t size);
FairMQMessageNN(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0);
FairMQMessageNN(const FairMQMessageNN&) = delete;
FairMQMessageNN operator=(const FairMQMessageNN&) = delete;
@ -56,6 +56,7 @@ class FairMQMessageNN : public FairMQMessage
private:
void* fMessage;
size_t fSize;
size_t fHint;
bool fReceiving;
FairMQUnmanagedRegion* fRegionPtr;
static FairMQ::Transport fTransportType;

View File

@ -138,7 +138,7 @@ int FairMQSocketNN::Send(FairMQMessagePtr& msg, const int flags)
{
nbytes = nn_send(fSocket, bufPtr, msg->GetSize(), flags);
// nn_send copies the data, safe to call region callback here
static_cast<FairMQUnmanagedRegionNN*>(msgPtr->fRegionPtr)->fCallback(bufPtr, msg->GetSize());
static_cast<FairMQUnmanagedRegionNN*>(msgPtr->fRegionPtr)->fCallback(bufPtr, msg->GetSize(), reinterpret_cast<void*>(msgPtr->fHint));
}
if (nbytes >= 0)
@ -252,7 +252,7 @@ int64_t FairMQSocketNN::Send(vector<FairMQMessagePtr>& msgVec, const int flags)
// call region callback
if (partPtr->fRegionPtr)
{
static_cast<FairMQUnmanagedRegionNN*>(partPtr->fRegionPtr)->fCallback(partPtr->GetMessage(), msgVec[i]->GetSize());
static_cast<FairMQUnmanagedRegionNN*>(partPtr->fRegionPtr)->fCallback(partPtr->GetMessage(), partPtr->GetSize(), reinterpret_cast<void*>(partPtr->fHint));
}
}

View File

@ -35,9 +35,9 @@ FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage(void* data, const size_
return unique_ptr<FairMQMessage>(new FairMQMessageNN(data, size, ffn, hint));
}
FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size) const
FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint) const
{
return unique_ptr<FairMQMessage>(new FairMQMessageNN(region, data, size));
return unique_ptr<FairMQMessage>(new FairMQMessageNN(region, data, size, hint));
}
FairMQSocketPtr FairMQTransportFactoryNN::CreateSocket(const string& type, const string& name) const

View File

@ -28,7 +28,7 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory
FairMQMessagePtr CreateMessage() const override;
FairMQMessagePtr CreateMessage(const size_t size) const override;
FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const override;
FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size) const override;
FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) const override;
FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override;

View File

@ -46,11 +46,12 @@ struct MonitorStatus
bool fActive;
};
struct alignas(32) MetaHeader
struct MetaHeader
{
uint64_t fSize;
uint64_t fRegionId;
size_t fSize;
size_t fRegionId;
boost::interprocess::managed_shared_memory::handle_t fHandle;
size_t fHint;
};
struct RegionBlock
@ -58,15 +59,18 @@ struct RegionBlock
RegionBlock()
: fHandle()
, fSize(0)
, fHint(0)
{}
RegionBlock(boost::interprocess::managed_shared_memory::handle_t handle, size_t size)
RegionBlock(boost::interprocess::managed_shared_memory::handle_t handle, size_t size, size_t hint)
: fHandle(handle)
, fSize(size)
, fHint(hint)
{}
boost::interprocess::managed_shared_memory::handle_t fHandle;
size_t fSize;
size_t fHint;
};
} // namespace shmem

View File

@ -34,6 +34,7 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager)
, fRegionPtr(nullptr)
, fHandle(-1)
, fSize(0)
, fHint(0)
, fLocalPtr(nullptr)
{
if (zmq_msg_init(&fMessage) != 0)
@ -52,6 +53,7 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, const size_t size)
, fRegionPtr(nullptr)
, fHandle(-1)
, fSize(0)
, fHint(0)
, fLocalPtr(nullptr)
{
InitializeChunk(size);
@ -66,6 +68,7 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, void* data, const size_t si
, fRegionPtr(nullptr)
, fHandle(-1)
, fSize(0)
, fHint(0)
, fLocalPtr(nullptr)
{
if (InitializeChunk(size))
@ -82,7 +85,7 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, void* data, const size_t si
}
}
FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQUnmanagedRegionPtr& region, void* data, const size_t size)
FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint)
: fManager(manager)
, fMessage()
, fQueued(false)
@ -91,23 +94,34 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQUnmanagedRegionPtr& r
, fRegionPtr(nullptr)
, fHandle(-1)
, fSize(size)
, fHint(reinterpret_cast<size_t>(hint))
, fLocalPtr(static_cast<char*>(data))
{
fHandle = (bipc::managed_shared_memory::handle_t)(reinterpret_cast<const char*>(data) - reinterpret_cast<const char*>(region->GetData()));
if (zmq_msg_init_size(&fMessage, sizeof(MetaHeader)) != 0)
if (reinterpret_cast<const char*>(data) >= reinterpret_cast<const char*>(region->GetData()) ||
reinterpret_cast<const char*>(data) <= reinterpret_cast<const char*>(region->GetData()) + region->GetSize())
{
LOG(ERROR) << "failed initializing meta message, reason: " << zmq_strerror(errno);
fHandle = (bipc::managed_shared_memory::handle_t)(reinterpret_cast<const char*>(data) - reinterpret_cast<const char*>(region->GetData()));
if (zmq_msg_init_size(&fMessage, sizeof(MetaHeader)) != 0)
{
LOG(ERROR) << "failed initializing meta message, reason: " << zmq_strerror(errno);
}
else
{
MetaHeader header;
header.fSize = size;
header.fHandle = fHandle;
header.fRegionId = fRegionId;
header.fHint = fHint;
memcpy(zmq_msg_data(&fMessage), &header, sizeof(MetaHeader));
fMetaCreated = true;
}
}
else
{
MetaHeader header;
header.fSize = size;
header.fHandle = fHandle;
header.fRegionId = fRegionId;
memcpy(zmq_msg_data(&fMessage), &header, sizeof(MetaHeader));
fMetaCreated = true;
LOG(ERROR) << "shmem: trying to create region message with data from outside the region";
throw runtime_error("shmem: trying to create region message with data from outside the region");
}
}
@ -148,6 +162,7 @@ bool FairMQMessageSHM::InitializeChunk(const size_t size)
header.fSize = size;
header.fHandle = fHandle;
header.fRegionId = fRegionId;
header.fHint = fHint;
memcpy(zmq_msg_data(&fMessage), &header, sizeof(MetaHeader));
fMetaCreated = true;
@ -343,7 +358,7 @@ void FairMQMessageSHM::CloseMessage()
// // }
// timed version
RegionBlock block(fHandle, fSize);
RegionBlock block(fHandle, fSize, fHint);
bool success = false;
do
{

View File

@ -30,7 +30,7 @@ class FairMQMessageSHM : public FairMQMessage
FairMQMessageSHM(fair::mq::shmem::Manager& manager);
FairMQMessageSHM(fair::mq::shmem::Manager& manager, const size_t size);
FairMQMessageSHM(fair::mq::shmem::Manager& manager, void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr);
FairMQMessageSHM(fair::mq::shmem::Manager& manager, FairMQUnmanagedRegionPtr& region, void* data, const size_t size);
FairMQMessageSHM(fair::mq::shmem::Manager& manager, FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0);
FairMQMessageSHM(const FairMQMessageSHM&) = delete;
FairMQMessageSHM operator=(const FairMQMessageSHM&) = delete;
@ -58,10 +58,11 @@ class FairMQMessageSHM : public FairMQMessage
bool fMetaCreated;
static std::atomic<bool> fInterrupted;
static FairMQ::Transport fTransportType;
uint64_t fRegionId;
size_t fRegionId;
mutable fair::mq::shmem::Region* fRegionPtr;
boost::interprocess::managed_shared_memory::handle_t fHandle;
size_t fSize;
size_t fHint;
mutable char* fLocalPtr;
bool InitializeChunk(const size_t size);

View File

@ -175,6 +175,7 @@ int FairMQSocketSHM::Receive(FairMQMessagePtr& msg, const int flags)
static_cast<FairMQMessageSHM*>(msg.get())->fHandle = hdr->fHandle;
static_cast<FairMQMessageSHM*>(msg.get())->fSize = hdr->fSize;
static_cast<FairMQMessageSHM*>(msg.get())->fRegionId = hdr->fRegionId;
static_cast<FairMQMessageSHM*>(msg.get())->fHint = hdr->fHint;
size = msg->GetSize();
fBytesRx += size;
@ -316,6 +317,7 @@ int64_t FairMQSocketSHM::Receive(vector<FairMQMessagePtr>& msgVec, const int fla
static_cast<FairMQMessageSHM*>(part.get())->fHandle = hdr->fHandle;
static_cast<FairMQMessageSHM*>(part.get())->fSize = hdr->fSize;
static_cast<FairMQMessageSHM*>(part.get())->fRegionId = hdr->fRegionId;
static_cast<FairMQMessageSHM*>(part.get())->fHint = hdr->fHint;
size = part->GetSize();
msgVec.push_back(move(part));

View File

@ -214,9 +214,9 @@ FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(void* data, const size
return unique_ptr<FairMQMessage>(new FairMQMessageSHM(*fManager, data, size, ffn, hint));
}
FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size) const
FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint) const
{
return unique_ptr<FairMQMessage>(new FairMQMessageSHM(*fManager, region, data, size));
return unique_ptr<FairMQMessage>(new FairMQMessageSHM(*fManager, region, data, size, hint));
}
FairMQSocketPtr FairMQTransportFactorySHM::CreateSocket(const string& type, const string& name) const

View File

@ -36,7 +36,7 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory
FairMQMessagePtr CreateMessage() const override;
FairMQMessagePtr CreateMessage(const size_t size) const override;
FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const override;
FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size) const override;
FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) const override;
FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override;

View File

@ -75,7 +75,7 @@ void Region::ReceiveAcks()
// LOG(DEBUG) << "received: " << block.fHandle << " " << block.fSize << " " << block.fMessageId;
if (fCallback)
{
fCallback(reinterpret_cast<char*>(fRegion.get_address()) + block.fHandle, block.fSize);
fCallback(reinterpret_cast<char*>(fRegion.get_address()) + block.fHandle, block.fSize, reinterpret_cast<void*>(block.fHint));
}
}
else

View File

@ -58,7 +58,7 @@ FairMQMessageZMQ::FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn
}
}
FairMQMessageZMQ::FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size)
FairMQMessageZMQ::FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint)
: fUsedSizeModified(false)
, fUsedSize()
, fMsg(fair::mq::tools::make_unique<zmq_msg_t>())
@ -74,7 +74,7 @@ FairMQMessageZMQ::FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data,
memcpy(zmq_msg_data(fMsg.get()), data, size);
// call region callback
static_cast<FairMQUnmanagedRegionZMQ*>(region.get())->fCallback(data, size);
static_cast<FairMQUnmanagedRegionZMQ*>(region.get())->fCallback(data, size, hint);
// if (zmq_msg_init_data(fMsg.get(), data, size, [](void*, void*){}, nullptr) != 0)
// {

View File

@ -34,7 +34,7 @@ class FairMQMessageZMQ : public FairMQMessage
FairMQMessageZMQ();
FairMQMessageZMQ(const size_t size);
FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr);
FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size);
FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0);
void Rebuild() override;
void Rebuild(const size_t size) override;

View File

@ -64,9 +64,9 @@ FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(void* data, const size
return unique_ptr<FairMQMessage>(new FairMQMessageZMQ(data, size, ffn, hint));
}
FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size) const
FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint) const
{
return unique_ptr<FairMQMessage>(new FairMQMessageZMQ(region, data, size));
return unique_ptr<FairMQMessage>(new FairMQMessageZMQ(region, data, size, hint));
}
FairMQSocketPtr FairMQTransportFactoryZMQ::CreateSocket(const string& type, const string& name) const

View File

@ -37,7 +37,7 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory
FairMQMessagePtr CreateMessage() const override;
FairMQMessagePtr CreateMessage(const size_t size) const override;
FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const override;
FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size) const override;
FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) const override;
FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override;