Optimize unmanaged region ReceiveAcks

This commit is contained in:
Alexey Rybalchenko 2020-06-16 14:39:21 +02:00
parent 3364da9541
commit f885b4618e
2 changed files with 97 additions and 79 deletions

View File

@@ -32,6 +32,8 @@
 #include <boost/interprocess/ipc/message_queue.hpp>
 #include <cstdlib> // getenv
+#include <condition_variable>
+#include <mutex>
 #include <set>
 #include <stdexcept>
 #include <string>
@@ -100,39 +102,6 @@ class Manager
     Manager(const Manager&) = delete;
     Manager operator=(const Manager&) = delete;

-    ~Manager()
-    {
-        using namespace boost::interprocess;
-
-        bool lastRemoved = false;
-
-        UnsubscribeFromRegionEvents();
-
-        fSendHeartbeats = false;
-        fHeartbeatThread.join();
-
-        try {
-            boost::interprocess::scoped_lock<named_mutex> lock(fShmMtx);
-
-            (fDeviceCounter->fCount)--;
-
-            if (fDeviceCounter->fCount == 0) {
-                LOG(debug) << "last segment user, removing segment.";
-                RemoveSegments();
-                lastRemoved = true;
-            } else {
-                LOG(debug) << "other segment users present (" << fDeviceCounter->fCount << "), not removing it.";
-            }
-        } catch (interprocess_exception& e) {
-            LOG(error) << "error while acquiring lock in Manager destructor: " << e.what();
-        }
-
-        if (lastRemoved) {
-            named_mutex::remove(std::string("fmq_" + fShmId + "_mtx").c_str());
-            named_condition::remove(std::string("fmq_" + fShmId + "_cv").c_str());
-        }
-    }

     boost::interprocess::managed_shared_memory& Segment() { return fSegment; }
     boost::interprocess::managed_shared_memory& ManagementSegment() { return fManagementSegment; }
@@ -274,9 +243,9 @@ class Manager
     void RemoveRegion(const uint64_t id)
     {
-        fRegions.erase(id);
         {
             boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
+            fRegions.erase(id);
             fRegionInfos->at(id).fDestroyed = true;
         }
         fRegionEventsCV.notify_all();
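Note: the erase is moved inside the critical section, presumably so the local region map and the shared fDestroyed flag are updated together under fShmMtx, while the event notification still happens after the lock is released. A minimal sketch of that update-under-lock, notify-after-unlock shape (plain C++ with illustrative names, not the FairMQ types):

#include <condition_variable>
#include <cstdint>
#include <map>
#include <mutex>

std::mutex mtx;
std::condition_variable cv;
std::map<uint64_t, bool> destroyed; // hypothetical stand-in for the shared region info

void RemoveEntry(uint64_t id)
{
    {
        std::lock_guard<std::mutex> lock(mtx); // both updates happen under one lock
        destroyed[id] = true;
    }
    cv.notify_all(); // notify only after the lock is released,
                     // so woken waiters don't immediately block on mtx
}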
@@ -385,39 +354,79 @@ class Manager
     {
         using namespace boost::interprocess;

         if (shared_memory_object::remove(fSegmentName.c_str())) {
-            LOG(debug) << "successfully removed '" << fSegmentName << "' segment after the device has stopped.";
+            LOG(debug) << "Removed '" << fSegmentName << "' segment after the device has stopped.";
         } else {
-            LOG(debug) << "did not remove " << fSegmentName << " segment after the device stopped. Already removed?";
+            LOG(debug) << "Did not remove " << fSegmentName << " segment after the device stopped. Already removed?";
         }

         if (shared_memory_object::remove(fManagementSegmentName.c_str())) {
-            LOG(debug) << "successfully removed '" << fManagementSegmentName << "' segment after the device has stopped.";
+            LOG(debug) << "Removed '" << fManagementSegmentName << "' segment after the device has stopped.";
         } else {
-            LOG(debug) << "did not remove '" << fManagementSegmentName << "' segment after the device stopped. Already removed?";
+            LOG(debug) << "Did not remove '" << fManagementSegmentName << "' segment after the device stopped. Already removed?";
         }
     }
     void SendHeartbeats()
     {
         std::string controlQueueName("fmq_" + fShmId + "_cq");
+        std::unique_lock<std::mutex> lock(fHeartbeatsMtx);

         while (fSendHeartbeats) {
             try {
                 boost::interprocess::message_queue mq(boost::interprocess::open_only, controlQueueName.c_str());
                 boost::posix_time::ptime sndTill = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(100);
                 if (mq.timed_send(fDeviceId.c_str(), fDeviceId.size(), 0, sndTill)) {
-                    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+                    fHeartbeatsCV.wait_for(lock, std::chrono::milliseconds(100), [&]() { return !fSendHeartbeats; });
                 } else {
                     LOG(debug) << "control queue timeout";
                 }
             } catch (boost::interprocess::interprocess_exception& ie) {
-                std::this_thread::sleep_for(std::chrono::milliseconds(500));
-                // LOG(warn) << "no " << controlQueueName << " found";
+                fHeartbeatsCV.wait_for(lock, std::chrono::milliseconds(500), [&]() { return !fSendHeartbeats; });
+                // LOG(debug) << "no " << controlQueueName << " found";
             }
         }
     }
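Note: the fixed sleep_for() pauses are replaced by condition-variable waits, so a stopping Manager no longer has to wait out a full sleep interval before the heartbeat thread notices that fSendHeartbeats flipped. The waiting side of that pattern, as a stand-alone sketch (not the FairMQ classes):

#include <chrono>
#include <condition_variable>
#include <mutex>

std::mutex mtx;
std::condition_variable cv;
bool keepRunning = true; // guarded by mtx, so it no longer needs to be std::atomic

void HeartbeatLoop()
{
    std::unique_lock<std::mutex> lock(mtx);
    while (keepRunning) {
        // ... send one heartbeat here ...
        // sleeps at most 100 ms, but returns immediately once keepRunning flips
        cv.wait_for(lock, std::chrono::milliseconds(100), [] { return !keepRunning; });
    }
}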
     bool ThrowingOnBadAlloc() const { return fThrowOnBadAlloc; }

+    ~Manager()
+    {
+        using namespace boost::interprocess;
+
+        bool lastRemoved = false;
+
+        UnsubscribeFromRegionEvents();
+
+        {
+            std::unique_lock<std::mutex> lock(fHeartbeatsMtx);
+            fSendHeartbeats = false;
+        }
+        fHeartbeatsCV.notify_one();
+        if (fHeartbeatThread.joinable()) {
+            fHeartbeatThread.join();
+        }
+
+        try {
+            boost::interprocess::scoped_lock<named_mutex> lock(fShmMtx);
+
+            (fDeviceCounter->fCount)--;
+
+            if (fDeviceCounter->fCount == 0) {
+                LOG(debug) << "Last segment user, removing segment.";
+                RemoveSegments();
+                lastRemoved = true;
+            } else {
+                LOG(debug) << "Other segment users present (" << fDeviceCounter->fCount << "), skipping removal.";
+            }
+        } catch (interprocess_exception& e) {
+            LOG(error) << "Manager could not acquire lock: " << e.what();
+        }
+
+        if (lastRemoved) {
+            named_mutex::remove(std::string("fmq_" + fShmId + "_mtx").c_str());
+            named_condition::remove(std::string("fmq_" + fShmId + "_cv").c_str());
+        }
+    }

   private:
     std::string fShmId;
     std::string fDeviceId;
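Note: the relocated destructor now signals the heartbeat thread through the same mutex/condition-variable pair instead of a bare atomic flag, and guards the join with joinable(). The signaling side of the pattern, as a self-contained sketch (illustrative names, not the Manager class):

#include <condition_variable>
#include <mutex>
#include <thread>

struct HeartbeatOwner {
    std::mutex mtx;
    std::condition_variable cv;
    bool keepRunning = true;
    std::thread worker; // assumed to run a loop like the one sketched above

    ~HeartbeatOwner()
    {
        {
            std::lock_guard<std::mutex> lock(mtx); // flip the flag under the waiter's mutex
            keepRunning = false;
        }
        cv.notify_one();           // wake the worker out of its timed wait
        if (worker.joinable()) {   // safe even if the thread was never started
            worker.join();
        }
    }
};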
@@ -442,7 +451,10 @@ class Manager
     std::atomic<int32_t> fMsgCounter; // TODO: find a better lifetime solution instead of the counter

     std::thread fHeartbeatThread;
-    std::atomic<bool> fSendHeartbeats;
+    bool fSendHeartbeats;
+    std::mutex fHeartbeatsMtx;
+    std::condition_variable fHeartbeatsCV;

     bool fThrowOnBadAlloc;
 };

View File

@@ -58,8 +58,8 @@ struct Region
         , fFile(nullptr)
         , fFileMapping()
         , fQueue(nullptr)
-        , fReceiveAcksWorker()
-        , fSendAcksWorker()
+        , fAcksReceiver()
+        , fAcksSender()
         , fCallback(callback)
         , fBulkCallback(bulkCallback)
     {
@@ -120,38 +120,35 @@ struct Region
         LOG(debug) << "shmem: initialized region queue: " << fQueueName;
     }

-    void StartSendingAcks()
-    {
-        fSendAcksWorker = std::thread(&Region::SendAcks, this);
-    }
+    void StartSendingAcks() { fAcksSender = std::thread(&Region::SendAcks, this); }

     void SendAcks()
     {
         std::unique_ptr<RegionBlock[]> blocks = tools::make_unique<RegionBlock[]>(fAckBunchSize);
+        size_t blocksToSend = 0;

-        while (true) { // we'll try to send all acks before stopping
-            size_t blocksToSend = 0;
-            { // mutex locking block
+        while (true) {
+            blocksToSend = 0;
+            {
                 std::unique_lock<std::mutex> lock(fBlockMtx);
-                // try to get more blocks without waiting (we can miss a notify from CloseMessage())
-                if (!fStop && (fBlocksToFree.size() < fAckBunchSize)) {
-                    // cv.wait() timeout: send whatever blocks we have
+                // try to get <fAckBunchSize> blocks
+                if (fBlocksToFree.size() < fAckBunchSize) {
                     fBlockSendCV.wait_for(lock, std::chrono::milliseconds(500));
                 }
+                // send whatever blocks we have
                 blocksToSend = std::min(fBlocksToFree.size(), fAckBunchSize);
                 copy_n(fBlocksToFree.end() - blocksToSend, blocksToSend, blocks.get());
                 fBlocksToFree.resize(fBlocksToFree.size() - blocksToSend);
-            } // unlock the block mutex here while sending over IPC
+            }

             if (blocksToSend > 0) {
                 while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStop) {
                     // receiver slow? yield and try again...
                     std::this_thread::yield();
                 }
-                // LOG(debug) << "Sent " << blocksToSend << " blocks.";
             } else { // blocksToSend == 0
                 if (fStop) {
                     break;
@@ -159,14 +156,11 @@ struct Region
             }
         }

-        LOG(debug) << "send ack worker for " << fName << " leaving.";
-    }
-
-    void StartReceivingAcks()
-    {
-        fReceiveAcksWorker = std::thread(&Region::ReceiveAcks, this);
-    }
+        LOG(debug) << "AcksSender for " << fName << " leaving " << "(blocks left to free: " << fBlocksToFree.size() << ", "
+                   << " blocks left to send: " << blocksToSend << ").";
+    }
+
+    void StartReceivingAcks() { fAcksReceiver = std::thread(&Region::ReceiveAcks, this); }

     void ReceiveAcks()
     {
         unsigned int priority;
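Note: SendAcks keeps its batching structure, with blocksToSend hoisted out of the loop so the final log line can report it: collect up to fAckBunchSize blocks while holding fBlockMtx, then release the mutex before the potentially slow IPC send. A compact sketch of that shape (illustrative container and names, not the real RegionBlock queue):

#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <vector>

std::mutex mtx;
std::condition_variable cv;
std::vector<int> pending;           // hypothetical stand-in for fBlocksToFree
constexpr std::size_t kBunch = 256;

std::size_t TakeBatch(std::vector<int>& out)
{
    std::unique_lock<std::mutex> lock(mtx);
    if (pending.size() < kBunch) {
        // bounded wait: a missed notify costs at most one timeout period
        cv.wait_for(lock, std::chrono::milliseconds(500));
    }
    std::size_t n = std::min(pending.size(), kBunch);
    out.assign(pending.end() - n, pending.end());
    pending.resize(pending.size() - n);
    return n; // mutex is released on return, before any slow send happens
}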
@@ -175,12 +169,18 @@ struct Region
         std::vector<fair::mq::RegionBlock> result;
         result.reserve(fAckBunchSize);

-        while (!fStop) { // end thread condition (should exist until region is destroyed)
-            auto rcvTill = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(500);
+        while (true) {
+            uint32_t timeout = 100;
+            bool leave = false;
+            if (fStop) {
+                timeout = fLinger;
+                leave = true;
+            }
+            auto rcvTill = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(timeout);

             while (fQueue->timed_receive(blocks.get(), fAckBunchSize * sizeof(RegionBlock), recvdSize, priority, rcvTill)) {
-                // LOG(debug) << "received: " << block.fHandle << " " << block.fSize << " " << block.fMessageId;
                 const auto numBlocks = recvdSize / sizeof(RegionBlock);
+                // LOG(debug) << "Received " << numBlocks << " blocks (recvdSize: " << recvdSize << "). (remaining queue size: " << fQueue->get_num_msg() << ").";
                 if (fBulkCallback) {
                     result.clear();
                     for (size_t i = 0; i < numBlocks; i++) {
@@ -193,9 +193,13 @@ struct Region
                     }
                 }
             }
-        } // while !fStop
+            if (leave) {
+                break;
+            }
+        }

-        LOG(debug) << "ReceiveAcks() worker for " << fName << " leaving.";
+        LOG(debug) << "AcksReceiver for " << fName << " leaving (remaining queue size: " << fQueue->get_num_msg() << ").";
     }

     void ReleaseBlock(const RegionBlock& block)
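Note: this is the core of the ReceiveAcks optimization: instead of polling with a fixed 500 ms receive window and exiting as soon as fStop is set, the receiver uses a short 100 ms window while running and, once stopped, performs one final drain pass bounded by fLinger before leaving. Schematically (the queue type and its timed receive are placeholders, not the boost::interprocess API):

#include <atomic>
#include <chrono>
#include <cstdint>

std::atomic<bool> stop{false};
uint32_t linger = 100; // hypothetical fLinger equivalent, in milliseconds

template<typename Queue>
void ReceiveLoop(Queue& queue)
{
    while (true) {
        uint32_t timeoutMs = 100;
        bool leave = false;
        if (stop) {            // checked before the receive pass...
            timeoutMs = linger;
            leave = true;      // ...so one last, bounded drain still runs
        }
        // drain everything that arrives within the deadline
        while (queue.try_receive_for(std::chrono::milliseconds(timeoutMs))) {
            // process acknowledged blocks here
        }
        if (leave) { break; }  // exit only after the final drain
    }
}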
@@ -205,7 +209,7 @@ struct Region
         fBlocksToFree.emplace_back(block);
         if (fBlocksToFree.size() >= fAckBunchSize) {
-            lock.unlock(); // reduces contention on fBlockMtx
+            lock.unlock();
             fBlockSendCV.notify_one();
         }
     }
@@ -217,22 +221,22 @@ struct Region
     {
         fStop = true;

-        if (fSendAcksWorker.joinable()) {
+        if (fAcksSender.joinable()) {
             fBlockSendCV.notify_one();
-            fSendAcksWorker.join();
+            fAcksSender.join();
         }

         if (!fRemote) {
-            if (fReceiveAcksWorker.joinable()) {
-                fReceiveAcksWorker.join();
+            if (fAcksReceiver.joinable()) {
+                fAcksReceiver.join();
             }

             if (boost::interprocess::shared_memory_object::remove(fName.c_str())) {
-                LOG(debug) << "shmem: destroyed region " << fName;
+                LOG(debug) << "Region " << fName << " destroyed.";
             }

             if (boost::interprocess::file_mapping::remove(fName.c_str())) {
-                LOG(debug) << "shmem: destroyed file mapping " << fName;
+                LOG(debug) << "File mapping " << fName << " destroyed.";
             }

             if (fFile) {
@@ -240,12 +244,14 @@ struct Region
             }

             if (boost::interprocess::message_queue::remove(fQueueName.c_str())) {
-                LOG(debug) << "shmem: removed region queue " << fQueueName;
+                LOG(debug) << "Region queue " << fQueueName << " destroyed.";
             }
         } else {
             // LOG(debug) << "shmem: region '" << fName << "' is remote, no cleanup necessary.";
-            LOG(debug) << "shmem: region queue '" << fQueueName << "' is remote, no cleanup necessary";
+            LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary";
         }
+
+        LOG(debug) << "Region " << fName << " (" << (fRemote ? "remote" : "local") << ") destructed.";
     }

     bool fRemote;
@@ -264,8 +270,8 @@ struct Region
     const std::size_t fAckBunchSize = 256;

     std::unique_ptr<boost::interprocess::message_queue> fQueue;
-    std::thread fReceiveAcksWorker;
-    std::thread fSendAcksWorker;
+    std::thread fAcksReceiver;
+    std::thread fAcksSender;

     RegionCallback fCallback;
     RegionBulkCallback fBulkCallback;
 };