diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index f878ab18..e4a73b88 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -135,7 +135,6 @@ class Manager , fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600) , fShmVoidAlloc(fManagementSegment.get_segment_manager()) , fShmMtx(fManagementSegment.find_or_construct(boost::interprocess::unique_instance)()) - , fRegionEventsShmCV(fManagementSegment.find_or_construct(boost::interprocess::unique_instance)()) , fNumObservedEvents(0) , fDeviceCounter(nullptr) , fEventCounter(nullptr) @@ -241,7 +240,6 @@ class Manager fSegments.emplace(fSegmentId, SimpleSeqFitSegment(open_or_create, segmentName.c_str(), size)); fShmSegments->emplace(fSegmentId, AllocationAlgorithm::simple_seq_fit); } - (fEventCounter->fCount)++; if (mlockSegmentOnCreation) { MlockSegment(fSegmentId); } @@ -280,6 +278,8 @@ class Manager ZeroSegment(fSegmentId); } + (fEventCounter->fCount)++; + #ifdef FAIRMQ_DEBUG_MODE fMsgDebug = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); fShmMsgCounters = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); @@ -399,17 +399,6 @@ class Manager region.fRemote = false; // TODO: this should be more clear, refactor it. } - if (cfg.lock) { - LOG(debug) << "Locking region " << id << "..."; - region.Lock(); - LOG(debug) << "Successfully locked region " << id << "."; - } - if (cfg.zero) { - LOG(debug) << "Zeroing free memory of region " << id << "..."; - region.Zero(); - LOG(debug) << "Successfully zeroed free memory of region " << id << "."; - } - // start ack receiver only if a callback has been provided. if (callback || bulkCallback) { region.SetCallbacks(callback, bulkCallback); @@ -421,7 +410,6 @@ class Manager result.second = id; } fRegionsGen += 1; // signal TL cache invalidation - fRegionEventsShmCV->notify_all(); return result; } catch (interprocess_exception& e) { @@ -445,19 +433,19 @@ class Manager } } - boost::interprocess::scoped_lock lock(*fShmMtx); + boost::interprocess::scoped_lock shmLock(*fShmMtx); // slow path: check invalidation if (lTlCacheGen != fRegionsGen) { fTlRegionCache.fRegionsTLCache.clear(); } - auto *lRegion = GetRegionUnsafe(id); + auto* lRegion = GetRegionUnsafe(id, shmLock); fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64)); fTlRegionCache.fRegionsTLCacheGen = fRegionsGen; return lRegion; } - UnmanagedRegion* GetRegionUnsafe(const uint16_t id) + UnmanagedRegion* GetRegionUnsafe(const uint16_t id, boost::interprocess::scoped_lock& lockedShmLock) { // remote region could actually be a local one if a message originates from this device (has been sent out and returned) auto it = fRegions.find(id); @@ -467,6 +455,8 @@ class Manager try { // get region info RegionInfo regionInfo = fShmRegions->at(id); + // safe to unlock now - no shm container accessed after this + lockedShmLock.unlock(); RegionConfig cfg; cfg.id = id; cfg.creationFlags = regionInfo.fCreationFlags; @@ -500,7 +490,6 @@ class Manager } fRegions.erase(id); } - fRegionEventsShmCV->notify_all(); } catch (std::out_of_range& oor) { LOG(debug) << "RemoveRegion() could not locate region with id '" << id << "'"; } @@ -508,35 +497,9 @@ class Manager } std::vector GetRegionInfo() - { - boost::interprocess::scoped_lock lock(*fShmMtx); - return GetRegionInfoUnsafe(); - } - - std::vector GetRegionInfoUnsafe() { std::vector result; - - for (const auto& e : *fShmRegions) { - fair::mq::RegionInfo info; - info.managed = false; - info.id = e.first; - info.flags = e.second.fUserFlags; - info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created; - if (!e.second.fDestroyed) { - auto region = GetRegionUnsafe(info.id); - if (region) { - info.ptr = region->GetData(); - info.size = region->GetSize(); - } else { - throw std::runtime_error(tools::ToString("GetRegionInfoUnsafe() could not get region with id '", info.id, "'")); - } - } else { - info.ptr = nullptr; - info.size = 0; - } - result.push_back(info); - } + boost::interprocess::scoped_lock shmLock(*fShmMtx); for (const auto& e : *fShmSegments) { // make sure any segments in the session are found @@ -555,6 +518,27 @@ class Manager } } + for (const auto& e : *fShmRegions) { + fair::mq::RegionInfo info; + info.managed = false; + info.id = e.first; + info.flags = e.second.fUserFlags; + info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created; + if (info.event == RegionEvent::created) { + auto region = GetRegionUnsafe(info.id, shmLock); + if (region) { + info.ptr = region->GetData(); + info.size = region->GetSize(); + } else { + throw std::runtime_error(tools::ToString("GetRegionInfo() could not get region with id '", info.id, "'")); + } + } else { + info.ptr = nullptr; + info.size = 0; + } + result.push_back(info); + } + return result; } @@ -562,13 +546,13 @@ class Manager { if (fRegionEventThread.joinable()) { LOG(debug) << "Already subscribed. Overwriting previous subscription."; - boost::interprocess::scoped_lock lock(*fShmMtx); + std::unique_lock lock(fRegionEventsMtx); fRegionEventsSubscriptionActive = false; lock.unlock(); - fRegionEventsShmCV->notify_all(); + fRegionEventsCV.notify_one(); fRegionEventThread.join(); } - boost::interprocess::scoped_lock lock(*fShmMtx); + std::lock_guard lock(fRegionEventsMtx); fRegionEventCallback = callback; fRegionEventsSubscriptionActive = true; fRegionEventThread = std::thread(&Manager::RegionEventsSubscription, this); @@ -579,10 +563,10 @@ class Manager void UnsubscribeFromRegionEvents() { if (fRegionEventThread.joinable()) { - boost::interprocess::scoped_lock lock(*fShmMtx); + std::unique_lock lock(fRegionEventsMtx); fRegionEventsSubscriptionActive = false; lock.unlock(); - fRegionEventsShmCV->notify_all(); + fRegionEventsCV.notify_one(); fRegionEventThread.join(); lock.lock(); fRegionEventCallback = nullptr; @@ -591,33 +575,38 @@ class Manager void RegionEventsSubscription() { - boost::interprocess::scoped_lock lock(*fShmMtx); + std::unique_lock lock(fRegionEventsMtx); + while (fRegionEventsSubscriptionActive) { - auto infos = GetRegionInfoUnsafe(); - for (const auto& i : infos) { - auto el = fObservedRegionEvents.find({i.id, i.managed}); - if (el == fObservedRegionEvents.end()) { // if event id has not been observed - fObservedRegionEvents.emplace(std::make_pair(i.id, i.managed), i.event); - // if a region has been created and destroyed rapidly, we could see 'destroyed' without ever seeing 'created' - // TODO: do we care to show 'created' events if we know region is already destroyed? - if (i.event == RegionEvent::created) { - fRegionEventCallback(i); - ++fNumObservedEvents; - } else { - fNumObservedEvents += 2; - } - } else { // if event id has been observed (expected - there are two events per id - created & destroyed) - // fire a callback if we have observed 'created' event and incoming is 'destroyed' - if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) { - fRegionEventCallback(i); - el->second = i.event; - ++fNumObservedEvents; - } else { - // LOG(debug) << "ignoring event " << i.id << ": incoming: " << i.event << ", stored: " << el->second; + if (fNumObservedEvents != fEventCounter->fCount) { + auto infos = GetRegionInfo(); + + for (const auto& i : infos) { + auto el = fObservedRegionEvents.find({i.id, i.managed}); + if (el == fObservedRegionEvents.end()) { // if event id has not been observed + fObservedRegionEvents.emplace(std::make_pair(i.id, i.managed), i.event); + // if a region has been created and destroyed rapidly, we could see 'destroyed' without ever seeing 'created' + // TODO: do we care to show 'created' events if we know region is already destroyed? + if (i.event == RegionEvent::created) { + fRegionEventCallback(i); + ++fNumObservedEvents; + } else { + fNumObservedEvents += 2; + } + } else { // if event id has been observed (expected - there are two events per id - created & destroyed) + // fire a callback if we have observed 'created' event and incoming is 'destroyed' + if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) { + fRegionEventCallback(i); + el->second = i.event; + ++fNumObservedEvents; + } else { + // LOG(debug) << "ignoring event " << i.id << ": incoming: " << i.event << ", stored: " << el->second; + } } } } - fRegionEventsShmCV->wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; }); + // TODO: do better than polling here, without adding too much shmem contention + fRegionEventsCV.wait_for(lock, std::chrono::milliseconds(50), [&] { return !fRegionEventsSubscriptionActive; }); } } @@ -812,7 +801,8 @@ class Manager VoidAlloc fShmVoidAlloc; boost::interprocess::interprocess_mutex* fShmMtx; - boost::interprocess::interprocess_condition* fRegionEventsShmCV; + std::mutex fRegionEventsMtx; + std::condition_variable fRegionEventsCV; std::thread fRegionEventThread; std::function fRegionEventCallback; std::map, RegionEvent> fObservedRegionEvents; // pair: diff --git a/fairmq/shmem/UnmanagedRegion.h b/fairmq/shmem/UnmanagedRegion.h index 28503ac4..592b4d36 100644 --- a/fairmq/shmem/UnmanagedRegion.h +++ b/fairmq/shmem/UnmanagedRegion.h @@ -107,6 +107,17 @@ struct UnmanagedRegion } } + if (cfg.lock) { + LOG(debug) << "Locking region " << cfg.id.value() << "..."; + Lock(); + LOG(debug) << "Successfully locked region " << cfg.id.value() << "."; + } + if (cfg.zero) { + LOG(debug) << "Zeroing free memory of region " << cfg.id.value() << "..."; + Zero(); + LOG(debug) << "Successfully zeroed free memory of region " << cfg.id.value() << "."; + } + if (!remote) { Register(shmId, cfg); }