diff --git a/fairmq/shmem/Common.h b/fairmq/shmem/Common.h index 0bd987dd..e6668c79 100644 --- a/fairmq/shmem/Common.h +++ b/fairmq/shmem/Common.h @@ -124,6 +124,15 @@ struct EventCounter std::atomic fCount; }; +struct Heartbeat +{ + Heartbeat(uint64_t c) + : fCount(c) + {} + + std::atomic fCount; +}; + struct RegionCounter { RegionCounter(uint16_t c) diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index b6f51194..e1505516 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -80,7 +80,7 @@ class Manager , fMsgCounterNew(0) , fMsgCounterDelete(0) #endif - , fSendHeartbeats(true) + , fBeatTheHeart(true) , fThrowOnBadAlloc(config ? config->GetProperty("shm-throw-bad-alloc", true) : true) , fNoCleanup(config ? config->GetProperty("shm-no-cleanup", false) : false) { @@ -106,7 +106,7 @@ class Manager StartMonitor(fShmId); } - fHeartbeatThread = std::thread(&Manager::SendHeartbeats, this); + fHeartbeatThread = std::thread(&Manager::Heartbeats, this); { std::stringstream ss; @@ -544,23 +544,15 @@ class Manager void DecrementShmMsgCounter(uint16_t segmentId) { --((*fShmMsgCounters)[segmentId].fCount); } #endif - void SendHeartbeats() + void Heartbeats() { - std::string controlQueueName("fmq_" + fShmId + "_cq"); + using namespace boost::interprocess; + + Heartbeat* hb = fManagementSegment.find_or_construct(unique_instance)(0); std::unique_lock lock(fHeartbeatsMtx); - while (fSendHeartbeats) { - try { - boost::interprocess::message_queue mq(boost::interprocess::open_only, controlQueueName.c_str()); - boost::posix_time::ptime sndTill = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(100); - if (mq.timed_send(fDeviceId.c_str(), fDeviceId.size(), 0, sndTill)) { - fHeartbeatsCV.wait_for(lock, std::chrono::milliseconds(100), [&]() { return !fSendHeartbeats; }); - } else { - LOG(debug) << "control queue timeout"; - } - } catch (boost::interprocess::interprocess_exception& ie) { - fHeartbeatsCV.wait_for(lock, std::chrono::milliseconds(500), [&]() { return !fSendHeartbeats; }); - // LOG(debug) << "no " << controlQueueName << " found"; - } + while (fBeatTheHeart) { + (hb->fCount)++; + fHeartbeatsCV.wait_for(lock, std::chrono::milliseconds(100), [&]() { return !fBeatTheHeart; }); } } @@ -678,7 +670,7 @@ class Manager { std::unique_lock lock(fHeartbeatsMtx); - fSendHeartbeats = false; + fBeatTheHeart = false; } fHeartbeatsCV.notify_one(); if (fHeartbeatThread.joinable()) { @@ -744,14 +736,12 @@ class Manager #endif std::thread fHeartbeatThread; - bool fSendHeartbeats; + bool fBeatTheHeart; std::mutex fHeartbeatsMtx; std::condition_variable fHeartbeatsCV; bool fThrowOnBadAlloc; bool fNoCleanup; - - }; } // namespace fair::mq::shmem diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index ea6e5172..910a5f83 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -87,7 +87,6 @@ Monitor::Monitor(string shmId, bool selfDestruct, bool interactive, bool viewOnl , fTimeoutInMS(timeoutInMS) , fIntervalInMS(intervalInMS) , fShmId(std::move(shmId)) - , fControlQueueName("fmq_" + fShmId + "_cq") , fTerminating(false) , fHeartbeatTriggered(false) , fLastHeartbeat(chrono::high_resolution_clock::now()) @@ -132,8 +131,7 @@ void Monitor::Run() { thread heartbeatThread; if (!fViewOnly) { - RemoveQueue(fControlQueueName); - heartbeatThread = thread(&Monitor::ReceiveHeartbeats, this); + heartbeatThread = thread(&Monitor::CheckHeartbeats, this); } if (fInteractive) { @@ -158,7 +156,7 @@ void Monitor::Watch() fSeenOnce = true; auto now = chrono::high_resolution_clock::now(); - unsigned int duration = chrono::duration_cast(now - fLastHeartbeat).count(); + unsigned int duration = chrono::duration_cast(now - fLastHeartbeat.load()).count(); if (fHeartbeatTriggered && duration > fTimeoutInMS) { // memory is present, but no heartbeats since timeout duration @@ -181,7 +179,7 @@ void Monitor::Watch() } else { // if self-destruct is requested, and no segment has ever been observed, quit after double timeout duration auto now = chrono::high_resolution_clock::now(); - unsigned int duration = chrono::duration_cast(now - fLastHeartbeat).count(); + unsigned int duration = chrono::duration_cast(now - fLastHeartbeat.load()).count(); if (duration > fTimeoutInMS * 2) { Cleanup(ShmId{fShmId}); @@ -305,31 +303,30 @@ void Monitor::ListAll(const std::string& path) } } -void Monitor::ReceiveHeartbeats() +void Monitor::CheckHeartbeats() { - try { - bipc::message_queue mq(bipc::open_or_create, fControlQueueName.c_str(), 1000, 256); + using namespace boost::interprocess; - unsigned int priority = 0; - bipc::message_queue::size_type recvdSize = 0; - char msg[256] = {0}; + uint64_t localHb = 0; - while (!fTerminating) { - bpt::ptime rcvTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(100); - if (mq.timed_receive(&msg, sizeof(msg), recvdSize, priority, rcvTill)) { - fHeartbeatTriggered = true; - fLastHeartbeat = chrono::high_resolution_clock::now(); - string deviceId(msg, recvdSize); - fDeviceHeartbeats[deviceId] = fLastHeartbeat; - } else { - // LOG(info) << "control queue timeout"; + while (!fTerminating) { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + try { + managed_shared_memory managementSegment(open_read_only, std::string("fmq_" + fShmId + "_mng").c_str()); + Heartbeat* hb = managementSegment.find(unique_instance).first; + + if (hb) { + uint64_t globalHb = hb->fCount; + if (localHb != globalHb) { + fHeartbeatTriggered = true; + fLastHeartbeat.store(chrono::high_resolution_clock::now()); + localHb = globalHb; + } } + } catch (bie&) { + // management segment not found, simply retry. } - } catch (bie& ie) { - LOG(info) << ie.what(); } - - RemoveQueue(fControlQueueName); } void Monitor::Interactive() @@ -629,7 +626,6 @@ std::vector> Monitor::CleanupFull(const ShmId& shmI { auto result = Cleanup(shmId, verbose); result.emplace_back(RunRemoval(Monitor::RemoveMutex, "fmq_" + shmId.shmId + "_ms", verbose)); - result.emplace_back(RunRemoval(Monitor::RemoveQueue, "fmq_" + shmId.shmId + "_cq", verbose)); return result; } diff --git a/fairmq/shmem/Monitor.h b/fairmq/shmem/Monitor.h index d82483d5..894e5114 100644 --- a/fairmq/shmem/Monitor.h +++ b/fairmq/shmem/Monitor.h @@ -117,7 +117,7 @@ class Monitor private: void PrintHelp(); void Watch(); - void ReceiveHeartbeats(); + void CheckHeartbeats(); void CheckSegment(); void Interactive(); void SignalMonitor(); @@ -131,12 +131,10 @@ class Monitor unsigned int fTimeoutInMS; unsigned int fIntervalInMS; std::string fShmId; - std::string fControlQueueName; std::atomic fTerminating; std::atomic fHeartbeatTriggered; - std::chrono::high_resolution_clock::time_point fLastHeartbeat; + std::atomic fLastHeartbeat; std::thread fSignalThread; - std::unordered_map fDeviceHeartbeats; }; } // namespace fair::mq::shmem diff --git a/fairmq/shmem/README.md b/fairmq/shmem/README.md index 9c58c3ff..7a084471 100644 --- a/fairmq/shmem/README.md +++ b/fairmq/shmem/README.md @@ -19,7 +19,6 @@ FairMQ Shared Memory currently uses the following names to register shared memor | `fmq__rg_` | unmanaged region(s) | one of the devices | devices with unmanaged regions | | `fmq__rgq_` | unmanaged region queue(s) | one of the devices | devices with unmanaged regions | | `fmq__ms` | shmmonitor status | shmmonitor | devices, shmmonitor | -| `fmq__cq` | message queue between transport and shmmonitor | shmmonitor | devices, shmmonitor | The shmId is generated out of session id and user id.