diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index d03f9a09..e7b433ce 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -215,7 +215,7 @@ class Manager boost::filesystem::path p = boost::process::search_path("fairmq-shmmonitor", ownPath); if (!p.empty()) { - boost::process::spawn(p, "-x", "--shmid", id, "-d", "-t", "2000", env); + boost::process::spawn(p, "-x", "-m", "--shmid", id, "-d", "-t", "2000", env); int numTries = 0; do { try { diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index 72ff845f..5dd319ea 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -71,11 +71,11 @@ void signalHandler(int signal) gSignalStatus = signal; } -Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool viewOnly, unsigned int timeoutInMS, unsigned int intervalInMS, bool runAsDaemon, bool cleanOnExit) +Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool viewOnly, unsigned int timeoutInMS, unsigned int intervalInMS, bool monitor, bool cleanOnExit) : fSelfDestruct(selfDestruct) , fInteractive(interactive) , fViewOnly(viewOnly) - , fIsDaemon(runAsDaemon) + , fMonitor(monitor) , fSeenOnce(false) , fCleanOnExit(cleanOnExit) , fTimeoutInMS(timeoutInMS) @@ -90,12 +90,12 @@ Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool , fSignalThread() , fDeviceHeartbeats() { - if (!fViewOnly) { + if (fMonitor) { try { bipc::named_mutex monitorStatus(bipc::create_only, string("fmq_" + fShmId + "_ms").c_str()); } catch (bie&) { - cout << "fairmq-shmmonitor for shared memory id " << fShmId << " already started or not properly exited. Try `fairmq-shmmonitor --cleanup --shmid " << fShmId << "`" << endl; - throw DaemonPresent(tools::ToString("fairmq-shmmonitor for shared memory id ", fShmId, " already started or not properly exited.")); + cout << "fairmq-shmmonitor (in monitoring mode) for shared memory id " << fShmId << " already started or did not not properly exited. Try `fairmq-shmmonitor --cleanup --shmid " << fShmId << "`" << endl; + throw DaemonPresent(tools::ToString("fairmq-shmmonitor (in monitoring mode) for shared memory id ", fShmId, " already started or did not not properly exited.")); } } @@ -140,7 +140,7 @@ void Monitor::Run() CheckSegment(); } else { while (!fTerminating) { - this_thread::sleep_for(chrono::milliseconds(fIntervalInMS)); + this_thread::sleep_for(chrono::milliseconds(500)); CheckSegment(); } } @@ -288,11 +288,12 @@ void Monitor::CheckSegment() unsigned int duration = chrono::duration_cast(now - fLastHeartbeat).count(); if (fHeartbeatTriggered && duration > fTimeoutInMS) { + // memory is present, but no heartbeats since timeout duration cout << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning..." << endl; Cleanup(ShmId{fShmId}); fHeartbeatTriggered = false; if (fSelfDestruct) { - cout << "\nself destructing" << endl; + cout << "self destructing (segment has been observed and cleaned up by the monitor)" << endl; fTerminating = true; } } @@ -329,22 +330,22 @@ void Monitor::CheckSegment() } catch (bie&) { fHeartbeatTriggered = false; - auto now = chrono::high_resolution_clock::now(); - unsigned int duration = chrono::duration_cast(now - fLastHeartbeat).count(); - - if (fIsDaemon && duration > fTimeoutInMS * 2) { - Cleanup(ShmId{fShmId}); - fHeartbeatTriggered = false; - if (fSelfDestruct) { - cout << "\nself destructing" << endl; - fTerminating = true; - } - } - if (fSelfDestruct) { if (fSeenOnce) { - cout << "self destructing" << endl; + // segment has been observed at least once, can self-destruct + cout << "self destructing (segment has been observed and cleaned up orderly)" << endl; fTerminating = true; + } else { + // if self-destruct is requested, and no segment has ever been observed, quit after double timeout duration + auto now = chrono::high_resolution_clock::now(); + unsigned int duration = chrono::duration_cast(now - fLastHeartbeat).count(); + + if (fMonitor && duration > fTimeoutInMS * 2) { + // clean just in case any other artifacts are left. + Cleanup(ShmId{fShmId}); + cout << "self destructing (no segments observed within (timeout * 2) since start)" << endl; + fTerminating = true; + } } } } diff --git a/fairmq/shmem/Monitor.h b/fairmq/shmem/Monitor.h index 558902cc..434e7dc8 100644 --- a/fairmq/shmem/Monitor.h +++ b/fairmq/shmem/Monitor.h @@ -100,7 +100,7 @@ class Monitor bool fSelfDestruct; // will self-destruct after the memory has been closed bool fInteractive; // running in interactive mode bool fViewOnly; // view only mode - bool fIsDaemon; + bool fMonitor; bool fSeenOnce; // true is segment has been opened successfully at least once bool fCleanOnExit; unsigned int fTimeoutInMS; diff --git a/fairmq/shmem/runMonitor.cxx b/fairmq/shmem/runMonitor.cxx index f3cfdd8d..5f305715 100644 --- a/fairmq/shmem/runMonitor.cxx +++ b/fairmq/shmem/runMonitor.cxx @@ -78,6 +78,7 @@ int main(int argc, char** argv) unsigned int timeoutInMS = 5000; unsigned int intervalInMS = 100; bool runAsDaemon = false; + bool monitor = false; bool debug = false; bool cleanOnExit = false; bool getShmId = false; @@ -93,11 +94,12 @@ int main(int argc, char** argv) ("view,v" , value(&viewOnly)->implicit_value(true), "Run in view only mode") ("timeout,t" , value(&timeoutInMS)->default_value(5000), "Heartbeat timeout in milliseconds") ("daemonize,d" , value(&runAsDaemon)->implicit_value(true), "Daemonize the monitor") + ("monitor,m" , value(&monitor)->implicit_value(true), "Run in monitoring mode") ("debug,b" , value(&debug)->implicit_value(true), "Debug - Print a list of messages)") ("clean-on-exit,e", value(&cleanOnExit)->implicit_value(true), "Perform cleanup on exit") ("interval" , value(&intervalInMS)->default_value(100), "Output interval for interactive/view-only mode") ("get-shmid" , value(&getShmId)->implicit_value(true), "Translate given session id and user id to a shmem id (uses current user id if none provided)") - ("user-id" , value(&userId)->default_value(-1), "User id") + ("user-id" , value(&userId)->default_value(-1), "User id (used with --get-shmid)") ("help,h", "Print help"); variables_map vm; @@ -141,12 +143,17 @@ int main(int argc, char** argv) cout << "Starting shared memory monitor for session: \"" << sessionName << "\" (shmId: " << shmId << ")..." << endl; - Monitor monitor(shmId, selfDestruct, interactive, viewOnly, timeoutInMS, intervalInMS, runAsDaemon, cleanOnExit); + if (!viewOnly && !interactive && !monitor) { + // if neither of the run modes are selected, use view only mode. + viewOnly = true; + } + + Monitor shmmonitor(shmId, selfDestruct, interactive, viewOnly, timeoutInMS, intervalInMS, monitor, cleanOnExit); if (interactive || !viewOnly) { - monitor.CatchSignals(); + shmmonitor.CatchSignals(); } - monitor.Run(); + shmmonitor.Run(); } catch (Monitor::DaemonPresent& dp) { return 0; } catch (exception& e) {