shmem region subscriptions: fix race condition

This commit is contained in:
Alexey Rybalchenko 2020-05-10 12:21:59 +02:00
parent a3afadb824
commit 59e32437a2
2 changed files with 16 additions and 14 deletions

View File

@ -233,31 +233,37 @@ vector<fair::mq::RegionInfo> Manager::GetRegionInfoUnsafe()
void Manager::SubscribeToRegionEvents(RegionEventCallback callback) void Manager::SubscribeToRegionEvents(RegionEventCallback callback)
{ {
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
if (fRegionEventThread.joinable()) { if (fRegionEventThread.joinable()) {
fRegionEventsSubscriptionActive.store(false); LOG(debug) << "Already subscribed. Overwriting previous subscription.";
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
fRegionEventsSubscriptionActive = false;
lock.unlock();
fRegionEventsCV.notify_all();
fRegionEventThread.join(); fRegionEventThread.join();
} }
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
fRegionEventCallback = callback; fRegionEventCallback = callback;
fRegionEventsSubscriptionActive.store(true); fRegionEventsSubscriptionActive = true;
fRegionEventThread = thread(&Manager::RegionEventsSubscription, this); fRegionEventThread = thread(&Manager::RegionEventsSubscription, this);
} }
void Manager::UnsubscribeFromRegionEvents() void Manager::UnsubscribeFromRegionEvents()
{ {
if (fRegionEventThread.joinable()) { if (fRegionEventThread.joinable()) {
fRegionEventsSubscriptionActive.store(false); bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
fRegionEventsSubscriptionActive = false;
lock.unlock();
fRegionEventsCV.notify_all(); fRegionEventsCV.notify_all();
fRegionEventThread.join(); fRegionEventThread.join();
lock.lock();
fRegionEventCallback = nullptr;
} }
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
fRegionEventCallback = nullptr;
} }
void Manager::RegionEventsSubscription() void Manager::RegionEventsSubscription()
{ {
while (fRegionEventsSubscriptionActive.load()) { bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx); while (fRegionEventsSubscriptionActive) {
auto infos = GetRegionInfoUnsafe(); auto infos = GetRegionInfoUnsafe();
for (const auto& i : infos) { for (const auto& i : infos) {
auto el = fObservedRegionEvents.find(i.id); auto el = fObservedRegionEvents.find(i.id);
@ -298,11 +304,7 @@ Manager::~Manager()
{ {
bool lastRemoved = false; bool lastRemoved = false;
if (fRegionEventThread.joinable()) { UnsubscribeFromRegionEvents();
fRegionEventsSubscriptionActive.store(false);
fRegionEventsCV.notify_all();
fRegionEventThread.join();
}
try { try {
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx); bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);

View File

@ -94,7 +94,7 @@ class Manager
boost::interprocess::named_condition fRegionEventsCV; boost::interprocess::named_condition fRegionEventsCV;
std::thread fRegionEventThread; std::thread fRegionEventThread;
std::atomic<bool> fRegionEventsSubscriptionActive; bool fRegionEventsSubscriptionActive;
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback; std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
std::unordered_map<uint64_t, RegionEvent> fObservedRegionEvents; std::unordered_map<uint64_t, RegionEvent> fObservedRegionEvents;