From c85d6e079c1eb3ceae38022f7e450340e17bc418 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Thu, 22 Apr 2021 18:32:39 +0200 Subject: [PATCH] shm: reduce shm contention when dealing with ack queues --- fairmq/shmem/Manager.h | 70 +++++++++++++++++++++--------------- fairmq/shmem/Message.h | 4 ++- fairmq/shmem/Region.h | 56 +++++++++++++++++++++-------- test/device/_error_state.cxx | 2 ++ 4 files changed, 88 insertions(+), 44 deletions(-) diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index 7da4c075..82d8a08d 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -67,7 +67,7 @@ class Manager , fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600) , fShmVoidAlloc(fManagementSegment.get_segment_manager()) , fShmMtx(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mtx").c_str()) - , fRegionEventsCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str()) + , fRegionEventsShmCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str()) , fRegionEventsSubscriptionActive(false) , fNumObservedEvents(0) , fDeviceCounter(nullptr) @@ -78,13 +78,13 @@ class Manager #ifdef FAIRMQ_DEBUG_MODE , fMsgDebug(nullptr) , fShmMsgCounters(nullptr) + , fMsgCounterNew(0) + , fMsgCounterDelete(0) #endif , fHeartbeatThread() , fSendHeartbeats(true) , fThrowOnBadAlloc(config ? config->GetProperty("shm-throw-bad-alloc", true) : true) , fNoCleanup(config ? config->GetProperty("shm-no-cleanup", false) : false) - , fMsgCounterNew(0) - , fMsgCounterDelete(0) { using namespace boost::interprocess; @@ -263,11 +263,13 @@ class Manager void Resume() { fInterrupted.store(false); } void Reset() { +#ifdef FAIRMQ_DEBUG_MODE auto diff = fMsgCounterNew.load() - fMsgCounterDelete.load(); if (diff != 0) { LOG(error) << "Message counter during Reset expected to be 0, found: " << diff; throw MessageError(tools::ToString("Message counter during Reset expected to be 0, found: ", diff)); } +#endif } bool Interrupted() { return fInterrupted.load(); } @@ -306,12 +308,12 @@ class Manager return {nullptr, id}; } - // create region info - fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc)); - auto r = fRegions.emplace(id, std::make_unique(fShmId, id, size, false, callback, bulkCallback, path, flags)); // LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'"; + fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc)); + + r.first->second->InitializeQueues(); r.first->second->StartReceivingAcks(); result.first = &(r.first->second->fRegion); result.second = id; @@ -319,7 +321,7 @@ class Manager (fEventCounter->fCount)++; } fRegionsGen += 1; // signal TL cache invalidation - fRegionEventsCV.notify_all(); + fRegionEventsShmCV.notify_all(); return result; } catch (interprocess_exception& e) { @@ -384,14 +386,19 @@ class Manager void RemoveRegion(const uint16_t id) { - { - boost::interprocess::scoped_lock lock(fShmMtx); - fShmRegions->at(id).fDestroyed = true; - fRegions.erase(id); - (fEventCounter->fCount)++; + try { + fRegions.at(id)->StopAcks(); + { + boost::interprocess::scoped_lock lock(fShmMtx); + fShmRegions->at(id).fDestroyed = true; + fRegions.erase(id); + (fEventCounter->fCount)++; + } + fRegionEventsShmCV.notify_all(); + } catch(std::out_of_range& oor) { + LOG(debug) << "RemoveRegion() could not locate region with id'" << id << "'"; } fRegionsGen += 1; // signal TL cache invalidation - fRegionEventsCV.notify_all(); } std::vector GetRegionInfo() @@ -452,7 +459,7 @@ class Manager boost::interprocess::scoped_lock lock(fShmMtx); fRegionEventsSubscriptionActive = false; lock.unlock(); - fRegionEventsCV.notify_all(); + fRegionEventsShmCV.notify_all(); fRegionEventThread.join(); } boost::interprocess::scoped_lock lock(fShmMtx); @@ -469,7 +476,7 @@ class Manager boost::interprocess::scoped_lock lock(fShmMtx); fRegionEventsSubscriptionActive = false; lock.unlock(); - fRegionEventsCV.notify_all(); + fRegionEventsShmCV.notify_all(); fRegionEventThread.join(); lock.lock(); fRegionEventCallback = nullptr; @@ -500,26 +507,32 @@ class Manager el->second = i.event; ++fNumObservedEvents; } else { - // LOG(debug) << "ignoring event for id " << i.id << ":" - // << " incoming: " << i.event << "," - // << " stored: " << el->second; + // LOG(debug) << "ignoring event " << i.id << ": incoming: " << i.event << ", stored: " << el->second; } } } - fRegionEventsCV.wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; }); + fRegionEventsShmCV.wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; }); } } - void IncrementMsgCounter() { fMsgCounterNew.fetch_add(1, std::memory_order_relaxed); } - void DecrementMsgCounter() { fMsgCounterDelete.fetch_add(1, std::memory_order_relaxed); } + void IncrementMsgCounter() + { +#ifdef FAIRMQ_DEBUG_MODE + fMsgCounterNew.fetch_add(1, std::memory_order_relaxed); +#endif + } + void DecrementMsgCounter() + { +#ifdef FAIRMQ_DEBUG_MODE + fMsgCounterDelete.fetch_add(1, std::memory_order_relaxed); +#endif + } #ifdef FAIRMQ_DEBUG_MODE void IncrementShmMsgCounter(uint16_t segmentId) { ++((*fShmMsgCounters)[segmentId].fCount); } void DecrementShmMsgCounter(uint16_t segmentId) { --((*fShmMsgCounters)[segmentId].fCount); } #endif - boost::interprocess::named_mutex& GetMtx() { return fShmMtx; } - void SendHeartbeats() { std::string controlQueueName("fmq_" + fShmId + "_cq"); @@ -547,7 +560,7 @@ class Manager auto it = fSegments.find(id); if (it == fSegments.end()) { try { - // get region info + // get segment info SegmentInfo segmentInfo = fShmSegments->at(id); LOG(debug) << "Located segment with id '" << id << "'"; @@ -691,7 +704,7 @@ class Manager VoidAlloc fShmVoidAlloc; boost::interprocess::named_mutex fShmMtx; - boost::interprocess::named_condition fRegionEventsCV; + boost::interprocess::named_condition fRegionEventsShmCV; std::thread fRegionEventThread; bool fRegionEventsSubscriptionActive; std::function fRegionEventCallback; @@ -712,8 +725,11 @@ class Manager std::atomic fInterrupted; #ifdef FAIRMQ_DEBUG_MODE + // make sure the counters are not thrashing the cache line between threads doing creation and deallocation Uint16MsgDebugMapHashMap* fMsgDebug; Uint16MsgCounterHashMap* fShmMsgCounters; + alignas(128) std::atomic_uint64_t fMsgCounterNew; + alignas(128) std::atomic_uint64_t fMsgCounterDelete; #endif std::thread fHeartbeatThread; @@ -724,9 +740,7 @@ class Manager bool fThrowOnBadAlloc; bool fNoCleanup; - // make sure the counters are not thrashing the cache line between threads doing creation and deallocation - alignas(128) std::atomic_uint64_t fMsgCounterNew; // TODO: find a better lifetime solution instead of the counter - alignas(128) std::atomic_uint64_t fMsgCounterDelete; + }; } // namespace fair::mq::shmem diff --git a/fairmq/shmem/Message.h b/fairmq/shmem/Message.h index ed5cf15b..77abf9e0 100644 --- a/fairmq/shmem/Message.h +++ b/fairmq/shmem/Message.h @@ -309,6 +309,8 @@ class Message final : public fair::mq::Message } if (fRegionPtr) { + fRegionPtr->InitializeQueues(); + fRegionPtr->StartSendingAcks(); fRegionPtr->ReleaseBlock({fMeta.fHandle, fMeta.fSize, fMeta.fHint}); } else { LOG(warn) << "region ack queue for id " << fMeta.fRegionId << " no longer exist. Not sending ack"; @@ -324,7 +326,7 @@ class Message final : public fair::mq::Message Deallocate(); fAlignment = 0; - fManager.DecrementMsgCounter(); // TODO: put this to debug mode + fManager.DecrementMsgCounter(); } }; diff --git a/fairmq/shmem/Region.h b/fairmq/shmem/Region.h index d1b54e7f..9a4ab719 100644 --- a/fairmq/shmem/Region.h +++ b/fairmq/shmem/Region.h @@ -47,7 +47,7 @@ struct Region Region(const std::string& shmId, uint16_t id, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags) : fRemote(remote) , fLinger(100) - , fStop(false) + , fStopAcks(false) , fName("fmq_" + shmId + "_rg_" + std::to_string(id)) , fQueueName("fmq_" + shmId + "_rgq_" + std::to_string(id)) , fShmemObject() @@ -104,8 +104,6 @@ struct Region } } - InitializeQueues(); - StartSendingAcks(); LOG(trace) << "shmem: initialized region: " << fName << " (" << (fRemote ? "remote" : "local") << ")"; } @@ -118,15 +116,22 @@ struct Region { using namespace boost::interprocess; - if (fRemote) { - fQueue = std::make_unique(open_only, fQueueName.c_str()); - } else { - fQueue = std::make_unique(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock)); + if (fQueue == nullptr) { + if (fRemote) { + fQueue = std::make_unique(open_only, fQueueName.c_str()); + } else { + fQueue = std::make_unique(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock)); + } + LOG(trace) << "shmem: initialized region queue: " << fQueueName << " (" << (fRemote ? "remote" : "local") << ")"; } - LOG(trace) << "shmem: initialized region queue: " << fQueueName << " (" << (fRemote ? "remote" : "local") << ")"; } - void StartSendingAcks() { fAcksSender = std::thread(&Region::SendAcks, this); } + void StartSendingAcks() + { + if (!fAcksSender.joinable()) { + fAcksSender = std::thread(&Region::SendAcks, this); + } + } void SendAcks() { std::unique_ptr blocks = std::make_unique(fAckBunchSize); @@ -150,13 +155,13 @@ struct Region } if (blocksToSend > 0) { - while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStop) { + while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStopAcks) { // receiver slow? yield and try again... std::this_thread::yield(); } // LOG(debug) << "Sent " << blocksToSend << " blocks."; } else { // blocksToSend == 0 - if (fStop) { + if (fStopAcks) { break; } } @@ -166,7 +171,12 @@ struct Region << " blocks left to send: " << blocksToSend << ")."; } - void StartReceivingAcks() { fAcksReceiver = std::thread(&Region::ReceiveAcks, this); } + void StartReceivingAcks() + { + if (!fAcksReceiver.joinable()) { + fAcksReceiver = std::thread(&Region::ReceiveAcks, this); + } + } void ReceiveAcks() { unsigned int priority; @@ -178,7 +188,7 @@ struct Region while (true) { uint32_t timeout = 100; bool leave = false; - if (fStop) { + if (fStopAcks) { timeout = fLinger; leave = true; } @@ -223,9 +233,25 @@ struct Region void SetLinger(uint32_t linger) { fLinger = linger; } uint32_t GetLinger() const { return fLinger; } + void StopAcks() + { + fStopAcks = true; + + if (fAcksSender.joinable()) { + fBlockSendCV.notify_one(); + fAcksSender.join(); + } + + if (!fRemote) { + if (fAcksReceiver.joinable()) { + fAcksReceiver.join(); + } + } + } + ~Region() { - fStop = true; + fStopAcks = true; if (fAcksSender.joinable()) { fBlockSendCV.notify_one(); @@ -261,7 +287,7 @@ struct Region bool fRemote; uint32_t fLinger; - std::atomic fStop; + std::atomic fStopAcks; std::string fName; std::string fQueueName; boost::interprocess::shared_memory_object fShmemObject; diff --git a/test/device/_error_state.cxx b/test/device/_error_state.cxx index 03877368..71ea82a1 100644 --- a/test/device/_error_state.cxx +++ b/test/device/_error_state.cxx @@ -153,9 +153,11 @@ TEST(ErrorState, interactive_InReset) EXPECT_EXIT(RunErrorStateIn("Reset", "interactive", "q"), ::testing::ExitedWithCode(1), ""); } +#ifdef FAIRMQ_DEBUG_MODE TEST(ErrorState, OrphanMessages) { BadDevice badDevice; } +#endif } // namespace