fix(shm): race/deadlock in region locks

This commit is contained in:
Alexey Rybalchenko 2022-08-18 21:40:56 +02:00
parent 73fd1b2c2a
commit a4771d739c
3 changed files with 77 additions and 49 deletions

View File

@ -76,6 +76,8 @@ struct Sampler : fair::mq::Device
void ResetTask() override void ResetTask() override
{ {
// give some time for acks to be received
std::this_thread::sleep_for(std::chrono::milliseconds(250));
fRegion.reset(); fRegion.reset();
{ {
std::lock_guard<std::mutex> lock(fMtx); std::lock_guard<std::mutex> lock(fMtx);

View File

@ -397,12 +397,10 @@ class Manager
UnmanagedRegion* region = nullptr; UnmanagedRegion* region = nullptr;
bool newRegionCreated = false; bool newRegionCreated = false;
{
std::lock_guard<std::mutex> lock(fLocalRegionsMtx); std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, false, cfg)); auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, false, cfg));
newRegionCreated = res.second; newRegionCreated = res.second;
region = res.first->second.get(); region = res.first->second.get();
}
// LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'"; // LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
if (!newRegionCreated) { if (!newRegionCreated) {
@ -429,7 +427,7 @@ class Manager
} }
} }
UnmanagedRegion* GetRegion(uint16_t id) UnmanagedRegion* GetRegionFromCache(uint16_t id)
{ {
// NOTE: gcc optimizations. Prevent loading tls addresses many times in the fast path // NOTE: gcc optimizations. Prevent loading tls addresses many times in the fast path
const auto &lTlCache = fTlRegionCache; const auto &lTlCache = fTlRegionCache;
@ -443,41 +441,40 @@ class Manager
} }
} }
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
// slow path: check invalidation // slow path: check invalidation
if (lTlCacheGen != fRegionsGen) { if (lTlCacheGen != fRegionsGen) {
fTlRegionCache.fRegionsTLCache.clear(); fTlRegionCache.fRegionsTLCache.clear();
} }
std::lock_guard<std::mutex> lock(fLocalRegionsMtx); auto* lRegion = GetRegion(id);
auto* lRegion = GetRegionUnsafe(id, shmLock);
fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64)); fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64));
fTlRegionCache.fRegionsTLCacheGen = fRegionsGen; fTlRegionCache.fRegionsTLCacheGen = fRegionsGen;
return lRegion; return lRegion;
} }
UnmanagedRegion* GetRegionUnsafe(uint16_t id, boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex>& lockedShmLock) UnmanagedRegion* GetRegion(uint16_t id)
{ {
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
// remote region could actually be a local one if a message originates from this device (has been sent out and returned) // remote region could actually be a local one if a message originates from this device (has been sent out and returned)
auto it = fRegions.find(id); auto it = fRegions.find(id);
if (it != fRegions.end()) { if (it != fRegions.end()) {
return it->second.get(); return it->second.get();
} else { } else {
try { try {
// get region info
RegionInfo regionInfo = fShmRegions->at(id);
// safe to unlock now - no shm container accessed after this
lockedShmLock.unlock();
RegionConfig cfg; RegionConfig cfg;
// get region info
{
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
RegionInfo regionInfo = fShmRegions->at(id);
cfg.id = id; cfg.id = id;
cfg.creationFlags = regionInfo.fCreationFlags; cfg.creationFlags = regionInfo.fCreationFlags;
cfg.path = regionInfo.fPath.c_str(); cfg.path = regionInfo.fPath.c_str();
}
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'"; // LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, true, std::move(cfg))); auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, true, std::move(cfg)));
r.first->second->InitializeQueues(); r.first->second->InitializeQueues();
r.first->second->StartAckSender(); r.first->second->StartAckSender();
lockedShmLock.lock();
return r.first->second.get(); return r.first->second.get();
} catch (std::out_of_range& oor) { } catch (std::out_of_range& oor) {
LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?"; LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?";
@ -493,10 +490,10 @@ class Manager
void RemoveRegion(uint16_t id) void RemoveRegion(uint16_t id)
{ {
try { try {
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
std::lock_guard<std::mutex> lock(fLocalRegionsMtx); std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
fRegions.at(id)->StopAcks(); fRegions.at(id)->StopAcks();
{ {
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
if (fRegions.at(id)->RemoveOnDestruction()) { if (fRegions.at(id)->RemoveOnDestruction()) {
fShmRegions->at(id).fDestroyed = true; fShmRegions->at(id).fDestroyed = true;
(fEventCounter->fCount)++; (fEventCounter->fCount)++;
@ -512,45 +509,74 @@ class Manager
std::vector<fair::mq::RegionInfo> GetRegionInfo() std::vector<fair::mq::RegionInfo> GetRegionInfo()
{ {
std::vector<fair::mq::RegionInfo> result; std::vector<fair::mq::RegionInfo> result;
std::map<uint64_t, RegionConfig> regionCfgs;
{
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx); boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
for (const auto& e : *fShmSegments) { for (const auto& [segmentId, segmentInfo] : *fShmSegments) {
// make sure any segments in the session are found // make sure any segments in the session are found
GetSegment(e.first); GetSegment(segmentId);
try { try {
fair::mq::RegionInfo info; fair::mq::RegionInfo info;
info.managed = true; info.managed = true;
info.id = e.first; info.id = segmentId;
info.event = RegionEvent::created; info.event = RegionEvent::created;
info.ptr = boost::apply_visitor(SegmentAddress(), fSegments.at(e.first)); info.ptr = boost::apply_visitor(SegmentAddress(), fSegments.at(segmentId));
info.size = boost::apply_visitor(SegmentSize(), fSegments.at(e.first)); info.size = boost::apply_visitor(SegmentSize(), fSegments.at(segmentId));
result.push_back(info); result.push_back(info);
} catch (const std::out_of_range& oor) { } catch (const std::out_of_range& oor) {
LOG(error) << "could not find segment with id " << e.first; LOG(error) << "could not find segment with id " << segmentId;
LOG(error) << oor.what(); LOG(error) << oor.what();
} }
} }
for (const auto& e : *fShmRegions) { for (const auto& [regionId, regionInfo] : *fShmRegions) {
fair::mq::RegionInfo info; fair::mq::RegionInfo info;
info.managed = false; info.managed = false;
info.id = e.first; info.id = regionId;
info.flags = e.second.fUserFlags; info.flags = regionInfo.fUserFlags;
info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created; info.event = regionInfo.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
if (info.event == RegionEvent::created) { if (info.event == RegionEvent::created) {
auto region = GetRegionUnsafe(info.id, shmLock); RegionConfig cfg;
if (region) { cfg.id = info.id;
info.ptr = region->GetData(); cfg.creationFlags = info.id;
info.size = region->GetSize(); cfg.path = regionInfo.fPath.c_str();
} else { regionCfgs.emplace(info.id, cfg);
throw std::runtime_error(tools::ToString("GetRegionInfo() could not get region with id '", info.id, "'")); // fill the ptr+size info after shmLock is released, to avoid constructing local region under it
}
} else { } else {
info.ptr = nullptr; info.ptr = nullptr;
info.size = 0; info.size = 0;
} }
result.push_back(info); result.push_back(info);
} }
}
// do another iteration outside of shm lock, to fill ptr+size of unmanaged regions
for (auto& info : result) {
if (!info.managed && info.event == RegionEvent::created) {
auto cfgIt = regionCfgs.find(info.id);
if (cfgIt != regionCfgs.end()) {
UnmanagedRegion* region = nullptr;
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
auto it = fRegions.find(info.id);
if (it != fRegions.end()) {
region = it->second.get();
} else {
auto r = fRegions.emplace(cfgIt->first, std::make_unique<UnmanagedRegion>(fShmId, 0, true, cfgIt->second));
region = r.first->second.get();
region->InitializeQueues();
region->StartAckSender();
}
info.ptr = region->GetData();
info.size = region->GetSize();
} else {
info.ptr = nullptr;
info.size = 0;
}
}
}
return result; return result;
} }

View File

@ -195,7 +195,7 @@ class Message final : public fair::mq::Message
fLocalPtr = nullptr; fLocalPtr = nullptr;
} }
} else { } else {
fRegionPtr = fManager.GetRegion(fMeta.fRegionId); fRegionPtr = fManager.GetRegionFromCache(fMeta.fRegionId);
if (fRegionPtr) { if (fRegionPtr) {
fLocalPtr = reinterpret_cast<char*>(fRegionPtr->GetData()) + fMeta.fHandle; fLocalPtr = reinterpret_cast<char*>(fRegionPtr->GetData()) + fMeta.fHandle;
} else { } else {
@ -365,7 +365,7 @@ class Message final : public fair::mq::Message
void ReleaseUnmanagedRegionBlock() void ReleaseUnmanagedRegionBlock()
{ {
if (!fRegionPtr) { if (!fRegionPtr) {
fRegionPtr = fManager.GetRegion(fMeta.fRegionId); fRegionPtr = fManager.GetRegionFromCache(fMeta.fRegionId);
} }
if (fRegionPtr) { if (fRegionPtr) {