mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-14 09:06:47 +00:00
shmmonitor: refactor to separate monitoring from output
This commit is contained in:
parent
79d5ac281b
commit
9cdf2c773b
|
@ -10,7 +10,6 @@
|
|||
#include "Common.h"
|
||||
|
||||
#include <fairmq/tools/Strings.h>
|
||||
#include <fairlogger/Logger.h>
|
||||
|
||||
#include <boost/interprocess/managed_shared_memory.hpp>
|
||||
#include <boost/interprocess/file_mapping.hpp>
|
||||
|
@ -81,8 +80,6 @@ Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool
|
|||
, fTimeoutInMS(timeoutInMS)
|
||||
, fIntervalInMS(intervalInMS)
|
||||
, fShmId(shmId)
|
||||
, fSegmentName("fmq_" + fShmId + "_m_0")
|
||||
, fManagementSegmentName("fmq_" + fShmId + "_mng")
|
||||
, fControlQueueName("fmq_" + fShmId + "_cq")
|
||||
, fTerminating(false)
|
||||
, fHeartbeatTriggered(false)
|
||||
|
@ -90,18 +87,18 @@ Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool
|
|||
, fSignalThread()
|
||||
, fDeviceHeartbeats()
|
||||
{
|
||||
if (fMonitor) {
|
||||
if (!fViewOnly) {
|
||||
try {
|
||||
bipc::named_mutex monitorStatus(bipc::create_only, string("fmq_" + fShmId + "_ms").c_str());
|
||||
} catch (bie&) {
|
||||
cout << "fairmq-shmmonitor (in monitoring mode) for shared memory id " << fShmId << " already started or did not not properly exited. Try `fairmq-shmmonitor --cleanup --shmid " << fShmId << "`" << endl;
|
||||
throw DaemonPresent(tools::ToString("fairmq-shmmonitor (in monitoring mode) for shared memory id ", fShmId, " already started or did not not properly exited."));
|
||||
if (fInteractive) {
|
||||
cout << "fairmq-shmmonitor for shm id " << fShmId << " is already running. Try `fairmq-shmmonitor --cleanup --shmid " << fShmId << "`, or run in view-only mode (-v)" << endl;
|
||||
} else {
|
||||
cout << "fairmq-shmmonitor for shm id " << fShmId << " is already running. Try `fairmq-shmmonitor --cleanup --shmid " << fShmId << "`" << endl;
|
||||
}
|
||||
throw DaemonPresent(tools::ToString("fairmq-shmmonitor (in monitoring mode) for shm id ", fShmId, " is already running."));
|
||||
}
|
||||
}
|
||||
|
||||
Logger::SetConsoleColor(false);
|
||||
Logger::DefineVerbosity(Verbosity::user1, VerbositySpec::Make(VerbositySpec::Info::timestamp_us));
|
||||
Logger::SetVerbosity(Verbosity::verylow);
|
||||
}
|
||||
|
||||
void Monitor::CatchSignals()
|
||||
|
@ -131,18 +128,13 @@ void Monitor::Run()
|
|||
thread heartbeatThread;
|
||||
if (!fViewOnly) {
|
||||
RemoveQueue(fControlQueueName);
|
||||
heartbeatThread = thread(&Monitor::MonitorHeartbeats, this);
|
||||
heartbeatThread = thread(&Monitor::ReceiveHeartbeats, this);
|
||||
}
|
||||
|
||||
if (fInteractive) {
|
||||
Interactive();
|
||||
} else if (fViewOnly) {
|
||||
CheckSegment();
|
||||
} else {
|
||||
while (!fTerminating) {
|
||||
this_thread::sleep_for(chrono::milliseconds(500));
|
||||
CheckSegment();
|
||||
}
|
||||
Watch();
|
||||
}
|
||||
|
||||
if (!fViewOnly) {
|
||||
|
@ -150,7 +142,136 @@ void Monitor::Run()
|
|||
}
|
||||
}
|
||||
|
||||
void Monitor::MonitorHeartbeats()
|
||||
void Monitor::Watch()
|
||||
{
|
||||
while (!fTerminating) {
|
||||
using namespace boost::interprocess;
|
||||
|
||||
try {
|
||||
managed_shared_memory managementSegment(open_read_only, std::string("fmq_" + fShmId + "_mng").c_str());
|
||||
|
||||
fSeenOnce = true;
|
||||
|
||||
auto now = chrono::high_resolution_clock::now();
|
||||
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
|
||||
|
||||
if (fHeartbeatTriggered && duration > fTimeoutInMS) {
|
||||
// memory is present, but no heartbeats since timeout duration
|
||||
cout << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning..." << endl;
|
||||
Cleanup(ShmId{fShmId});
|
||||
fHeartbeatTriggered = false;
|
||||
if (fSelfDestruct) {
|
||||
cout << "self destructing (segment has been observed and cleaned up by the monitor)" << endl;
|
||||
fTerminating = true;
|
||||
}
|
||||
}
|
||||
} catch (bie&) {
|
||||
fHeartbeatTriggered = false;
|
||||
|
||||
if (fSelfDestruct) {
|
||||
if (fSeenOnce) {
|
||||
// segment has been observed at least once, can self-destruct
|
||||
cout << "self destructing (segment has been observed and cleaned up orderly)" << endl;
|
||||
fTerminating = true;
|
||||
} else {
|
||||
// if self-destruct is requested, and no segment has ever been observed, quit after double timeout duration
|
||||
auto now = chrono::high_resolution_clock::now();
|
||||
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
|
||||
|
||||
if (duration > fTimeoutInMS * 2) {
|
||||
Cleanup(ShmId{fShmId});
|
||||
cout << "self destructing (no segments observed within (timeout * 2) since start)" << endl;
|
||||
fTerminating = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this_thread::sleep_for(chrono::milliseconds(100));
|
||||
}
|
||||
}
|
||||
|
||||
bool Monitor::PrintShm(const ShmId& shmId)
|
||||
{
|
||||
using namespace boost::interprocess;
|
||||
|
||||
try {
|
||||
managed_shared_memory managementSegment(open_read_only, std::string("fmq_" + shmId.shmId + "_mng").c_str());
|
||||
VoidAlloc allocInstance(managementSegment.get_segment_manager());
|
||||
|
||||
Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first;
|
||||
std::unordered_map<uint16_t, boost::variant<RBTreeBestFitSegment, SimpleSeqFitSegment>> segments;
|
||||
|
||||
if (!segmentInfos) {
|
||||
cout << "Found management segment, but cannot locate segment info, something went wrong..." << endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
for (const auto& s : *segmentInfos) {
|
||||
if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
|
||||
segments.emplace(s.first, RBTreeBestFitSegment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)).c_str()));
|
||||
} else {
|
||||
segments.emplace(s.first, SimpleSeqFitSegment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)).c_str()));
|
||||
}
|
||||
}
|
||||
|
||||
unsigned int numDevices = 0;
|
||||
int creatorId = -1;
|
||||
std::string sessionName;
|
||||
|
||||
DeviceCounter* deviceCounter = managementSegment.find<DeviceCounter>(unique_instance).first;
|
||||
if (deviceCounter) {
|
||||
numDevices = deviceCounter->fCount;
|
||||
}
|
||||
SessionInfo* sessionInfo = managementSegment.find<SessionInfo>(unique_instance).first;
|
||||
if (sessionInfo) {
|
||||
creatorId = sessionInfo->fCreatorId;
|
||||
sessionName = sessionInfo->fSessionName;
|
||||
}
|
||||
#ifdef FAIRMQ_DEBUG_MODE
|
||||
Uint16MsgCounterHashMap* msgCounters = managementSegment.find<Uint16MsgCounterHashMap>(unique_instance).first;
|
||||
#endif
|
||||
|
||||
stringstream ss;
|
||||
size_t mfree = managementSegment.get_free_memory();
|
||||
size_t mtotal = managementSegment.get_size();
|
||||
size_t mused = mtotal - mfree;
|
||||
|
||||
ss << "shm id: " << shmId.shmId
|
||||
<< ", session: " << sessionName
|
||||
<< ", creator id: " << creatorId
|
||||
<< ", devices: " << numDevices
|
||||
<< ", segments:\n";
|
||||
|
||||
for (const auto& s : segments) {
|
||||
size_t free = boost::apply_visitor(SegmentFreeMemory(), s.second);
|
||||
size_t total = boost::apply_visitor(SegmentSize(), s.second);
|
||||
size_t used = total - free;
|
||||
ss << " [" << s.first
|
||||
<< "]: total: " << total
|
||||
#ifdef FAIRMQ_DEBUG_MODE
|
||||
<< ", msgs: " << ( (msgCounters != nullptr) ? to_string((*msgCounters)[s.first].fCount) : "unknown")
|
||||
#else
|
||||
<< ", msgs: NODEBUG"
|
||||
#endif
|
||||
<< ", free: " << free
|
||||
<< ", used: " << used
|
||||
<< "\n";
|
||||
}
|
||||
|
||||
ss << " [m]: "
|
||||
<< "total: " << mtotal
|
||||
<< ", free: " << mfree
|
||||
<< ", used: " << mused;
|
||||
LOGV(info, user1) << ss.str();
|
||||
} catch (bie&) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void Monitor::ReceiveHeartbeats()
|
||||
{
|
||||
try {
|
||||
bipc::message_queue mq(bipc::open_or_create, fControlQueueName.c_str(), 1000, 256);
|
||||
|
@ -235,132 +356,7 @@ void Monitor::Interactive()
|
|||
break;
|
||||
}
|
||||
|
||||
CheckSegment();
|
||||
|
||||
if (!fTerminating) {
|
||||
cout << "\r";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void Monitor::CheckSegment()
|
||||
{
|
||||
using namespace boost::interprocess;
|
||||
|
||||
try {
|
||||
managed_shared_memory managementSegment(open_read_only, fManagementSegmentName.c_str());
|
||||
VoidAlloc allocInstance(managementSegment.get_segment_manager());
|
||||
|
||||
Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first;
|
||||
std::unordered_map<uint16_t, boost::variant<RBTreeBestFitSegment, SimpleSeqFitSegment>> segments;
|
||||
|
||||
if (!segmentInfos) {
|
||||
cout << "Found management segment, but cannot locate segment info, something went wrong..." << endl;
|
||||
return;
|
||||
}
|
||||
|
||||
for (const auto& s : *segmentInfos) {
|
||||
if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
|
||||
segments.emplace(s.first, RBTreeBestFitSegment(open_read_only, std::string("fmq_" + fShmId + "_m_" + to_string(s.first)).c_str()));
|
||||
} else {
|
||||
segments.emplace(s.first, SimpleSeqFitSegment(open_read_only, std::string("fmq_" + fShmId + "_m_" + to_string(s.first)).c_str()));
|
||||
}
|
||||
}
|
||||
|
||||
fSeenOnce = true;
|
||||
|
||||
unsigned int numDevices = 0;
|
||||
int creatorId = -1;
|
||||
std::string sessionName;
|
||||
#ifdef FAIRMQ_DEBUG_MODE
|
||||
Uint16MsgCounterHashMap* msgCounters = nullptr;
|
||||
#endif
|
||||
|
||||
if (fInteractive || fViewOnly) {
|
||||
DeviceCounter* dc = managementSegment.find<DeviceCounter>(unique_instance).first;
|
||||
if (dc) {
|
||||
numDevices = dc->fCount;
|
||||
}
|
||||
SessionInfo* si = managementSegment.find<SessionInfo>(unique_instance).first;
|
||||
if (si) {
|
||||
creatorId = si->fCreatorId;
|
||||
sessionName = si->fSessionName;
|
||||
}
|
||||
#ifdef FAIRMQ_DEBUG_MODE
|
||||
msgCounters = managementSegment.find<Uint16MsgCounterHashMap>(unique_instance).first;
|
||||
#endif
|
||||
}
|
||||
|
||||
auto now = chrono::high_resolution_clock::now();
|
||||
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
|
||||
|
||||
if (fHeartbeatTriggered && duration > fTimeoutInMS) {
|
||||
// memory is present, but no heartbeats since timeout duration
|
||||
cout << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning..." << endl;
|
||||
Cleanup(ShmId{fShmId});
|
||||
fHeartbeatTriggered = false;
|
||||
if (fSelfDestruct) {
|
||||
cout << "self destructing (segment has been observed and cleaned up by the monitor)" << endl;
|
||||
fTerminating = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (fInteractive || fViewOnly) {
|
||||
stringstream ss;
|
||||
size_t mfree = managementSegment.get_free_memory();
|
||||
size_t mtotal = managementSegment.get_size();
|
||||
size_t mused = mtotal - mfree;
|
||||
|
||||
ss << "shm id: " << fShmId
|
||||
<< ", session: " << sessionName
|
||||
<< ", creator id: " << creatorId
|
||||
<< ", devices: " << numDevices
|
||||
<< ", segments:\n";
|
||||
for (const auto& s : segments) {
|
||||
size_t free = boost::apply_visitor(SegmentFreeMemory(), s.second);
|
||||
size_t total = boost::apply_visitor(SegmentSize(), s.second);
|
||||
size_t used = total - free;
|
||||
ss << " [" << s.first
|
||||
<< "]: total: " << total
|
||||
#ifdef FAIRMQ_DEBUG_MODE
|
||||
<< ", msgs: " << ( (msgCounters != nullptr) ? to_string((*msgCounters)[s.first].fCount) : "unknown")
|
||||
#else
|
||||
<< ", msgs: NODEBUG"
|
||||
#endif
|
||||
<< ", free: " << free
|
||||
<< ", used: " << used
|
||||
<< "\n";
|
||||
}
|
||||
ss << " [m]: "
|
||||
<< "total: " << mtotal
|
||||
<< ", free: " << mfree
|
||||
<< ", used: " << mused;
|
||||
LOGV(info, user1) << ss.str();
|
||||
}
|
||||
} catch (bie&) {
|
||||
if (!fMonitor && !fInteractive) {
|
||||
cout << "No segments found." << endl;
|
||||
}
|
||||
fHeartbeatTriggered = false;
|
||||
|
||||
if (fSelfDestruct) {
|
||||
if (fSeenOnce) {
|
||||
// segment has been observed at least once, can self-destruct
|
||||
cout << "self destructing (segment has been observed and cleaned up orderly)" << endl;
|
||||
fTerminating = true;
|
||||
} else {
|
||||
// if self-destruct is requested, and no segment has ever been observed, quit after double timeout duration
|
||||
auto now = chrono::high_resolution_clock::now();
|
||||
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
|
||||
|
||||
if (fMonitor && duration > fTimeoutInMS * 2) {
|
||||
// clean just in case any other artifacts are left.
|
||||
Cleanup(ShmId{fShmId});
|
||||
cout << "self destructing (no segments observed within (timeout * 2) since start)" << endl;
|
||||
fTerminating = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
PrintShm(ShmId{fShmId});
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -8,6 +8,8 @@
|
|||
#ifndef FAIR_MQ_SHMEM_MONITOR_H_
|
||||
#define FAIR_MQ_SHMEM_MONITOR_H_
|
||||
|
||||
#include <fairlogger/Logger.h>
|
||||
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
#include <atomic>
|
||||
|
@ -82,6 +84,8 @@ class Monitor
|
|||
static std::unordered_map<uint16_t, std::vector<BufferDebugInfo>> GetDebugInfo(const ShmId& shmId);
|
||||
static std::unordered_map<uint16_t, std::vector<BufferDebugInfo>> GetDebugInfo(const SessionId& shmId);
|
||||
|
||||
static bool PrintShm(const ShmId& shmId);
|
||||
|
||||
static bool RemoveObject(const std::string& name);
|
||||
static bool RemoveFileMapping(const std::string& name);
|
||||
static bool RemoveQueue(const std::string& name);
|
||||
|
@ -92,7 +96,8 @@ class Monitor
|
|||
|
||||
private:
|
||||
void PrintHelp();
|
||||
void MonitorHeartbeats();
|
||||
void Watch();
|
||||
void ReceiveHeartbeats();
|
||||
void CheckSegment();
|
||||
void Interactive();
|
||||
void SignalMonitor();
|
||||
|
@ -106,8 +111,6 @@ class Monitor
|
|||
unsigned int fTimeoutInMS;
|
||||
unsigned int fIntervalInMS;
|
||||
std::string fShmId;
|
||||
std::string fSegmentName;
|
||||
std::string fManagementSegmentName;
|
||||
std::string fControlQueueName;
|
||||
std::atomic<bool> fTerminating;
|
||||
std::atomic<bool> fHeartbeatTriggered;
|
||||
|
|
|
@ -69,6 +69,10 @@ static void daemonize()
|
|||
int main(int argc, char** argv)
|
||||
{
|
||||
try {
|
||||
fair::Logger::SetConsoleColor(false);
|
||||
fair::Logger::DefineVerbosity(fair::Verbosity::user1, fair::VerbositySpec::Make(fair::VerbositySpec::Info::timestamp_us));
|
||||
fair::Logger::SetVerbosity(fair::Verbosity::verylow);
|
||||
|
||||
string sessionName;
|
||||
string shmId;
|
||||
bool cleanup = false;
|
||||
|
@ -97,7 +101,7 @@ int main(int argc, char** argv)
|
|||
("monitor,m" , value<bool>(&monitor)->implicit_value(true), "Run in monitoring mode")
|
||||
("debug,b" , value<bool>(&debug)->implicit_value(true), "Debug - Print a list of messages)")
|
||||
("clean-on-exit,e", value<bool>(&cleanOnExit)->implicit_value(true), "Perform cleanup on exit")
|
||||
("interval" , value<unsigned int>(&intervalInMS)->default_value(100), "Output interval for interactive/view-only mode")
|
||||
("interval" , value<unsigned int>(&intervalInMS)->default_value(100), "Output interval for interactive mode")
|
||||
("get-shmid" , value<bool>(&getShmId)->implicit_value(true), "Translate given session id and user id to a shmem id (uses current user id if none provided)")
|
||||
("user-id" , value<int>(&userId)->default_value(-1), "User id (used with --get-shmid)")
|
||||
("help,h", "Print help");
|
||||
|
@ -141,18 +145,22 @@ int main(int argc, char** argv)
|
|||
return 0;
|
||||
}
|
||||
|
||||
cout << "Starting shared memory monitor for session: \"" << sessionName << "\" (shmId: " << shmId << ")..." << endl;
|
||||
|
||||
if (!viewOnly && !interactive && !monitor) {
|
||||
// if neither of the run modes are selected, use view only mode.
|
||||
viewOnly = true;
|
||||
}
|
||||
|
||||
Monitor shmmonitor(shmId, selfDestruct, interactive, viewOnly, timeoutInMS, intervalInMS, monitor, cleanOnExit);
|
||||
|
||||
if (interactive || !viewOnly) {
|
||||
shmmonitor.CatchSignals();
|
||||
if (viewOnly && !interactive) {
|
||||
if (!Monitor::PrintShm(ShmId{shmId})) {
|
||||
cout << "No segments found." << endl;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
cout << "Starting shared memory monitor for session: \"" << sessionName << "\" (shm id: " << shmId << ")..." << endl;
|
||||
|
||||
Monitor shmmonitor(shmId, selfDestruct, interactive, viewOnly, timeoutInMS, intervalInMS, monitor, cleanOnExit);
|
||||
shmmonitor.CatchSignals();
|
||||
shmmonitor.Run();
|
||||
} catch (Monitor::DaemonPresent& dp) {
|
||||
return 0;
|
||||
|
|
Loading…
Reference in New Issue
Block a user