From 1a0ab3a4e2a4fad0a570103e14d4a2f21008afb3 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 19 Sep 2023 10:21:00 +0200 Subject: [PATCH] shm: Ref counting for unmanaged regions in a dedicated segment --- fairmq/UnmanagedRegion.h | 1 + fairmq/shmem/Common.h | 93 +++++++++++++++++++--------------- fairmq/shmem/Manager.h | 6 +++ fairmq/shmem/Message.h | 67 ++++++++++++------------ fairmq/shmem/Monitor.cxx | 21 +++++--- fairmq/shmem/README.md | 1 + fairmq/shmem/UnmanagedRegion.h | 55 ++++++++++++++++++-- 7 files changed, 159 insertions(+), 85 deletions(-) diff --git a/fairmq/UnmanagedRegion.h b/fairmq/UnmanagedRegion.h index 0b5aee47..3c671b64 100644 --- a/fairmq/UnmanagedRegion.h +++ b/fairmq/UnmanagedRegion.h @@ -134,6 +134,7 @@ struct RegionConfig int creationFlags = 0; /// flags passed to the underlying transport on region creation int64_t userFlags = 0; /// custom flags that have no effect on the transport, but can be retrieved from the region by the user uint64_t size = 0; /// region size + uint64_t rcSegmentSize = 10000000; /// size of the segment holding the reference counts for this region std::string path = ""; /// file path, if the region is backed by a file std::optional id = std::nullopt; /// region id uint32_t linger = 100; /// delay in ms before region destruction to collect outstanding events diff --git a/fairmq/shmem/Common.h b/fairmq/shmem/Common.h index bd9d5a31..1b1a96c2 100644 --- a/fairmq/shmem/Common.h +++ b/fairmq/shmem/Common.h @@ -13,6 +13,7 @@ #include // std::equal_to #include +#include #include #include #include @@ -25,6 +26,8 @@ #include +#include + namespace fair::mq::shmem { @@ -41,6 +44,31 @@ using RBTreeBestFitSegment = boost::interprocess::basic_managed_shared_memory; // boost::interprocess::iset_index>; +inline std::string MakeShmName(const std::string& shmId, const std::string& type, int index) { + return std::string("fmq_" + shmId + "_" + type + "_" + std::to_string(index)); +} + +struct RefCount +{ + explicit RefCount(uint16_t c) + : count(c) + {} + + uint16_t Get() { 
return count.load(); } + uint16_t Increment() { return count.fetch_add(1); } + uint16_t Decrement() { return count.fetch_sub(1); } + + std::atomic count; +}; + +// Number of nodes allocated at once when the allocator runs out of nodes. +static constexpr size_t numNodesPerBlock = 4096; +// Maximum number of totally free blocks that the adaptive node pool will hold. +// The rest of the totally free blocks will be deallocated with the segment manager. +static constexpr size_t maxFreeBlocks = 2; + +using RefCountPool = boost::interprocess::adaptive_pool; + using SegmentManager = boost::interprocess::managed_shared_memory::segment_manager; using VoidAlloc = boost::interprocess::allocator; using CharAlloc = boost::interprocess::allocator; @@ -121,6 +149,17 @@ struct ShmHeader static void Destruct(char* ptr) { RefCountPtr(ptr).~atomic(); } }; +struct MetaHeader +{ + size_t fSize; // size of the shm buffer + size_t fHint; // user-defined value, given by the user on message creation and returned to the user on "buffer no longer needed"-callbacks + boost::interprocess::managed_shared_memory::handle_t fHandle; // handle to shm buffer, convertible to shm buffer ptr + mutable boost::interprocess::managed_shared_memory::handle_t fShared; // handle to the buffer storing the ref count for shared buffers + uint16_t fRegionId; // id of the unmanaged region + mutable uint16_t fSegmentId; // id of the managed segment + bool fManaged; // true = managed segment, false = unmanaged region +}; + enum class AllocationAlgorithm : int { rbtree_best_fit, @@ -129,26 +168,30 @@ enum class AllocationAlgorithm : int struct RegionInfo { - RegionInfo(const VoidAlloc& alloc) - : fPath("", alloc) - , fCreationFlags(0) - , fUserFlags(0) - , fSize(0) - , fDestroyed(false) - {} + static constexpr uint64_t DefaultRcSegmentSize = 10000000; - RegionInfo(const char* path, int flags, uint64_t userFlags, uint64_t size, const VoidAlloc& alloc) + RegionInfo(const char* path, int flags, uint64_t userFlags, 
uint64_t size, uint64_t rcSegmentSize, const VoidAlloc& alloc) : fPath(path, alloc) , fCreationFlags(flags) , fUserFlags(userFlags) , fSize(size) + , fRCSegmentSize(rcSegmentSize) , fDestroyed(false) {} + RegionInfo(const VoidAlloc& alloc) + : RegionInfo("", 0, 0, 0, DefaultRcSegmentSize, alloc) + {} + + RegionInfo(const char* path, int flags, uint64_t userFlags, uint64_t size, const VoidAlloc& alloc) + : RegionInfo(path, flags, userFlags, size, DefaultRcSegmentSize, alloc) + {} + Str fPath; int fCreationFlags; uint64_t fUserFlags; uint64_t fSize; + uint64_t fRCSegmentSize; bool fDestroyed; }; @@ -216,17 +259,6 @@ struct RegionCounter std::atomic fCount; }; -struct MetaHeader -{ - size_t fSize; - size_t fHint; - boost::interprocess::managed_shared_memory::handle_t fHandle; - mutable boost::interprocess::managed_shared_memory::handle_t fShared; - uint16_t fRegionId; - mutable uint16_t fSegmentId; - bool fManaged; -}; - #ifdef FAIRMQ_DEBUG_MODE struct MsgCounter { @@ -310,29 +342,6 @@ struct SegmentBufferShrink mutable char* local_ptr; }; -// struct SegmentWrapper -// { -// SegmentWrapper(boost::variant&& _segment) -// : segment(std::move(_segment)) -// , refCountPool(nullptr) -// {} - -// void InitRefCountPoolSSF() -// { -// refCountPool = std::make_unique>( -// RefCountPoolSSF(boost::get(segment).get_segment_manager())); -// } - -// void InitRefCountPoolRBT() -// { -// refCountPool = std::make_unique>( -// RefCountPoolRBT(boost::get(segment).get_segment_manager())); -// } - -// boost::variant segment; -// std::unique_ptr> refCountPool; -// }; - } // namespace fair::mq::shmem #endif /* FAIR_MQ_SHMEM_COMMON_H_ */ diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index fae96108..4bdf7448 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -323,6 +323,7 @@ class Manager } const uint16_t id = cfg.id.value(); + const uint64_t rcSegmentSize = cfg.rcSegmentSize; std::lock_guard lock(fLocalRegionsMtx); @@ -347,6 +348,7 @@ class Manager // start 
ack receiver only if a callback has been provided. if (callback || bulkCallback) { region->SetCallbacks(callback, bulkCallback); + region->InitializeRefCountSegment(rcSegmentSize); region->InitializeQueues(); region->StartAckSender(); region->StartAckReceiver(); @@ -398,6 +400,7 @@ class Manager } else { try { RegionConfig cfg; + const uint64_t rcSegmentSize = cfg.rcSegmentSize; // get region info { boost::interprocess::scoped_lock shmLock(*fShmMtx); @@ -409,6 +412,7 @@ class Manager // LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'"; auto r = fRegions.emplace(id, std::make_unique(fShmId, 0, false, std::move(cfg))); + r.first->second->InitializeRefCountSegment(rcSegmentSize); r.first->second->InitializeQueues(); r.first->second->StartAckSender(); return r.first->second.get(); @@ -499,8 +503,10 @@ class Manager if (it != fRegions.end()) { region = it->second.get(); } else { + const uint64_t rcSegmentSize = cfgIt->second.rcSegmentSize; auto r = fRegions.emplace(cfgIt->first, std::make_unique(fShmId, 0, false, cfgIt->second)); region = r.first->second.get(); + region->InitializeRefCountSegment(rcSegmentSize); region->InitializeQueues(); region->StartAckSender(); } diff --git a/fairmq/shmem/Message.h b/fairmq/shmem/Message.h index 2aa9e93d..46e25953 100644 --- a/fairmq/shmem/Message.h +++ b/fairmq/shmem/Message.h @@ -262,52 +262,50 @@ class Message final : public fair::mq::Message if (fMeta.fManaged) { // managed segment fManager.GetSegment(fMeta.fSegmentId); return ShmHeader::RefCount(fManager.GetAddressFromHandle(fMeta.fHandle, fMeta.fSegmentId)); - } else { // unmanaged region - if (fMeta.fShared < 0) { // UR msg is not yet shared - return 1; - } else { - fManager.GetSegment(fMeta.fSegmentId); - return ShmHeader::RefCount(fManager.GetAddressFromHandle(fMeta.fShared, fMeta.fSegmentId)); - } } + if (fMeta.fShared < 0) { // UR msg is not yet shared + return 1; + } + fRegionPtr = 
fManager.GetRegionFromCache(fMeta.fRegionId); + if (!fRegionPtr) { + throw TransportError(tools::ToString("Cannot get unmanaged region with id ", fMeta.fRegionId)); + } + return fRegionPtr->GetRefCountAddressFromHandle(fMeta.fShared)->Get(); } void Copy(const fair::mq::Message& other) override { const Message& otherMsg = static_cast(other); + // if the other message is not initialized, close this one too and return if (otherMsg.fMeta.fHandle < 0) { - // if the other message is not initialized, close this one too and return CloseMessage(); return; } + // if this msg is already initialized, close it first if (fMeta.fHandle >= 0) { - // if this msg is already initialized, close it first CloseMessage(); } - if (otherMsg.fMeta.fManaged) { // managed segment - fMeta = otherMsg.fMeta; - fManager.GetSegment(fMeta.fSegmentId); - ShmHeader::IncrementRefCount(fManager.GetAddressFromHandle(fMeta.fHandle, fMeta.fSegmentId)); - } else { // unmanaged region - if (otherMsg.fMeta.fShared < 0) { // if UR msg is not yet shared - // TODO: minimize the size to 0 and don't create extra space for user buffer alignment - char* ptr = fManager.Allocate(2, 0); - // point the fShared in the unmanaged region message to the refCount holder - otherMsg.fMeta.fShared = fManager.GetHandleFromAddress(ptr, fMeta.fSegmentId); - // the message needs to be able to locate in which segment the refCount is stored - otherMsg.fMeta.fSegmentId = fMeta.fSegmentId; - // point this message to the same content as the unmanaged region message - fMeta = otherMsg.fMeta; - // increment the refCount - ShmHeader::IncrementRefCount(ptr); - } else { // if the UR msg is already shared - fMeta = otherMsg.fMeta; - fManager.GetSegment(fMeta.fSegmentId); - ShmHeader::IncrementRefCount(fManager.GetAddressFromHandle(fMeta.fShared, fMeta.fSegmentId)); + // increment ref count + if (otherMsg.fMeta.fManaged) { // msg in managed segment + fManager.GetSegment(otherMsg.fMeta.fSegmentId); + 
ShmHeader::IncrementRefCount(fManager.GetAddressFromHandle(otherMsg.fMeta.fHandle, otherMsg.fMeta.fSegmentId)); + } else { // msg in unmanaged region + fRegionPtr = fManager.GetRegionFromCache(otherMsg.fMeta.fRegionId); + if (!fRegionPtr) { + throw TransportError(tools::ToString("Cannot get unmanaged region with id ", otherMsg.fMeta.fRegionId)); + } + if (otherMsg.fMeta.fShared < 0) { + // UR msg not yet shared, create the reference counting object with count 2 + otherMsg.fMeta.fShared = fRegionPtr->HandleFromAddress(&(fRegionPtr->MakeRefCount(2))); + } else { + fRegionPtr->GetRefCountAddressFromHandle(otherMsg.fMeta.fShared)->Increment(); } } + + // copy meta data + fMeta = otherMsg.fMeta; } ~Message() override { CloseMessage(); } @@ -344,12 +342,13 @@ class Message final : public fair::mq::Message } } else { // unmanaged region if (fMeta.fShared >= 0) { - // make sure segment is initialized in this transport - fManager.GetSegment(fMeta.fSegmentId); - // release unmanaged region block if ref count is one - uint16_t refCount = ShmHeader::DecrementRefCount(fManager.GetAddressFromHandle(fMeta.fShared, fMeta.fSegmentId)); + fRegionPtr = fManager.GetRegionFromCache(fMeta.fRegionId); + if (!fRegionPtr) { + throw TransportError(tools::ToString("Cannot get unmanaged region with id ", fMeta.fRegionId)); + } + uint16_t refCount = fRegionPtr->GetRefCountAddressFromHandle(fMeta.fShared)->Decrement(); if (refCount == 1) { - fManager.Deallocate(fMeta.fShared, fMeta.fSegmentId); + fRegionPtr->RemoveRefCount(*(fRegionPtr->GetRefCountAddressFromHandle(fMeta.fShared))); ReleaseUnmanagedRegionBlock(); } } else { diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index 06207d04..904e47d9 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -235,8 +235,8 @@ bool Monitor::PrintShm(const ShmId& shmId) << ", managed segments:\n"; for (const auto& s : segments) { - size_t free = std::visit([](auto& s){ return s.get_free_memory(); }, s.second); - size_t 
total = std::visit([](auto& s){ return s.get_size(); }, s.second); + size_t free = std::visit([](auto& seg){ return seg.get_free_memory(); }, s.second); + size_t total = std::visit([](auto& seg){ return seg.get_size(); }, s.second); size_t used = total - free; std::string msgCount; @@ -268,12 +268,19 @@ bool Monitor::PrintShm(const ShmId& shmId) if (shmRegions && !shmRegions->empty()) { ss << "\n unmanaged regions:"; - for (const auto& r : *shmRegions) { - ss << "\n [" << r.first << "]: " << (r.second.fDestroyed ? "destroyed" : "alive"); - ss << ", size: " << r.second.fSize; + for (const auto& [id, info] : *shmRegions) { + ss << "\n [" << id << "]: " << (info.fDestroyed ? "destroyed" : "alive"); + ss << ", size: " << info.fSize; + + try { + managed_shared_memory rcCountSegment(open_read_only, std::string("fmq_" + shmId.shmId + "_rrc_" + to_string(id)).c_str()); + ss << ", rcCountSegment size: " << rcCountSegment.get_size(); + } catch (bie&) { + ss << ", rcCountSegment: not found"; + } // try { - // boost::interprocess::message_queue q(open_only, std::string("fmq_" + std::string(shmId) + "_rgq_" + to_string(r.first)).c_str()); + // boost::interprocess::message_queue q(open_only, std::string("fmq_" + std::string(shmId) + "_rgq_" + to_string(id)).c_str()); // ss << ", ack queue: " << q.get_num_msg() << " messages"; // } catch (bie&) { // ss << ", ack queue: not found"; @@ -679,6 +686,7 @@ std::vector> Monitor::Cleanup(const ShmId& shmIdT, result.emplace_back(Remove("fmq_" + shmId + "_rg_" + to_string(id), verbose)); } result.emplace_back(Remove("fmq_" + shmId + "_rgq_" + to_string(id), verbose)); + result.emplace_back(Remove("fmq_" + shmId + "_rrc_" + to_string(id), verbose)); } } @@ -780,6 +788,7 @@ void Monitor::ResetContent(const ShmId& shmIdT, bool verbose /* = true */) for (const auto& region : *shmRegions) { uint16_t id = region.first; Remove("fmq_" + shmId + "_rgq_" + to_string(id), verbose); + Remove("fmq_" + shmId + "_rrc_" + to_string(id), verbose); } } } 
catch (bie& e) { diff --git a/fairmq/shmem/README.md b/fairmq/shmem/README.md index 75a8a2c0..f76019e0 100644 --- a/fairmq/shmem/README.md +++ b/fairmq/shmem/README.md @@ -16,6 +16,7 @@ FairMQ Shared Memory currently uses the following names to register shared memor | `fmq_<shmId>_mng` | management segment (management data) | one of the devices | devices | | `fmq_<shmId>_rg_<index>` | unmanaged region(s) | one of the devices | devices with unmanaged regions | | `fmq_<shmId>_rgq_<index>` | unmanaged region queue(s) | one of the devices | devices with unmanaged regions | +| `fmq_<shmId>_rrc_<index>` | unmanaged region reference count pool(s) | one of the devices | devices with unmanaged regions | | `fmq_<shmId>_ms` | shmmonitor status | shmmonitor | devices, shmmonitor | The shmId is generated out of session id and user id. diff --git a/fairmq/shmem/UnmanagedRegion.h b/fairmq/shmem/UnmanagedRegion.h index cb9464ea..11f42856 100644 --- a/fairmq/shmem/UnmanagedRegion.h +++ b/fairmq/shmem/UnmanagedRegion.h @@ -59,8 +59,9 @@ struct UnmanagedRegion , fRemoveOnDestruction(cfg.removeOnDestruction) , fLinger(cfg.linger) , fStopAcks(false) - , fName("fmq_" + shmId + "_rg_" + std::to_string(cfg.id.value())) - , fQueueName("fmq_" + shmId + "_rgq_" + std::to_string(cfg.id.value())) + , fName(MakeShmName(shmId, "rg", cfg.id.value())) + , fQueueName(MakeShmName(shmId, "rgq", cfg.id.value())) + , fRefCountSegmentName(MakeShmName(shmId, "rrc", cfg.id.value())) , fShmemObject() , fFile(nullptr) , fFileMapping() @@ -186,6 +187,19 @@ struct UnmanagedRegion bool RemoveOnDestruction() { return fRemoveOnDestruction; } + RefCount& MakeRefCount(uint16_t initialCount = 1) + { + RefCount* refCount = fRefCountPool->allocate(1).get(); + new (refCount) RefCount(initialCount); + return *refCount; + } + + void RemoveRefCount(RefCount& refCount) + { + refCount.~RefCount(); + fRefCountPool->deallocate(&refCount, 1); + } + ~UnmanagedRegion() { LOG(debug) << "~UnmanagedRegion(): " << fName << " (" << (fControlling ? 
"controller" : "viewer") << ")"; @@ -208,6 +222,11 @@ struct UnmanagedRegion if (Monitor::RemoveFileMapping(fName.c_str())) { LOG(trace) << "File mapping '" << fName << "' destroyed."; } + if (fRefCountSegment) { + if (Monitor::RemoveObject(fRefCountSegmentName)) { + LOG(trace) << "Ref Count Segment '" << fRefCountSegmentName << "' destroyed."; + } + } } else { LOG(debug) << "Skipping removal of " << fName << " unmanaged region, because RegionConfig::removeOnDestruction is false"; } @@ -235,6 +254,7 @@ struct UnmanagedRegion std::atomic fStopAcks; std::string fName; std::string fQueueName; + std::string fRefCountSegmentName; boost::interprocess::shared_memory_object fShmemObject; FILE* fFile; boost::interprocess::file_mapping fFileMapping; @@ -245,12 +265,18 @@ struct UnmanagedRegion std::vector fBlocksToFree; const std::size_t fAckBunchSize = 256; std::unique_ptr fQueue; + std::unique_ptr fRefCountSegment; + std::unique_ptr fRefCountPool; std::thread fAcksReceiver; std::thread fAcksSender; RegionCallback fCallback; RegionBulkCallback fBulkCallback; + static std::string MakeSegmentName(const std::string& shmId, std::string_view segment, int regionIndex) { + return tools::ToString("fmq_", shmId, "_", segment, "_", regionIndex); + } + static RegionConfig makeRegionConfig(uint16_t id) { RegionConfig regionCfg; @@ -275,7 +301,7 @@ struct UnmanagedRegion throw TransportError(tools::ToString("Unmanaged Region with id ", cfg.id.value(), " has already been registered. 
Only unique IDs per session are allowed.")); } - shmRegions->emplace(cfg.id.value(), RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, cfg.size, alloc)); + shmRegions->emplace(cfg.id.value(), RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, cfg.size, cfg.rcSegmentSize, alloc)); (eventCounter->fCount)++; } @@ -294,6 +320,29 @@ struct UnmanagedRegion } } + void InitializeRefCountSegment(uint64_t size) + { + using namespace boost::interprocess; + if (!fRefCountSegment) { + fRefCountSegment = std::make_unique(open_or_create, fRefCountSegmentName.c_str(), size); + LOG(trace) << "shmem: initialized ref count segment: " << fRefCountSegmentName; + fRefCountPool = std::make_unique(fRefCountSegment->get_segment_manager()); + } + } + + RefCount* GetRefCountAddressFromHandle(const boost::interprocess::managed_shared_memory::handle_t handle) + { + if (fRefCountPool) { + return reinterpret_cast(fRefCountSegment->get_address_from_handle(handle)); + } + return nullptr; + }; + + boost::interprocess::managed_shared_memory::handle_t HandleFromAddress(const void* ptr) + { + return fRefCountSegment->get_handle_from_address(ptr); + } + void StartAckSender() { if (!fAcksSender.joinable()) {