From b59f1b950d3142f2a56f079c2357f468ae916252 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 15 Dec 2017 14:44:17 +0100 Subject: [PATCH] FairMQ: Add hint argument to the region callback, settable per message. --- fairmq/FairMQTransportFactory.h | 2 +- fairmq/FairMQUnmanagedRegion.h | 2 +- fairmq/nanomsg/FairMQMessageNN.cxx | 6 ++- fairmq/nanomsg/FairMQMessageNN.h | 3 +- fairmq/nanomsg/FairMQSocketNN.cxx | 4 +- fairmq/nanomsg/FairMQTransportFactoryNN.cxx | 4 +- fairmq/nanomsg/FairMQTransportFactoryNN.h | 2 +- fairmq/shmem/Common.h | 12 ++++-- fairmq/shmem/FairMQMessageSHM.cxx | 41 ++++++++++++++------- fairmq/shmem/FairMQMessageSHM.h | 5 ++- fairmq/shmem/FairMQSocketSHM.cxx | 2 + fairmq/shmem/FairMQTransportFactorySHM.cxx | 4 +- fairmq/shmem/FairMQTransportFactorySHM.h | 2 +- fairmq/shmem/Region.cxx | 2 +- fairmq/zeromq/FairMQMessageZMQ.cxx | 4 +- fairmq/zeromq/FairMQMessageZMQ.h | 2 +- fairmq/zeromq/FairMQTransportFactoryZMQ.cxx | 4 +- fairmq/zeromq/FairMQTransportFactoryZMQ.h | 2 +- 18 files changed, 65 insertions(+), 38 deletions(-) diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 0763397f..0b6ea596 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -52,7 +52,7 @@ class FairMQTransportFactory /// @return pointer to FairMQMessage virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const = 0; - virtual FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& unmanagedRegion, void* data, const size_t size) const = 0; + virtual FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& unmanagedRegion, void* data, const size_t size, void* hint = 0) const = 0; /// Create a socket virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const = 0; diff --git a/fairmq/FairMQUnmanagedRegion.h b/fairmq/FairMQUnmanagedRegion.h index eb88f3b7..744a9c6b 100644 --- a/fairmq/FairMQUnmanagedRegion.h +++ b/fairmq/FairMQUnmanagedRegion.h @@ -13,7 +13,7 @@ #include // std::unique_ptr #include // std::function -using FairMQRegionCallback = std::function; +using FairMQRegionCallback = std::function; class FairMQUnmanagedRegion { diff --git a/fairmq/nanomsg/FairMQMessageNN.cxx b/fairmq/nanomsg/FairMQMessageNN.cxx index c95e1ff3..71cba27c 100644 --- a/fairmq/nanomsg/FairMQMessageNN.cxx +++ b/fairmq/nanomsg/FairMQMessageNN.cxx @@ -27,6 +27,7 @@ FairMQ::Transport FairMQMessageNN::fTransportType = FairMQ::Transport::NN; FairMQMessageNN::FairMQMessageNN() : fMessage(nullptr) , fSize(0) + , fHint(0) , fReceiving(false) , fRegionPtr(nullptr) { @@ -40,6 +41,7 @@ FairMQMessageNN::FairMQMessageNN() FairMQMessageNN::FairMQMessageNN(const size_t size) : fMessage(nullptr) , fSize(0) + , fHint(0) , fReceiving(false) , fRegionPtr(nullptr) { @@ -60,6 +62,7 @@ FairMQMessageNN::FairMQMessageNN(const size_t size) FairMQMessageNN::FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) : fMessage(nullptr) , fSize(0) + , fHint(0) , fReceiving(false) , fRegionPtr(nullptr) { @@ -83,9 +86,10 @@ FairMQMessageNN::FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* } } -FairMQMessageNN::FairMQMessageNN(FairMQUnmanagedRegionPtr& region, void* data, const size_t size) +FairMQMessageNN::FairMQMessageNN(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint) : fMessage(data) , fSize(size) + , fHint(reinterpret_cast(hint)) , fReceiving(false) , fRegionPtr(region.get()) { diff --git a/fairmq/nanomsg/FairMQMessageNN.h b/fairmq/nanomsg/FairMQMessageNN.h index ca4e7042..d2c5f57f 100644 --- a/fairmq/nanomsg/FairMQMessageNN.h +++ b/fairmq/nanomsg/FairMQMessageNN.h @@ -32,7 +32,7 @@ class FairMQMessageNN : public FairMQMessage FairMQMessageNN(); FairMQMessageNN(const size_t size); FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); - FairMQMessageNN(FairMQUnmanagedRegionPtr& region, void* data, const size_t size); + FairMQMessageNN(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0); FairMQMessageNN(const FairMQMessageNN&) = delete; FairMQMessageNN operator=(const FairMQMessageNN&) = delete; @@ -56,6 +56,7 @@ class FairMQMessageNN : public FairMQMessage private: void* fMessage; size_t fSize; + size_t fHint; bool fReceiving; FairMQUnmanagedRegion* fRegionPtr; static FairMQ::Transport fTransportType; diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index 11fae63f..6785b7e6 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -138,7 +138,7 @@ int FairMQSocketNN::Send(FairMQMessagePtr& msg, const int flags) { nbytes = nn_send(fSocket, bufPtr, msg->GetSize(), flags); // nn_send copies the data, safe to call region callback here - static_cast(msgPtr->fRegionPtr)->fCallback(bufPtr, msg->GetSize()); + static_cast(msgPtr->fRegionPtr)->fCallback(bufPtr, msg->GetSize(), reinterpret_cast(msgPtr->fHint)); } if (nbytes >= 0) @@ -252,7 +252,7 @@ int64_t FairMQSocketNN::Send(vector& msgVec, const int flags) // call region callback if (partPtr->fRegionPtr) { - static_cast(partPtr->fRegionPtr)->fCallback(partPtr->GetMessage(), msgVec[i]->GetSize()); + static_cast(partPtr->fRegionPtr)->fCallback(partPtr->GetMessage(), partPtr->GetSize(), reinterpret_cast(partPtr->fHint)); } } diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx index d0752aac..239d6b1b 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx @@ -35,9 +35,9 @@ FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage(void* data, const size_ return unique_ptr(new FairMQMessageNN(data, size, ffn, hint)); } -FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size) const +FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint) const { - return unique_ptr(new FairMQMessageNN(region, data, size)); + return unique_ptr(new FairMQMessageNN(region, data, size, hint)); } FairMQSocketPtr FairMQTransportFactoryNN::CreateSocket(const string& type, const string& name) const diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.h b/fairmq/nanomsg/FairMQTransportFactoryNN.h index 7f2c9689..53653da8 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.h +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.h @@ -28,7 +28,7 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory FairMQMessagePtr CreateMessage() const override; FairMQMessagePtr CreateMessage(const size_t size) const override; FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const override; - FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size) const override; + FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) const override; FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override; diff --git a/fairmq/shmem/Common.h b/fairmq/shmem/Common.h index ea73c611..e8846142 100644 --- a/fairmq/shmem/Common.h +++ b/fairmq/shmem/Common.h @@ -46,11 +46,12 @@ struct MonitorStatus bool fActive; }; -struct alignas(32) MetaHeader +struct MetaHeader { - uint64_t fSize; - uint64_t fRegionId; + size_t fSize; + size_t fRegionId; boost::interprocess::managed_shared_memory::handle_t fHandle; + size_t fHint; }; struct RegionBlock @@ -58,15 +59,18 @@ struct RegionBlock RegionBlock() : fHandle() , fSize(0) + , fHint(0) {} - RegionBlock(boost::interprocess::managed_shared_memory::handle_t handle, size_t size) + RegionBlock(boost::interprocess::managed_shared_memory::handle_t handle, size_t size, size_t hint) : fHandle(handle) , fSize(size) + , fHint(hint) {} boost::interprocess::managed_shared_memory::handle_t fHandle; size_t fSize; + size_t fHint; }; } // namespace shmem diff --git a/fairmq/shmem/FairMQMessageSHM.cxx b/fairmq/shmem/FairMQMessageSHM.cxx index 50d73ee5..60da2342 100644 --- a/fairmq/shmem/FairMQMessageSHM.cxx +++ b/fairmq/shmem/FairMQMessageSHM.cxx @@ -34,6 +34,7 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager) , fRegionPtr(nullptr) , fHandle(-1) , fSize(0) + , fHint(0) , fLocalPtr(nullptr) { if (zmq_msg_init(&fMessage) != 0) @@ -52,6 +53,7 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, const size_t size) , fRegionPtr(nullptr) , fHandle(-1) , fSize(0) + , fHint(0) , fLocalPtr(nullptr) { InitializeChunk(size); @@ -66,6 +68,7 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, void* data, const size_t si , fRegionPtr(nullptr) , fHandle(-1) , fSize(0) + , fHint(0) , fLocalPtr(nullptr) { if (InitializeChunk(size)) @@ -82,7 +85,7 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, void* data, const size_t si } } -FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQUnmanagedRegionPtr& region, void* data, const size_t size) +FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint) : fManager(manager) , fMessage() , fQueued(false) @@ -91,23 +94,34 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQUnmanagedRegionPtr& r , fRegionPtr(nullptr) , fHandle(-1) , fSize(size) + , fHint(reinterpret_cast(hint)) , fLocalPtr(static_cast(data)) { - fHandle = (bipc::managed_shared_memory::handle_t)(reinterpret_cast(data) - reinterpret_cast(region->GetData())); - - if (zmq_msg_init_size(&fMessage, sizeof(MetaHeader)) != 0) + if (reinterpret_cast(data) >= reinterpret_cast(region->GetData()) || + reinterpret_cast(data) <= reinterpret_cast(region->GetData()) + region->GetSize()) { - LOG(ERROR) << "failed initializing meta message, reason: " << zmq_strerror(errno); + fHandle = (bipc::managed_shared_memory::handle_t)(reinterpret_cast(data) - reinterpret_cast(region->GetData())); + + if (zmq_msg_init_size(&fMessage, sizeof(MetaHeader)) != 0) + { + LOG(ERROR) << "failed initializing meta message, reason: " << zmq_strerror(errno); + } + else + { + MetaHeader header; + header.fSize = size; + header.fHandle = fHandle; + header.fRegionId = fRegionId; + header.fHint = fHint; + memcpy(zmq_msg_data(&fMessage), &header, sizeof(MetaHeader)); + + fMetaCreated = true; + } } else { - MetaHeader header; - header.fSize = size; - header.fHandle = fHandle; - header.fRegionId = fRegionId; - memcpy(zmq_msg_data(&fMessage), &header, sizeof(MetaHeader)); - - fMetaCreated = true; + LOG(ERROR) << "shmem: trying to create region message with data from outside the region"; + throw runtime_error("shmem: trying to create region message with data from outside the region"); } } @@ -148,6 +162,7 @@ bool FairMQMessageSHM::InitializeChunk(const size_t size) header.fSize = size; header.fHandle = fHandle; header.fRegionId = fRegionId; + header.fHint = fHint; memcpy(zmq_msg_data(&fMessage), &header, sizeof(MetaHeader)); fMetaCreated = true; @@ -343,7 +358,7 @@ void FairMQMessageSHM::CloseMessage() // // } // timed version - RegionBlock block(fHandle, fSize); + RegionBlock block(fHandle, fSize, fHint); bool success = false; do { diff --git a/fairmq/shmem/FairMQMessageSHM.h b/fairmq/shmem/FairMQMessageSHM.h index c3e845af..8fd8c0b2 100644 --- a/fairmq/shmem/FairMQMessageSHM.h +++ b/fairmq/shmem/FairMQMessageSHM.h @@ -30,7 +30,7 @@ class FairMQMessageSHM : public FairMQMessage FairMQMessageSHM(fair::mq::shmem::Manager& manager); FairMQMessageSHM(fair::mq::shmem::Manager& manager, const size_t size); FairMQMessageSHM(fair::mq::shmem::Manager& manager, void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); - FairMQMessageSHM(fair::mq::shmem::Manager& manager, FairMQUnmanagedRegionPtr& region, void* data, const size_t size); + FairMQMessageSHM(fair::mq::shmem::Manager& manager, FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0); FairMQMessageSHM(const FairMQMessageSHM&) = delete; FairMQMessageSHM operator=(const FairMQMessageSHM&) = delete; @@ -58,10 +58,11 @@ class FairMQMessageSHM : public FairMQMessage bool fMetaCreated; static std::atomic fInterrupted; static FairMQ::Transport fTransportType; - uint64_t fRegionId; + size_t fRegionId; mutable fair::mq::shmem::Region* fRegionPtr; boost::interprocess::managed_shared_memory::handle_t fHandle; size_t fSize; + size_t fHint; mutable char* fLocalPtr; bool InitializeChunk(const size_t size); diff --git a/fairmq/shmem/FairMQSocketSHM.cxx b/fairmq/shmem/FairMQSocketSHM.cxx index a1a43885..6e3ff9ee 100644 --- a/fairmq/shmem/FairMQSocketSHM.cxx +++ b/fairmq/shmem/FairMQSocketSHM.cxx @@ -175,6 +175,7 @@ int FairMQSocketSHM::Receive(FairMQMessagePtr& msg, const int flags) static_cast(msg.get())->fHandle = hdr->fHandle; static_cast(msg.get())->fSize = hdr->fSize; static_cast(msg.get())->fRegionId = hdr->fRegionId; + static_cast(msg.get())->fHint = hdr->fHint; size = msg->GetSize(); fBytesRx += size; @@ -316,6 +317,7 @@ int64_t FairMQSocketSHM::Receive(vector& msgVec, const int fla static_cast(part.get())->fHandle = hdr->fHandle; static_cast(part.get())->fSize = hdr->fSize; static_cast(part.get())->fRegionId = hdr->fRegionId; + static_cast(part.get())->fHint = hdr->fHint; size = part->GetSize(); msgVec.push_back(move(part)); diff --git a/fairmq/shmem/FairMQTransportFactorySHM.cxx b/fairmq/shmem/FairMQTransportFactorySHM.cxx index 3eae1cc4..acfcb1b8 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.cxx +++ b/fairmq/shmem/FairMQTransportFactorySHM.cxx @@ -214,9 +214,9 @@ FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(void* data, const size return unique_ptr(new FairMQMessageSHM(*fManager, data, size, ffn, hint)); } -FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size) const +FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint) const { - return unique_ptr(new FairMQMessageSHM(*fManager, region, data, size)); + return unique_ptr(new FairMQMessageSHM(*fManager, region, data, size, hint)); } FairMQSocketPtr FairMQTransportFactorySHM::CreateSocket(const string& type, const string& name) const diff --git a/fairmq/shmem/FairMQTransportFactorySHM.h b/fairmq/shmem/FairMQTransportFactorySHM.h index bbbf502a..83b85eab 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.h +++ b/fairmq/shmem/FairMQTransportFactorySHM.h @@ -36,7 +36,7 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory FairMQMessagePtr CreateMessage() const override; FairMQMessagePtr CreateMessage(const size_t size) const override; FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const override; - FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size) const override; + FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) const override; FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override; diff --git a/fairmq/shmem/Region.cxx b/fairmq/shmem/Region.cxx index e0df1c83..beefe3f4 100644 --- a/fairmq/shmem/Region.cxx +++ b/fairmq/shmem/Region.cxx @@ -75,7 +75,7 @@ void Region::ReceiveAcks() // LOG(DEBUG) << "received: " << block.fHandle << " " << block.fSize << " " << block.fMessageId; if (fCallback) { - fCallback(reinterpret_cast(fRegion.get_address()) + block.fHandle, block.fSize); + fCallback(reinterpret_cast(fRegion.get_address()) + block.fHandle, block.fSize, reinterpret_cast(block.fHint)); } } else diff --git a/fairmq/zeromq/FairMQMessageZMQ.cxx b/fairmq/zeromq/FairMQMessageZMQ.cxx index d008e7d0..9e2ce663 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.cxx +++ b/fairmq/zeromq/FairMQMessageZMQ.cxx @@ -58,7 +58,7 @@ FairMQMessageZMQ::FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn } } -FairMQMessageZMQ::FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size) +FairMQMessageZMQ::FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint) : fUsedSizeModified(false) , fUsedSize() , fMsg(fair::mq::tools::make_unique()) @@ -74,7 +74,7 @@ FairMQMessageZMQ::FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, memcpy(zmq_msg_data(fMsg.get()), data, size); // call region callback - static_cast(region.get())->fCallback(data, size); + static_cast(region.get())->fCallback(data, size, hint); // if (zmq_msg_init_data(fMsg.get(), data, size, [](void*, void*){}, nullptr) != 0) // { diff --git a/fairmq/zeromq/FairMQMessageZMQ.h b/fairmq/zeromq/FairMQMessageZMQ.h index 0f6baf3e..02964338 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.h +++ b/fairmq/zeromq/FairMQMessageZMQ.h @@ -34,7 +34,7 @@ class FairMQMessageZMQ : public FairMQMessage FairMQMessageZMQ(); FairMQMessageZMQ(const size_t size); FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); - FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size); + FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0); void Rebuild() override; void Rebuild(const size_t size) override; diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx index ae8b8f37..b8acf407 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx @@ -64,9 +64,9 @@ FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(void* data, const size return unique_ptr(new FairMQMessageZMQ(data, size, ffn, hint)); } -FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size) const +FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint) const { - return unique_ptr(new FairMQMessageZMQ(region, data, size)); + return unique_ptr(new FairMQMessageZMQ(region, data, size, hint)); } FairMQSocketPtr FairMQTransportFactoryZMQ::CreateSocket(const string& type, const string& name) const diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index 46b19e93..79db637c 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -37,7 +37,7 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory FairMQMessagePtr CreateMessage() const override; FairMQMessagePtr CreateMessage(const size_t size) const override; FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const override; - FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size) const override; + FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) const override; FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override;