shmmonitor: refactor to separate monitoring from output

This commit is contained in:
Alexey Rybalchenko
2021-03-31 11:23:08 +02:00
parent 72175e5757
commit d7e2fbecea
3 changed files with 161 additions and 154 deletions

View File

@@ -10,7 +10,6 @@
#include "Common.h"
#include <fairmq/tools/Strings.h>
#include <fairlogger/Logger.h>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/file_mapping.hpp>
@@ -81,8 +80,6 @@ Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool
, fTimeoutInMS(timeoutInMS)
, fIntervalInMS(intervalInMS)
, fShmId(shmId)
, fSegmentName("fmq_" + fShmId + "_m_0")
, fManagementSegmentName("fmq_" + fShmId + "_mng")
, fControlQueueName("fmq_" + fShmId + "_cq")
, fTerminating(false)
, fHeartbeatTriggered(false)
@@ -90,18 +87,18 @@ Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool
, fSignalThread()
, fDeviceHeartbeats()
{
if (fMonitor) {
if (!fViewOnly) {
try {
bipc::named_mutex monitorStatus(bipc::create_only, string("fmq_" + fShmId + "_ms").c_str());
} catch (bie&) {
cout << "fairmq-shmmonitor (in monitoring mode) for shared memory id " << fShmId << " already started or did not not properly exited. Try `fairmq-shmmonitor --cleanup --shmid " << fShmId << "`" << endl;
throw DaemonPresent(tools::ToString("fairmq-shmmonitor (in monitoring mode) for shared memory id ", fShmId, " already started or did not not properly exited."));
if (fInteractive) {
cout << "fairmq-shmmonitor for shm id " << fShmId << " is already running. Try `fairmq-shmmonitor --cleanup --shmid " << fShmId << "`, or run in view-only mode (-v)" << endl;
} else {
cout << "fairmq-shmmonitor for shm id " << fShmId << " is already running. Try `fairmq-shmmonitor --cleanup --shmid " << fShmId << "`" << endl;
}
throw DaemonPresent(tools::ToString("fairmq-shmmonitor (in monitoring mode) for shm id ", fShmId, " is already running."));
}
}
Logger::SetConsoleColor(false);
Logger::DefineVerbosity(Verbosity::user1, VerbositySpec::Make(VerbositySpec::Info::timestamp_us));
Logger::SetVerbosity(Verbosity::verylow);
}
void Monitor::CatchSignals()
@@ -131,18 +128,13 @@ void Monitor::Run()
thread heartbeatThread;
if (!fViewOnly) {
RemoveQueue(fControlQueueName);
heartbeatThread = thread(&Monitor::MonitorHeartbeats, this);
heartbeatThread = thread(&Monitor::ReceiveHeartbeats, this);
}
if (fInteractive) {
Interactive();
} else if (fViewOnly) {
CheckSegment();
} else {
while (!fTerminating) {
this_thread::sleep_for(chrono::milliseconds(500));
CheckSegment();
}
Watch();
}
if (!fViewOnly) {
@@ -150,7 +142,136 @@ void Monitor::Run()
}
}
void Monitor::MonitorHeartbeats()
void Monitor::Watch()
{
while (!fTerminating) {
using namespace boost::interprocess;
try {
managed_shared_memory managementSegment(open_read_only, std::string("fmq_" + fShmId + "_mng").c_str());
fSeenOnce = true;
auto now = chrono::high_resolution_clock::now();
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
if (fHeartbeatTriggered && duration > fTimeoutInMS) {
// memory is present, but no heartbeats since timeout duration
cout << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning..." << endl;
Cleanup(ShmId{fShmId});
fHeartbeatTriggered = false;
if (fSelfDestruct) {
cout << "self destructing (segment has been observed and cleaned up by the monitor)" << endl;
fTerminating = true;
}
}
} catch (bie&) {
fHeartbeatTriggered = false;
if (fSelfDestruct) {
if (fSeenOnce) {
// segment has been observed at least once, can self-destruct
cout << "self destructing (segment has been observed and cleaned up orderly)" << endl;
fTerminating = true;
} else {
// if self-destruct is requested, and no segment has ever been observed, quit after double timeout duration
auto now = chrono::high_resolution_clock::now();
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
if (duration > fTimeoutInMS * 2) {
Cleanup(ShmId{fShmId});
cout << "self destructing (no segments observed within (timeout * 2) since start)" << endl;
fTerminating = true;
}
}
}
}
this_thread::sleep_for(chrono::milliseconds(100));
}
}
bool Monitor::PrintShm(const ShmId& shmId)
{
using namespace boost::interprocess;
try {
managed_shared_memory managementSegment(open_read_only, std::string("fmq_" + shmId.shmId + "_mng").c_str());
VoidAlloc allocInstance(managementSegment.get_segment_manager());
Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first;
std::unordered_map<uint16_t, boost::variant<RBTreeBestFitSegment, SimpleSeqFitSegment>> segments;
if (!segmentInfos) {
cout << "Found management segment, but cannot locate segment info, something went wrong..." << endl;
return false;
}
for (const auto& s : *segmentInfos) {
if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
segments.emplace(s.first, RBTreeBestFitSegment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)).c_str()));
} else {
segments.emplace(s.first, SimpleSeqFitSegment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)).c_str()));
}
}
unsigned int numDevices = 0;
int creatorId = -1;
std::string sessionName;
DeviceCounter* deviceCounter = managementSegment.find<DeviceCounter>(unique_instance).first;
if (deviceCounter) {
numDevices = deviceCounter->fCount;
}
SessionInfo* sessionInfo = managementSegment.find<SessionInfo>(unique_instance).first;
if (sessionInfo) {
creatorId = sessionInfo->fCreatorId;
sessionName = sessionInfo->fSessionName;
}
#ifdef FAIRMQ_DEBUG_MODE
Uint16MsgCounterHashMap* msgCounters = managementSegment.find<Uint16MsgCounterHashMap>(unique_instance).first;
#endif
stringstream ss;
size_t mfree = managementSegment.get_free_memory();
size_t mtotal = managementSegment.get_size();
size_t mused = mtotal - mfree;
ss << "shm id: " << shmId.shmId
<< ", session: " << sessionName
<< ", creator id: " << creatorId
<< ", devices: " << numDevices
<< ", segments:\n";
for (const auto& s : segments) {
size_t free = boost::apply_visitor(SegmentFreeMemory(), s.second);
size_t total = boost::apply_visitor(SegmentSize(), s.second);
size_t used = total - free;
ss << " [" << s.first
<< "]: total: " << total
#ifdef FAIRMQ_DEBUG_MODE
<< ", msgs: " << ( (msgCounters != nullptr) ? to_string((*msgCounters)[s.first].fCount) : "unknown")
#else
<< ", msgs: NODEBUG"
#endif
<< ", free: " << free
<< ", used: " << used
<< "\n";
}
ss << " [m]: "
<< "total: " << mtotal
<< ", free: " << mfree
<< ", used: " << mused;
LOGV(info, user1) << ss.str();
} catch (bie&) {
return false;
}
return true;
}
void Monitor::ReceiveHeartbeats()
{
try {
bipc::message_queue mq(bipc::open_or_create, fControlQueueName.c_str(), 1000, 256);
@@ -235,132 +356,7 @@ void Monitor::Interactive()
break;
}
CheckSegment();
if (!fTerminating) {
cout << "\r";
}
}
}
void Monitor::CheckSegment()
{
using namespace boost::interprocess;
try {
managed_shared_memory managementSegment(open_read_only, fManagementSegmentName.c_str());
VoidAlloc allocInstance(managementSegment.get_segment_manager());
Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first;
std::unordered_map<uint16_t, boost::variant<RBTreeBestFitSegment, SimpleSeqFitSegment>> segments;
if (!segmentInfos) {
cout << "Found management segment, but cannot locate segment info, something went wrong..." << endl;
return;
}
for (const auto& s : *segmentInfos) {
if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
segments.emplace(s.first, RBTreeBestFitSegment(open_read_only, std::string("fmq_" + fShmId + "_m_" + to_string(s.first)).c_str()));
} else {
segments.emplace(s.first, SimpleSeqFitSegment(open_read_only, std::string("fmq_" + fShmId + "_m_" + to_string(s.first)).c_str()));
}
}
fSeenOnce = true;
unsigned int numDevices = 0;
int creatorId = -1;
std::string sessionName;
#ifdef FAIRMQ_DEBUG_MODE
Uint16MsgCounterHashMap* msgCounters = nullptr;
#endif
if (fInteractive || fViewOnly) {
DeviceCounter* dc = managementSegment.find<DeviceCounter>(unique_instance).first;
if (dc) {
numDevices = dc->fCount;
}
SessionInfo* si = managementSegment.find<SessionInfo>(unique_instance).first;
if (si) {
creatorId = si->fCreatorId;
sessionName = si->fSessionName;
}
#ifdef FAIRMQ_DEBUG_MODE
msgCounters = managementSegment.find<Uint16MsgCounterHashMap>(unique_instance).first;
#endif
}
auto now = chrono::high_resolution_clock::now();
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
if (fHeartbeatTriggered && duration > fTimeoutInMS) {
// memory is present, but no heartbeats since timeout duration
cout << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning..." << endl;
Cleanup(ShmId{fShmId});
fHeartbeatTriggered = false;
if (fSelfDestruct) {
cout << "self destructing (segment has been observed and cleaned up by the monitor)" << endl;
fTerminating = true;
}
}
if (fInteractive || fViewOnly) {
stringstream ss;
size_t mfree = managementSegment.get_free_memory();
size_t mtotal = managementSegment.get_size();
size_t mused = mtotal - mfree;
ss << "shm id: " << fShmId
<< ", session: " << sessionName
<< ", creator id: " << creatorId
<< ", devices: " << numDevices
<< ", segments:\n";
for (const auto& s : segments) {
size_t free = boost::apply_visitor(SegmentFreeMemory(), s.second);
size_t total = boost::apply_visitor(SegmentSize(), s.second);
size_t used = total - free;
ss << " [" << s.first
<< "]: total: " << total
#ifdef FAIRMQ_DEBUG_MODE
<< ", msgs: " << ( (msgCounters != nullptr) ? to_string((*msgCounters)[s.first].fCount) : "unknown")
#else
<< ", msgs: NODEBUG"
#endif
<< ", free: " << free
<< ", used: " << used
<< "\n";
}
ss << " [m]: "
<< "total: " << mtotal
<< ", free: " << mfree
<< ", used: " << mused;
LOGV(info, user1) << ss.str();
}
} catch (bie&) {
if (!fMonitor && !fInteractive) {
cout << "No segments found." << endl;
}
fHeartbeatTriggered = false;
if (fSelfDestruct) {
if (fSeenOnce) {
// segment has been observed at least once, can self-destruct
cout << "self destructing (segment has been observed and cleaned up orderly)" << endl;
fTerminating = true;
} else {
// if self-destruct is requested, and no segment has ever been observed, quit after double timeout duration
auto now = chrono::high_resolution_clock::now();
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
if (fMonitor && duration > fTimeoutInMS * 2) {
// clean just in case any other artifacts are left.
Cleanup(ShmId{fShmId});
cout << "self destructing (no segments observed within (timeout * 2) since start)" << endl;
fTerminating = true;
}
}
}
PrintShm(ShmId{fShmId});
}
}