From f885b4618e411089281d8573388d13e8ceaa4539 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 16 Jun 2020 14:39:21 +0200 Subject: [PATCH] Optimize unmanaged region ReceiveAcks --- fairmq/shmem/Manager.h | 96 ++++++++++++++++++++++++------------------ fairmq/shmem/Region.h | 80 +++++++++++++++++++---------------- 2 files changed, 97 insertions(+), 79 deletions(-) diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index 35856302..40c94647 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -32,6 +32,8 @@ #include #include // getenv +#include +#include #include #include #include @@ -100,39 +102,6 @@ class Manager Manager(const Manager&) = delete; Manager operator=(const Manager&) = delete; - ~Manager() - { - using namespace boost::interprocess; - bool lastRemoved = false; - - UnsubscribeFromRegionEvents(); - - fSendHeartbeats = false; - fHeartbeatThread.join(); - - try { - boost::interprocess::scoped_lock lock(fShmMtx); - - (fDeviceCounter->fCount)--; - - if (fDeviceCounter->fCount == 0) { - LOG(debug) << "last segment user, removing segment."; - - RemoveSegments(); - lastRemoved = true; - } else { - LOG(debug) << "other segment users present (" << fDeviceCounter->fCount << "), not removing it."; - } - } catch(interprocess_exception& e) { - LOG(error) << "error while acquiring lock in Manager destructor: " << e.what(); - } - - if (lastRemoved) { - named_mutex::remove(std::string("fmq_" + fShmId + "_mtx").c_str()); - named_condition::remove(std::string("fmq_" + fShmId + "_cv").c_str()); - } - } - boost::interprocess::managed_shared_memory& Segment() { return fSegment; } boost::interprocess::managed_shared_memory& ManagementSegment() { return fManagementSegment; } @@ -274,9 +243,9 @@ class Manager void RemoveRegion(const uint64_t id) { + fRegions.erase(id); { boost::interprocess::scoped_lock lock(fShmMtx); - fRegions.erase(id); fRegionInfos->at(id).fDestroyed = true; } fRegionEventsCV.notify_all(); @@ -385,39 +354,79 @@ class Manager { using namespace boost::interprocess; if (shared_memory_object::remove(fSegmentName.c_str())) { - LOG(debug) << "successfully removed '" << fSegmentName << "' segment after the device has stopped."; + LOG(debug) << "Removed '" << fSegmentName << "' segment after the device has stopped."; } else { - LOG(debug) << "did not remove " << fSegmentName << " segment after the device stopped. Already removed?"; + LOG(debug) << "Did not remove " << fSegmentName << " segment after the device stopped. Already removed?"; } if (shared_memory_object::remove(fManagementSegmentName.c_str())) { - LOG(debug) << "successfully removed '" << fManagementSegmentName << "' segment after the device has stopped."; + LOG(debug) << "Removed '" << fManagementSegmentName << "' segment after the device has stopped."; } else { - LOG(debug) << "did not remove '" << fManagementSegmentName << "' segment after the device stopped. Already removed?"; + LOG(debug) << "Did not remove '" << fManagementSegmentName << "' segment after the device stopped. Already removed?"; } } void SendHeartbeats() { std::string controlQueueName("fmq_" + fShmId + "_cq"); + std::unique_lock lock(fHeartbeatsMtx); while (fSendHeartbeats) { try { boost::interprocess::message_queue mq(boost::interprocess::open_only, controlQueueName.c_str()); boost::posix_time::ptime sndTill = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(100); if (mq.timed_send(fDeviceId.c_str(), fDeviceId.size(), 0, sndTill)) { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + fHeartbeatsCV.wait_for(lock, std::chrono::milliseconds(100), [&]() { return !fSendHeartbeats; }); } else { LOG(debug) << "control queue timeout"; } } catch (boost::interprocess::interprocess_exception& ie) { - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - // LOG(warn) << "no " << controlQueueName << " found"; + fHeartbeatsCV.wait_for(lock, std::chrono::milliseconds(500), [&]() { return !fSendHeartbeats; }); + // LOG(debug) << "no " << controlQueueName << " found"; } } } bool ThrowingOnBadAlloc() const { return fThrowOnBadAlloc; } + ~Manager() + { + using namespace boost::interprocess; + bool lastRemoved = false; + + UnsubscribeFromRegionEvents(); + + { + std::unique_lock lock(fHeartbeatsMtx); + fSendHeartbeats = false; + } + fHeartbeatsCV.notify_one(); + if (fHeartbeatThread.joinable()) { + fHeartbeatThread.join(); + } + + try { + boost::interprocess::scoped_lock lock(fShmMtx); + + (fDeviceCounter->fCount)--; + + if (fDeviceCounter->fCount == 0) { + LOG(debug) << "Last segment user, removing segment."; + + RemoveSegments(); + lastRemoved = true; + } else { + LOG(debug) << "Other segment users present (" << fDeviceCounter->fCount << "), skipping removal."; + } + } catch(interprocess_exception& e) { + LOG(error) << "Manager could not acquire lock: " << e.what(); + } + + if (lastRemoved) { + named_mutex::remove(std::string("fmq_" + fShmId + "_mtx").c_str()); + named_condition::remove(std::string("fmq_" + fShmId + "_cv").c_str()); + } + } + private: std::string fShmId; std::string fDeviceId; @@ -442,7 +451,10 @@ class Manager std::atomic fMsgCounter; // TODO: find a better lifetime solution instead of the counter std::thread fHeartbeatThread; - std::atomic fSendHeartbeats; + bool fSendHeartbeats; + std::mutex fHeartbeatsMtx; + std::condition_variable fHeartbeatsCV; + bool fThrowOnBadAlloc; }; diff --git a/fairmq/shmem/Region.h b/fairmq/shmem/Region.h index 105eb9f9..4408029d 100644 --- a/fairmq/shmem/Region.h +++ b/fairmq/shmem/Region.h @@ -58,8 +58,8 @@ struct Region , fFile(nullptr) , fFileMapping() , fQueue(nullptr) - , fReceiveAcksWorker() - , fSendAcksWorker() + , fAcksReceiver() + , fAcksSender() , fCallback(callback) , fBulkCallback(bulkCallback) { @@ -120,38 +120,35 @@ struct Region LOG(debug) << "shmem: initialized region queue: " << fQueueName; } - void StartSendingAcks() - { - fSendAcksWorker = std::thread(&Region::SendAcks, this); - } - + void StartSendingAcks() { fAcksSender = std::thread(&Region::SendAcks, this); } void SendAcks() { std::unique_ptr blocks = tools::make_unique(fAckBunchSize); + size_t blocksToSend = 0; - while (true) { // we'll try to send all acks before stopping - size_t blocksToSend = 0; - - { // mutex locking block + while (true) { + blocksToSend = 0; + { std::unique_lock lock(fBlockMtx); - // try to get more blocks without waiting (we can miss a notify from CloseMessage()) - if (!fStop && (fBlocksToFree.size() < fAckBunchSize)) { - // cv.wait() timeout: send whatever blocks we have + // try to get blocks + if (fBlocksToFree.size() < fAckBunchSize) { fBlockSendCV.wait_for(lock, std::chrono::milliseconds(500)); } + // send whatever blocks we have blocksToSend = std::min(fBlocksToFree.size(), fAckBunchSize); copy_n(fBlocksToFree.end() - blocksToSend, blocksToSend, blocks.get()); fBlocksToFree.resize(fBlocksToFree.size() - blocksToSend); - } // unlock the block mutex here while sending over IPC + } if (blocksToSend > 0) { while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStop) { // receiver slow? yield and try again... std::this_thread::yield(); } + // LOG(debug) << "Sent " << blocksToSend << " blocks."; } else { // blocksToSend == 0 if (fStop) { break; @@ -159,14 +156,11 @@ struct Region } } - LOG(debug) << "send ack worker for " << fName << " leaving."; - } - - void StartReceivingAcks() - { - fReceiveAcksWorker = std::thread(&Region::ReceiveAcks, this); + LOG(debug) << "AcksSender for " << fName << " leaving " << "(blocks left to free: " << fBlocksToFree.size() << ", " + << " blocks left to send: " << blocksToSend << ")."; } + void StartReceivingAcks() { fAcksReceiver = std::thread(&Region::ReceiveAcks, this); } void ReceiveAcks() { unsigned int priority; @@ -175,12 +169,18 @@ struct Region std::vector result; result.reserve(fAckBunchSize); - while (!fStop) { // end thread condition (should exist until region is destroyed) - auto rcvTill = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(500); + while (true) { + uint32_t timeout = 100; + bool leave = false; + if (fStop) { + timeout = fLinger; + leave = true; + } + auto rcvTill = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(timeout); while (fQueue->timed_receive(blocks.get(), fAckBunchSize * sizeof(RegionBlock), recvdSize, priority, rcvTill)) { - // LOG(debug) << "received: " << block.fHandle << " " << block.fSize << " " << block.fMessageId; const auto numBlocks = recvdSize / sizeof(RegionBlock); + // LOG(debug) << "Received " << numBlocks << " blocks (recvdSize: " << recvdSize << "). (remaining queue size: " << fQueue->get_num_msg() << ")."; if (fBulkCallback) { result.clear(); for (size_t i = 0; i < numBlocks; i++) { @@ -193,9 +193,13 @@ struct Region } } } - } // while !fStop - LOG(debug) << "ReceiveAcks() worker for " << fName << " leaving."; + if (leave) { + break; + } + } + + LOG(debug) << "AcksReceiver for " << fName << " leaving (remaining queue size: " << fQueue->get_num_msg() << ")."; } void ReleaseBlock(const RegionBlock& block) @@ -205,7 +209,7 @@ struct Region fBlocksToFree.emplace_back(block); if (fBlocksToFree.size() >= fAckBunchSize) { - lock.unlock(); // reduces contention on fBlockMtx + lock.unlock(); fBlockSendCV.notify_one(); } } @@ -217,22 +221,22 @@ struct Region { fStop = true; - if (fSendAcksWorker.joinable()) { + if (fAcksSender.joinable()) { fBlockSendCV.notify_one(); - fSendAcksWorker.join(); + fAcksSender.join(); } if (!fRemote) { - if (fReceiveAcksWorker.joinable()) { - fReceiveAcksWorker.join(); + if (fAcksReceiver.joinable()) { + fAcksReceiver.join(); } if (boost::interprocess::shared_memory_object::remove(fName.c_str())) { - LOG(debug) << "shmem: destroyed region " << fName; + LOG(debug) << "Region " << fName << " destroyed."; } if (boost::interprocess::file_mapping::remove(fName.c_str())) { - LOG(debug) << "shmem: destroyed file mapping " << fName; + LOG(debug) << "File mapping " << fName << " destroyed."; } if (fFile) { @@ -240,12 +244,14 @@ struct Region } if (boost::interprocess::message_queue::remove(fQueueName.c_str())) { - LOG(debug) << "shmem: removed region queue " << fQueueName; + LOG(debug) << "Region queue " << fQueueName << " destroyed."; } } else { // LOG(debug) << "shmem: region '" << fName << "' is remote, no cleanup necessary."; - LOG(debug) << "shmem: region queue '" << fQueueName << "' is remote, no cleanup necessary"; + LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary"; } + + LOG(debug) << "Region " << fName << " (" << (fRemote ? "remote" : "local") << ") destructed."; } bool fRemote; @@ -264,8 +270,8 @@ struct Region const std::size_t fAckBunchSize = 256; std::unique_ptr fQueue; - std::thread fReceiveAcksWorker; - std::thread fSendAcksWorker; + std::thread fAcksReceiver; + std::thread fAcksSender; RegionCallback fCallback; RegionBulkCallback fBulkCallback; };