// ---------------------------------------------------------------------------
// Patch overview (reviewer commentary; this text sits ahead of the first
// "diff --git" header and is not part of any hunk).
//
// Purpose: region acknowledgements are no longer sent one RegionBlock at a
// time from FairMQMessageSHM::CloseMessage(); they are queued inside the
// Region and sent in bunches by a dedicated worker thread.
//
// fairmq/shmem/FairMQMessageSHM.cxx
//   * CloseMessage(): the do/while loop around fQueue->timed_send() (200 ms
//     deadline, retried until success or fInterrupted) is removed. The new
//     code resolves fRegionPtr through fManager.GetRemoteRegion(fRegionId)
//     when it is null and then calls
//     fRegionPtr->ReleaseBlock({fHandle, fSize, fHint}).
//   * When no region is obtained, the LOG(warn) that used to be commented out
//     is now active.
//
// fairmq/shmem/Manager.cxx, fairmq/shmem/Manager.h
//   * The values stored in Manager::fRegions are now created with
//     fair::mq::tools::make_unique(...) and reached through `->` / `.get()`
//     instead of `.` / `&(...)`. Resume(), CreateRegion() and
//     GetRemoteRegion() are adjusted accordingly.
//
// fairmq/shmem/Region.cxx
//   * Constructor: fWorker is replaced by fReceiveAcksWorker and
//     fSendAcksWorker. The queue creation arguments change from
//     (10000, sizeof(RegionBlock)) to
//     (1024, fAckBunchSize * sizeof(RegionBlock)) -- presumably maximum
//     message count and maximum message size; confirm against the
//     message_queue constructor. The SendAcks thread is started as the last
//     statement of the constructor, after the remote/local branch, i.e. for
//     every Region regardless of fRemote.
//   * StartReceivingAcks(): starts ReceiveAcks on fReceiveAcksWorker.
//   * ReceiveAcks(): receives into a buffer of fAckBunchSize blocks, derives
//     the block count from recvdSize / sizeof(RegionBlock) and, if fCallback
//     is set, invokes it once per received block. The receive deadline grows
//     from 200 ms to 500 ms.
//   * ReleaseBlock() (new): appends the block to fBlocksToFree while holding
//     fBlockLock; once at least fAckBunchSize blocks are pending it unlocks
//     and calls fBlockSendCV.notify_one().
//   * SendAcks() (new): unless fStop is set or a full bunch is already
//     pending, waits up to 500 ms on fBlockSendCV; then copies up to
//     fAckBunchSize blocks from the back of fBlocksToFree into a local
//     buffer, shrinks the vector, and -- outside the lock -- calls
//     fQueue->try_send() in a yield loop. The thread leaves once fStop is set
//     and there was nothing left to send.
//   * ~Region(): sets fStop and joins fSendAcksWorker for every region; the
//     receive worker is joined only when !fRemote.
//
// fairmq/shmem/Region.h
//   * Declares ReleaseBlock() and SendAcks(); adds fBlockLock, fBlockSendCV,
//     fBlocksToFree, fAckBunchSize (= 256) and the two worker thread members.
//
// NOTE(review): ~Region() sets fStop but the visible hunk never notifies
//   fBlockSendCV, so SendAcks() can remain inside wait_for() for up to 500 ms
//   before it observes the flag. Consider notifying after setting fStop.
// NOTE(review): the retry loop in SendAcks() is
//   `while (!try_send(...) && !fStop)`. If fStop becomes true while the queue
//   is full, the bunch already removed from fBlocksToFree is not sent, which
//   does not match the "we'll try to send all acks before stopping" comment.
//   Confirm that dropping these acks is acceptable.
// NOTE(review): ReceiveAcks() computes rcvTill once per outer iteration and
//   reuses it for every timed_receive() call of the inner loop; fStop is only
//   re-checked after timed_receive() returns false. Presumably fine while the
//   queue drains -- verify the behaviour under continuous traffic.
// NOTE(review): SendAcks() takes blocks from the end of fBlocksToFree, so
//   bunches are not sent in release order. Confirm no receiver depends on it.
// NOTE(review): fStop is written by the destructor and read by both worker
//   threads; its declaration is not visible in this patch. Confirm that it is
//   an atomic.
// NOTE(review): this copy of the patch has lost its line breaks and the text
//   inside angle brackets (for example the #include targets, template
//   arguments and cast target types are missing). It cannot be applied as-is;
//   recover the original patch before use.
// ---------------------------------------------------------------------------
diff --git a/fairmq/shmem/FairMQMessageSHM.cxx b/fairmq/shmem/FairMQMessageSHM.cxx index f71da88b..e71b8d71 100644 --- a/fairmq/shmem/FairMQMessageSHM.cxx +++ b/fairmq/shmem/FairMQMessageSHM.cxx @@ -327,50 +327,19 @@ void FairMQMessageSHM::CloseMessage() } else { - // send notification back to the receiver - // RegionBlock block(fHandle, fSize); - // if (fManager.GetRegionQueue(fRegionId).try_send(static_cast(&block), sizeof(RegionBlock), 0)) - // { - // // LOG(info) << "true"; - // } - // // else - // // { - // // LOG(debug) << "could not send ack"; - // // } - - // timed version - RegionBlock block(fHandle, fSize, fHint); - bool success = false; - do + if (!fRegionPtr) { - auto sndTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(200); - if (!fRegionPtr) - { - fRegionPtr = fManager.GetRemoteRegion(fRegionId); - } - if (fRegionPtr) - { - // LOG(debug) << "sending ack"; - if (fRegionPtr->fQueue->timed_send(&block, sizeof(RegionBlock), 0, sndTill)) - { - success = true; - } - else - { - if (fInterrupted) - { - break; - } - LOG(debug) << "region ack queue is full, retrying..."; - } - } - else - { - // LOG(warn) << "region ack queue for id " << fRegionId << " no longer exist. Not sending ack"; - success = true; - } + fRegionPtr = fManager.GetRemoteRegion(fRegionId); + } + + if (fRegionPtr) + { + fRegionPtr->ReleaseBlock({fHandle, fSize, fHint}); + } + else + { + LOG(warn) << "region ack queue for id " << fRegionId << " no longer exist. 
Not sending ack"; } - while (!success); } } diff --git a/fairmq/shmem/Manager.cxx b/fairmq/shmem/Manager.cxx index 8c7ad56a..0f90c146 100644 --- a/fairmq/shmem/Manager.cxx +++ b/fairmq/shmem/Manager.cxx @@ -19,7 +19,7 @@ namespace mq namespace shmem { -std::unordered_map Manager::fRegions; +std::unordered_map> Manager::fRegions; Manager::Manager(const string& name, size_t size) : fSessionName(name) @@ -43,7 +43,7 @@ void Manager::Resume() // close remote regions before processing new transfers for (auto it = fRegions.begin(); it != fRegions.end(); /**/) { - if (it->second.fRemote) + if (it->second->fRemote) { it = fRegions.erase(it); } @@ -64,11 +64,11 @@ bipc::mapped_region* Manager::CreateRegion(const size_t size, const uint64_t id, } else { - auto r = fRegions.emplace(id, Region{*this, id, size, false, callback}); + auto r = fRegions.emplace(id, fair::mq::tools::make_unique(*this, id, size, false, callback)); - r.first->second.StartReceivingAcks(); + r.first->second->StartReceivingAcks(); - return &(r.first->second.fRegion); + return &(r.first->second->fRegion); } } @@ -78,14 +78,14 @@ Region* Manager::GetRemoteRegion(const uint64_t id) auto it = fRegions.find(id); if (it != fRegions.end()) { - return &(it->second); + return it->second.get(); } else { try { - auto r = fRegions.emplace(id, Region{*this, id, 0, true, nullptr}); - return &(r.first->second); + auto r = fRegions.emplace(id, fair::mq::tools::make_unique(*this, id, 0, true, nullptr)); + return r.first->second.get(); } catch (bipc::interprocess_exception& e) { diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index 0987d242..c50b59b6 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -66,7 +66,7 @@ class Manager std::string fManagementSegmentName; boost::interprocess::managed_shared_memory fSegment; boost::interprocess::managed_shared_memory fManagementSegment; - static std::unordered_map fRegions; + static std::unordered_map> fRegions; }; } // namespace shmem diff --git 
a/fairmq/shmem/Region.cxx b/fairmq/shmem/Region.cxx index 7c9cf17c..87272a3c 100644 --- a/fairmq/shmem/Region.cxx +++ b/fairmq/shmem/Region.cxx @@ -12,6 +12,8 @@ #include +#include + using namespace std; namespace bipc = ::boost::interprocess; @@ -32,7 +34,8 @@ Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, FairMQ , fQueueName("fmq_" + fManager.fSessionName +"_rgq_" + to_string(id)) , fShmemObject() , fQueue(nullptr) - , fWorker() + , fReceiveAcksWorker() + , fSendAcksWorker() , fCallback(callback) { if (fRemote) @@ -49,52 +52,118 @@ Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, FairMQ LOG(debug) << "shmem: created region: " << fName; fShmemObject.truncate(size); - fQueue = fair::mq::tools::make_unique(bipc::create_only, fQueueName.c_str(), 10000, sizeof(RegionBlock)); + fQueue = fair::mq::tools::make_unique(bipc::create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock)); LOG(debug) << "shmem: created region queue: " << fQueueName; } fRegion = bipc::mapped_region(fShmemObject, bipc::read_write); // TODO: add HUGEPAGES flag here // fRegion = bipc::mapped_region(fShmemObject, bipc::read_write, 0, 0, 0, MAP_ANONYMOUS | MAP_HUGETLB); + + fSendAcksWorker = std::thread(&Region::SendAcks, this); } void Region::StartReceivingAcks() { - fWorker = std::thread(&Region::ReceiveAcks, this); + fReceiveAcksWorker = std::thread(&Region::ReceiveAcks, this); } void Region::ReceiveAcks() { unsigned int priority; bipc::message_queue::size_type recvdSize; + std::unique_ptr blocks = fair::mq::tools::make_unique(fAckBunchSize); while (!fStop) // end thread condition (should exist until region is destroyed) { - auto rcvTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(200); - RegionBlock block; - if (fQueue->timed_receive(&block, sizeof(RegionBlock), recvdSize, priority, rcvTill)) + auto rcvTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(500); + + while 
(fQueue->timed_receive(blocks.get(), fAckBunchSize * sizeof(RegionBlock), recvdSize, priority, rcvTill)) { // LOG(debug) << "received: " << block.fHandle << " " << block.fSize << " " << block.fMessageId; if (fCallback) { - fCallback(reinterpret_cast(fRegion.get_address()) + block.fHandle, block.fSize, reinterpret_cast(block.fHint)); + const auto numBlocks = recvdSize / sizeof(RegionBlock); + for (size_t i = 0; i < numBlocks; i++) + { + fCallback(reinterpret_cast(fRegion.get_address()) + blocks[i].fHandle, blocks[i].fSize, reinterpret_cast(blocks[i].fHint)); + } } } - else - { - // LOG(debug) << "queue " << fQueueName << " timeout!"; - } } // while !fStop - LOG(debug) << "worker for " << fName << " leaving."; + LOG(debug) << "receive ack worker for " << fName << " leaving."; +} + +void Region::ReleaseBlock(const RegionBlock &block) +{ + std::unique_lock lock(fBlockLock); + + fBlocksToFree.emplace_back(block); + + if (fBlocksToFree.size() >= fAckBunchSize) + { + lock.unlock(); // reduces contention on fBlockLock + fBlockSendCV.notify_one(); + } +} + +void Region::SendAcks() +{ + std::unique_ptr blocks = fair::mq::tools::make_unique(fAckBunchSize); + + while (true) // we'll try to send all acks before stopping + { + size_t blocksToSend = 0; + + { // mutex locking block + std::unique_lock lock(fBlockLock); + + // try to get more blocks without waiting (we can miss a notify from CloseMessage()) + if (!fStop && (fBlocksToFree.size() < fAckBunchSize)) + { + // cv.wait() timeout: send whatever blocks we have + fBlockSendCV.wait_for(lock, std::chrono::milliseconds(500)); + } + + blocksToSend = std::min(fBlocksToFree.size(), fAckBunchSize); + + std::copy_n(fBlocksToFree.end() - blocksToSend, blocksToSend, blocks.get()); + fBlocksToFree.resize(fBlocksToFree.size() - blocksToSend); + } // unlock the block mutex here while sending over IPC + + if (blocksToSend > 0) + { + while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStop) + { + // receiver 
slow? yield and try again... + this_thread::yield(); + } + } + else // blocksToSend == 0 + { + if (fStop) + { + break; + } + } + } + + LOG(debug) << "send ack worker for " << fName << " leaving."; } Region::~Region() { + fStop = true; + + if (fSendAcksWorker.joinable()) + { + fSendAcksWorker.join(); + } + if (!fRemote) { - fStop = true; - if (fWorker.joinable()) + if (fReceiveAcksWorker.joinable()) { - fWorker.join(); + fReceiveAcksWorker.join(); } if (bipc::shared_memory_object::remove(fName.c_str())) diff --git a/fairmq/shmem/Region.h b/fairmq/shmem/Region.h index 81bb7a56..11b5d980 100644 --- a/fairmq/shmem/Region.h +++ b/fairmq/shmem/Region.h @@ -19,11 +19,14 @@ #include "FairMQUnmanagedRegion.h" #include +#include #include #include #include +#include +#include #include namespace fair @@ -47,6 +50,9 @@ struct Region void StartReceivingAcks(); void ReceiveAcks(); + void ReleaseBlock(const RegionBlock &); + void SendAcks(); + ~Region(); Manager& fManager; @@ -56,8 +62,15 @@ struct Region std::string fQueueName; boost::interprocess::shared_memory_object fShmemObject; boost::interprocess::mapped_region fRegion; + + std::mutex fBlockLock; + std::condition_variable fBlockSendCV; + std::vector fBlocksToFree; + const std::size_t fAckBunchSize = 256; std::unique_ptr fQueue; - std::thread fWorker; + + std::thread fReceiveAcksWorker; + std::thread fSendAcksWorker; FairMQRegionCallback fCallback; };