From 10fca7d45661a386cf138460053f0f2815a1ceec Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Mon, 12 Apr 2021 09:36:17 +0200 Subject: [PATCH] shm: eliminate race/deadlock in region subscriptions --- fairmq/shmem/Manager.h | 25 ++++++++++++++++--------- fairmq/shmem/Region.h | 15 +++++++-------- 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index a9303021..7949d2a8 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -384,10 +384,10 @@ class Manager void RemoveRegion(const uint16_t id) { - fRegions.erase(id); { boost::interprocess::scoped_lock lock(fShmMtx); fShmRegions->at(id).fDestroyed = true; + fRegions.erase(id); (fEventCounter->fCount)++; } fRegionsGen += 1; // signal TL cache invalidation @@ -483,19 +483,26 @@ class Manager auto infos = GetRegionInfoUnsafe(); for (const auto& i : infos) { auto el = fObservedRegionEvents.find({i.id, i.managed}); - if (el == fObservedRegionEvents.end()) { - fRegionEventCallback(i); + if (el == fObservedRegionEvents.end()) { // if event id has not been observed fObservedRegionEvents.emplace(std::make_pair(i.id, i.managed), i.event); - ++fNumObservedEvents; - } else { + // if a region has been created and destroyed rapidly, we could see 'destroyed' without ever seeing 'created' + // TODO: do we care to show 'created' events if we know region is already destroyed? + if (i.event == RegionEvent::created) { + fRegionEventCallback(i); + ++fNumObservedEvents; + } else { + fNumObservedEvents += 2; + } + } else { // if event id has been observed (expected - there are two events per id - created & destroyed) + // fire a callback if we have observed 'created' event and incoming is 'destroyed' if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) { fRegionEventCallback(i); el->second = i.event; ++fNumObservedEvents; } else { - // LOG(debug) << "ignoring event for id" << i.id << ":"; - // LOG(debug) << "incoming event: " << i.event; - // LOG(debug) << "stored event: " << el->second; + // LOG(debug) << "ignoring event for id " << i.id << ":" + // << " incoming: " << i.event << "," + // << " stored: " << el->second; } } } @@ -688,7 +695,7 @@ class Manager std::thread fRegionEventThread; bool fRegionEventsSubscriptionActive; std::function fRegionEventCallback; - std::map, RegionEvent> fObservedRegionEvents; + std::map, RegionEvent> fObservedRegionEvents; // pair: uint64_t fNumObservedEvents; DeviceCounter* fDeviceCounter; diff --git a/fairmq/shmem/Region.h b/fairmq/shmem/Region.h index bbe3e488..d1b54e7f 100644 --- a/fairmq/shmem/Region.h +++ b/fairmq/shmem/Region.h @@ -106,7 +106,7 @@ struct Region InitializeQueues(); StartSendingAcks(); - LOG(debug) << "shmem: initialized region: " << fName; + LOG(trace) << "shmem: initialized region: " << fName << " (" << (fRemote ? "remote" : "local") << ")"; } Region() = delete; @@ -123,7 +123,7 @@ struct Region } else { fQueue = std::make_unique(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock)); } - LOG(debug) << "shmem: initialized region queue: " << fQueueName; + LOG(trace) << "shmem: initialized region queue: " << fQueueName << " (" << (fRemote ? "remote" : "local") << ")"; } void StartSendingAcks() { fAcksSender = std::thread(&Region::SendAcks, this); } @@ -238,11 +238,11 @@ struct Region } if (boost::interprocess::shared_memory_object::remove(fName.c_str())) { - LOG(debug) << "Region '" << fName << "' destroyed."; + LOG(trace) << "Region '" << fName << "' destroyed."; } if (boost::interprocess::file_mapping::remove(fName.c_str())) { - LOG(debug) << "File mapping '" << fName << "' destroyed."; + LOG(trace) << "File mapping '" << fName << "' destroyed."; } if (fFile) { @@ -250,14 +250,13 @@ struct Region } if (boost::interprocess::message_queue::remove(fQueueName.c_str())) { - LOG(debug) << "Region queue '" << fQueueName << "' destroyed."; + LOG(trace) << "Region queue '" << fQueueName << "' destroyed."; } } else { - // LOG(debug) << "shmem: region '" << fName << "' is remote, no cleanup necessary."; - LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary"; + // LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary"; } - LOG(debug) << "Region '" << fName << "' (" << (fRemote ? "remote" : "local") << ") destructed."; + // LOG(debug) << "Region '" << fName << "' (" << (fRemote ? "remote" : "local") << ") destructed."; } bool fRemote;