shm: reduce contention on region events

This commit is contained in:
Alexey Rybalchenko 2022-01-04 20:13:57 +01:00
parent 692576a5b1
commit c04958e2a4
2 changed files with 75 additions and 74 deletions

View File

@ -135,7 +135,6 @@ class Manager
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600) , fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600)
, fShmVoidAlloc(fManagementSegment.get_segment_manager()) , fShmVoidAlloc(fManagementSegment.get_segment_manager())
, fShmMtx(fManagementSegment.find_or_construct<boost::interprocess::interprocess_mutex>(boost::interprocess::unique_instance)()) , fShmMtx(fManagementSegment.find_or_construct<boost::interprocess::interprocess_mutex>(boost::interprocess::unique_instance)())
, fRegionEventsShmCV(fManagementSegment.find_or_construct<boost::interprocess::interprocess_condition>(boost::interprocess::unique_instance)())
, fNumObservedEvents(0) , fNumObservedEvents(0)
, fDeviceCounter(nullptr) , fDeviceCounter(nullptr)
, fEventCounter(nullptr) , fEventCounter(nullptr)
@ -241,7 +240,6 @@ class Manager
fSegments.emplace(fSegmentId, SimpleSeqFitSegment(open_or_create, segmentName.c_str(), size)); fSegments.emplace(fSegmentId, SimpleSeqFitSegment(open_or_create, segmentName.c_str(), size));
fShmSegments->emplace(fSegmentId, AllocationAlgorithm::simple_seq_fit); fShmSegments->emplace(fSegmentId, AllocationAlgorithm::simple_seq_fit);
} }
(fEventCounter->fCount)++;
if (mlockSegmentOnCreation) { if (mlockSegmentOnCreation) {
MlockSegment(fSegmentId); MlockSegment(fSegmentId);
} }
@ -280,6 +278,8 @@ class Manager
ZeroSegment(fSegmentId); ZeroSegment(fSegmentId);
} }
(fEventCounter->fCount)++;
#ifdef FAIRMQ_DEBUG_MODE #ifdef FAIRMQ_DEBUG_MODE
fMsgDebug = fManagementSegment.find_or_construct<Uint16MsgDebugMapHashMap>(unique_instance)(fShmVoidAlloc); fMsgDebug = fManagementSegment.find_or_construct<Uint16MsgDebugMapHashMap>(unique_instance)(fShmVoidAlloc);
fShmMsgCounters = fManagementSegment.find_or_construct<Uint16MsgCounterHashMap>(unique_instance)(fShmVoidAlloc); fShmMsgCounters = fManagementSegment.find_or_construct<Uint16MsgCounterHashMap>(unique_instance)(fShmVoidAlloc);
@ -399,17 +399,6 @@ class Manager
region.fRemote = false; // TODO: this should be more clear, refactor it. region.fRemote = false; // TODO: this should be more clear, refactor it.
} }
if (cfg.lock) {
LOG(debug) << "Locking region " << id << "...";
region.Lock();
LOG(debug) << "Successfully locked region " << id << ".";
}
if (cfg.zero) {
LOG(debug) << "Zeroing free memory of region " << id << "...";
region.Zero();
LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
}
// start ack receiver only if a callback has been provided. // start ack receiver only if a callback has been provided.
if (callback || bulkCallback) { if (callback || bulkCallback) {
region.SetCallbacks(callback, bulkCallback); region.SetCallbacks(callback, bulkCallback);
@ -421,7 +410,6 @@ class Manager
result.second = id; result.second = id;
} }
fRegionsGen += 1; // signal TL cache invalidation fRegionsGen += 1; // signal TL cache invalidation
fRegionEventsShmCV->notify_all();
return result; return result;
} catch (interprocess_exception& e) { } catch (interprocess_exception& e) {
@ -445,19 +433,19 @@ class Manager
} }
} }
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx); boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
// slow path: check invalidation // slow path: check invalidation
if (lTlCacheGen != fRegionsGen) { if (lTlCacheGen != fRegionsGen) {
fTlRegionCache.fRegionsTLCache.clear(); fTlRegionCache.fRegionsTLCache.clear();
} }
auto *lRegion = GetRegionUnsafe(id); auto* lRegion = GetRegionUnsafe(id, shmLock);
fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64)); fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64));
fTlRegionCache.fRegionsTLCacheGen = fRegionsGen; fTlRegionCache.fRegionsTLCacheGen = fRegionsGen;
return lRegion; return lRegion;
} }
UnmanagedRegion* GetRegionUnsafe(const uint16_t id) UnmanagedRegion* GetRegionUnsafe(const uint16_t id, boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex>& lockedShmLock)
{ {
// remote region could actually be a local one if a message originates from this device (has been sent out and returned) // remote region could actually be a local one if a message originates from this device (has been sent out and returned)
auto it = fRegions.find(id); auto it = fRegions.find(id);
@ -467,6 +455,8 @@ class Manager
try { try {
// get region info // get region info
RegionInfo regionInfo = fShmRegions->at(id); RegionInfo regionInfo = fShmRegions->at(id);
// safe to unlock now - no shm container accessed after this
lockedShmLock.unlock();
RegionConfig cfg; RegionConfig cfg;
cfg.id = id; cfg.id = id;
cfg.creationFlags = regionInfo.fCreationFlags; cfg.creationFlags = regionInfo.fCreationFlags;
@ -500,7 +490,6 @@ class Manager
} }
fRegions.erase(id); fRegions.erase(id);
} }
fRegionEventsShmCV->notify_all();
} catch (std::out_of_range& oor) { } catch (std::out_of_range& oor) {
LOG(debug) << "RemoveRegion() could not locate region with id '" << id << "'"; LOG(debug) << "RemoveRegion() could not locate region with id '" << id << "'";
} }
@ -508,35 +497,9 @@ class Manager
} }
std::vector<fair::mq::RegionInfo> GetRegionInfo() std::vector<fair::mq::RegionInfo> GetRegionInfo()
{
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
return GetRegionInfoUnsafe();
}
std::vector<fair::mq::RegionInfo> GetRegionInfoUnsafe()
{ {
std::vector<fair::mq::RegionInfo> result; std::vector<fair::mq::RegionInfo> result;
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
for (const auto& e : *fShmRegions) {
fair::mq::RegionInfo info;
info.managed = false;
info.id = e.first;
info.flags = e.second.fUserFlags;
info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
if (!e.second.fDestroyed) {
auto region = GetRegionUnsafe(info.id);
if (region) {
info.ptr = region->GetData();
info.size = region->GetSize();
} else {
throw std::runtime_error(tools::ToString("GetRegionInfoUnsafe() could not get region with id '", info.id, "'"));
}
} else {
info.ptr = nullptr;
info.size = 0;
}
result.push_back(info);
}
for (const auto& e : *fShmSegments) { for (const auto& e : *fShmSegments) {
// make sure any segments in the session are found // make sure any segments in the session are found
@ -555,6 +518,27 @@ class Manager
} }
} }
for (const auto& e : *fShmRegions) {
fair::mq::RegionInfo info;
info.managed = false;
info.id = e.first;
info.flags = e.second.fUserFlags;
info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
if (info.event == RegionEvent::created) {
auto region = GetRegionUnsafe(info.id, shmLock);
if (region) {
info.ptr = region->GetData();
info.size = region->GetSize();
} else {
throw std::runtime_error(tools::ToString("GetRegionInfo() could not get region with id '", info.id, "'"));
}
} else {
info.ptr = nullptr;
info.size = 0;
}
result.push_back(info);
}
return result; return result;
} }
@ -562,13 +546,13 @@ class Manager
{ {
if (fRegionEventThread.joinable()) { if (fRegionEventThread.joinable()) {
LOG(debug) << "Already subscribed. Overwriting previous subscription."; LOG(debug) << "Already subscribed. Overwriting previous subscription.";
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx); std::unique_lock<std::mutex> lock(fRegionEventsMtx);
fRegionEventsSubscriptionActive = false; fRegionEventsSubscriptionActive = false;
lock.unlock(); lock.unlock();
fRegionEventsShmCV->notify_all(); fRegionEventsCV.notify_one();
fRegionEventThread.join(); fRegionEventThread.join();
} }
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx); std::lock_guard<std::mutex> lock(fRegionEventsMtx);
fRegionEventCallback = callback; fRegionEventCallback = callback;
fRegionEventsSubscriptionActive = true; fRegionEventsSubscriptionActive = true;
fRegionEventThread = std::thread(&Manager::RegionEventsSubscription, this); fRegionEventThread = std::thread(&Manager::RegionEventsSubscription, this);
@ -579,10 +563,10 @@ class Manager
void UnsubscribeFromRegionEvents() void UnsubscribeFromRegionEvents()
{ {
if (fRegionEventThread.joinable()) { if (fRegionEventThread.joinable()) {
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx); std::unique_lock<std::mutex> lock(fRegionEventsMtx);
fRegionEventsSubscriptionActive = false; fRegionEventsSubscriptionActive = false;
lock.unlock(); lock.unlock();
fRegionEventsShmCV->notify_all(); fRegionEventsCV.notify_one();
fRegionEventThread.join(); fRegionEventThread.join();
lock.lock(); lock.lock();
fRegionEventCallback = nullptr; fRegionEventCallback = nullptr;
@ -591,9 +575,12 @@ class Manager
void RegionEventsSubscription() void RegionEventsSubscription()
{ {
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx); std::unique_lock<std::mutex> lock(fRegionEventsMtx);
while (fRegionEventsSubscriptionActive) { while (fRegionEventsSubscriptionActive) {
auto infos = GetRegionInfoUnsafe(); if (fNumObservedEvents != fEventCounter->fCount) {
auto infos = GetRegionInfo();
for (const auto& i : infos) { for (const auto& i : infos) {
auto el = fObservedRegionEvents.find({i.id, i.managed}); auto el = fObservedRegionEvents.find({i.id, i.managed});
if (el == fObservedRegionEvents.end()) { // if event id has not been observed if (el == fObservedRegionEvents.end()) { // if event id has not been observed
@ -617,7 +604,9 @@ class Manager
} }
} }
} }
fRegionEventsShmCV->wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; }); }
// TODO: do better than polling here, without adding too much shmem contention
fRegionEventsCV.wait_for(lock, std::chrono::milliseconds(50), [&] { return !fRegionEventsSubscriptionActive; });
} }
} }
@ -812,7 +801,8 @@ class Manager
VoidAlloc fShmVoidAlloc; VoidAlloc fShmVoidAlloc;
boost::interprocess::interprocess_mutex* fShmMtx; boost::interprocess::interprocess_mutex* fShmMtx;
boost::interprocess::interprocess_condition* fRegionEventsShmCV; std::mutex fRegionEventsMtx;
std::condition_variable fRegionEventsCV;
std::thread fRegionEventThread; std::thread fRegionEventThread;
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback; std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
std::map<std::pair<uint16_t, bool>, RegionEvent> fObservedRegionEvents; // pair: <region id, managed> std::map<std::pair<uint16_t, bool>, RegionEvent> fObservedRegionEvents; // pair: <region id, managed>

View File

@ -107,6 +107,17 @@ struct UnmanagedRegion
} }
} }
if (cfg.lock) {
LOG(debug) << "Locking region " << cfg.id.value() << "...";
Lock();
LOG(debug) << "Successfully locked region " << cfg.id.value() << ".";
}
if (cfg.zero) {
LOG(debug) << "Zeroing free memory of region " << cfg.id.value() << "...";
Zero();
LOG(debug) << "Successfully zeroed free memory of region " << cfg.id.value() << ".";
}
if (!remote) { if (!remote) {
Register(shmId, cfg); Register(shmId, cfg);
} }