From 0f89b99f503cf29f7c30f23d73c62d09f059ab9b Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 7 Dec 2021 00:35:48 +0100 Subject: [PATCH] shm: integrate mtx and cv into management segment --- fairmq/shmem/Manager.h | 47 ++++++++++++++++++++-------------------- fairmq/shmem/Monitor.cxx | 16 ++++++-------- fairmq/shmem/README.md | 2 -- 3 files changed, 31 insertions(+), 34 deletions(-) diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index 1a3ed777..549bc37f 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -23,7 +23,8 @@ #include #include #include -#include +#include +#include #include #include @@ -134,8 +135,8 @@ class Manager , fDeviceId(std::move(deviceId)) , fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600) , fShmVoidAlloc(fManagementSegment.get_segment_manager()) - , fShmMtx(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mtx").c_str()) - , fRegionEventsShmCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str()) + , fShmMtx(fManagementSegment.find_or_construct(boost::interprocess::unique_instance)()) + , fRegionEventsShmCV(fManagementSegment.find_or_construct(boost::interprocess::unique_instance)()) , fNumObservedEvents(0) , fDeviceCounter(nullptr) , fEventCounter(nullptr) @@ -188,7 +189,7 @@ class Manager } if (autolaunchMonitor) { - boost::interprocess::scoped_lock lock(fShmMtx); + boost::interprocess::scoped_lock lock(*fShmMtx); StartMonitor(fShmId); } @@ -196,7 +197,7 @@ class Manager try { std::stringstream ss; - boost::interprocess::scoped_lock lock(fShmMtx); + boost::interprocess::scoped_lock lock(*fShmMtx); fShmSegments = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); @@ -379,7 +380,7 @@ class Manager std::pair result; { - boost::interprocess::scoped_lock lock(fShmMtx); + boost::interprocess::scoped_lock lock(*fShmMtx); if (!cfg.id.has_value()) { RegionCounter* rc = fManagementSegment.find(unique_instance).first; @@ -437,7 +438,7 @@ class Manager } } fRegionsGen += 1; // signal TL cache invalidation - fRegionEventsShmCV.notify_all(); + fRegionEventsShmCV->notify_all(); return result; } catch (interprocess_exception& e) { @@ -461,7 +462,7 @@ class Manager } } - boost::interprocess::scoped_lock lock(fShmMtx); + boost::interprocess::scoped_lock lock(*fShmMtx); // slow path: check invalidation if (lTlCacheGen != fRegionsGen) { fTlRegionCache.fRegionsTLCache.clear(); @@ -507,14 +508,14 @@ class Manager try { fRegions.at(id)->StopAcks(); { - boost::interprocess::scoped_lock lock(fShmMtx); + boost::interprocess::scoped_lock lock(*fShmMtx); if (fRegions.at(id)->fRemoveOnDestruction) { fShmRegions->at(id).fDestroyed = true; (fEventCounter->fCount)++; } fRegions.erase(id); } - fRegionEventsShmCV.notify_all(); + fRegionEventsShmCV->notify_all(); } catch (std::out_of_range& oor) { LOG(debug) << "RemoveRegion() could not locate region with id '" << id << "'"; } @@ -523,7 +524,7 @@ class Manager std::vector GetRegionInfo() { - boost::interprocess::scoped_lock lock(fShmMtx); + boost::interprocess::scoped_lock lock(*fShmMtx); return GetRegionInfoUnsafe(); } @@ -576,13 +577,13 @@ class Manager { if (fRegionEventThread.joinable()) { LOG(debug) << "Already subscribed. Overwriting previous subscription."; - boost::interprocess::scoped_lock lock(fShmMtx); + boost::interprocess::scoped_lock lock(*fShmMtx); fRegionEventsSubscriptionActive = false; lock.unlock(); - fRegionEventsShmCV.notify_all(); + fRegionEventsShmCV->notify_all(); fRegionEventThread.join(); } - boost::interprocess::scoped_lock lock(fShmMtx); + boost::interprocess::scoped_lock lock(*fShmMtx); fRegionEventCallback = callback; fRegionEventsSubscriptionActive = true; fRegionEventThread = std::thread(&Manager::RegionEventsSubscription, this); @@ -593,10 +594,10 @@ class Manager void UnsubscribeFromRegionEvents() { if (fRegionEventThread.joinable()) { - boost::interprocess::scoped_lock lock(fShmMtx); + boost::interprocess::scoped_lock lock(*fShmMtx); fRegionEventsSubscriptionActive = false; lock.unlock(); - fRegionEventsShmCV.notify_all(); + fRegionEventsShmCV->notify_all(); fRegionEventThread.join(); lock.lock(); fRegionEventCallback = nullptr; @@ -605,7 +606,7 @@ class Manager void RegionEventsSubscription() { - boost::interprocess::scoped_lock lock(fShmMtx); + boost::interprocess::scoped_lock lock(*fShmMtx); while (fRegionEventsSubscriptionActive) { auto infos = GetRegionInfoUnsafe(); for (const auto& i : infos) { @@ -631,7 +632,7 @@ class Manager } } } - fRegionEventsShmCV.wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; }); + fRegionEventsShmCV->wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; }); } } @@ -740,7 +741,7 @@ class Manager } } #ifdef FAIRMQ_DEBUG_MODE - boost::interprocess::scoped_lock lock(fShmMtx); + boost::interprocess::scoped_lock lock(*fShmMtx); IncrementShmMsgCounter(fSegmentId); if (fMsgDebug->count(fSegmentId) == 0) { fMsgDebug->emplace(fSegmentId, fShmVoidAlloc); @@ -759,7 +760,7 @@ class Manager { char* ptr = GetAddressFromHandle(handle, segmentId); #ifdef FAIRMQ_DEBUG_MODE - boost::interprocess::scoped_lock lock(fShmMtx); + boost::interprocess::scoped_lock lock(*fShmMtx); DecrementShmMsgCounter(segmentId); try { fMsgDebug->at(segmentId).erase(GetHandleFromAddress(ShmHeader::UserPtr(ptr), fSegmentId)); @@ -784,7 +785,7 @@ class Manager bool lastRemoved = false; try { - boost::interprocess::scoped_lock lock(fShmMtx); + boost::interprocess::scoped_lock lock(*fShmMtx); (fDeviceCounter->fCount)--; @@ -821,9 +822,9 @@ class Manager std::unordered_map> fSegments; boost::interprocess::managed_shared_memory fManagementSegment; VoidAlloc fShmVoidAlloc; - boost::interprocess::named_mutex fShmMtx; + boost::interprocess::interprocess_mutex* fShmMtx; - boost::interprocess::named_condition fRegionEventsShmCV; + boost::interprocess::interprocess_condition* fRegionEventsShmCV; std::thread fRegionEventThread; std::function fRegionEventCallback; std::map, RegionEvent> fObservedRegionEvents; // pair: diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index 59e0efcd..3811a488 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -17,6 +17,7 @@ #include #include +#include #include #include @@ -390,8 +391,8 @@ void Monitor::PrintDebugInfo(const ShmId& shmId __attribute__((unused))) string managementSegmentName("fmq_" + shmId.shmId + "_mng"); try { bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str()); - boost::interprocess::named_mutex mtx(boost::interprocess::open_only, string("fmq_" + shmId.shmId + "_mtx").c_str()); - boost::interprocess::scoped_lock lock(mtx); + bipc::interprocess_mutex* mtx(managementSegment.find_or_construct(bipc::unique_instance)()); + bipc::scoped_lock lock(*mtx); Uint16MsgDebugMapHashMap* debug = managementSegment.find(bipc::unique_instance).first; @@ -438,8 +439,8 @@ unordered_map> Monitor::GetDebugInfo(cons string managementSegmentName("fmq_" + shmId.shmId + "_mng"); try { bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str()); - boost::interprocess::named_mutex mtx(boost::interprocess::open_only, string("fmq_" + shmId.shmId + "_mtx").c_str()); - boost::interprocess::scoped_lock lock(mtx); + bipc::interprocess_mutex* mtx(managementSegment.find_or_construct(bipc::unique_instance)()); + bipc::scoped_lock lock(*mtx); Uint16MsgDebugMapHashMap* debug = managementSegment.find(bipc::unique_instance).first; @@ -471,8 +472,8 @@ unsigned long Monitor::GetFreeMemory(const ShmId& shmId, uint16_t segmentId) using namespace boost::interprocess; try { bipc::managed_shared_memory managementSegment(bipc::open_only, std::string("fmq_" + shmId.shmId + "_mng").c_str()); - boost::interprocess::named_mutex mtx(boost::interprocess::open_only, std::string("fmq_" + shmId.shmId + "_mtx").c_str()); - boost::interprocess::scoped_lock lock(mtx); + boost::interprocess::interprocess_mutex* mtx(managementSegment.find_or_construct(bipc::unique_instance)()); + boost::interprocess::scoped_lock lock(*mtx); Uint16SegmentInfoHashMap* shmSegments = managementSegment.find(unique_instance).first; @@ -591,9 +592,6 @@ std::vector> Monitor::Cleanup(const ShmId& shmId, b } } - result.emplace_back(Remove("fmq_" + shmId.shmId + "_mtx", verbose)); - result.emplace_back(Remove("fmq_" + shmId.shmId + "_cv", verbose)); - return result; } diff --git a/fairmq/shmem/README.md b/fairmq/shmem/README.md index 7a084471..b85012c5 100644 --- a/fairmq/shmem/README.md +++ b/fairmq/shmem/README.md @@ -14,8 +14,6 @@ FairMQ Shared Memory currently uses the following names to register shared memor | --------------------------- | ---------------------------------------------- | ------------------ | ------------------------------ | | `fmq__m_` | managed segment(s) (user data) | one of the devices | devices | | `fmq__mng` | management segment (management data) | one of the devices | devices | -| `fmq__mtx` | mutex | one of the devices | devices | -| `fmq__cv` | condition variable | one of the devices | devices | | `fmq__rg_` | unmanaged region(s) | one of the devices | devices with unmanaged regions | | `fmq__rgq_` | unmanaged region queue(s) | one of the devices | devices with unmanaged regions | | `fmq__ms` | shmmonitor status | shmmonitor | devices, shmmonitor |