From 13678f9f6db8a32539690919547e010572ca838c Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 31 Jan 2018 14:48:55 +0100 Subject: [PATCH] monitor update --- fairmq/FairMQDevice.cxx | 4 +- fairmq/options/FairMQProgOptions.cxx | 1 + fairmq/shmem/FairMQTransportFactorySHM.cxx | 91 ++++++++++++---------- fairmq/shmem/FairMQTransportFactorySHM.h | 1 + fairmq/shmem/Monitor.cxx | 71 ++++++++++++----- fairmq/shmem/Monitor.h | 6 +- fairmq/shmem/runMonitor.cxx | 71 +++++++++++++++-- 7 files changed, 174 insertions(+), 71 deletions(-) diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 59e17206..8131c5e9 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -892,13 +892,13 @@ void FairMQDevice::SetConfig(FairMQProgOptions& config) LOG(warn) << "did not insert channel '" << c.first << "', it is already in the device."; } } - fDefaultTransport = config.GetValue("transport"); - SetTransport(fDefaultTransport); fId = config.GetValue("id"); fNetworkInterface = config.GetValue("network-interface"); fNumIoThreads = config.GetValue("io-threads"); fInitializationTimeoutInS = config.GetValue("initialization-timeout"); fRate = fConfig->GetValue("rate"); + fDefaultTransport = config.GetValue("transport"); + SetTransport(fDefaultTransport); } void FairMQDevice::LogSocketRates() diff --git a/fairmq/options/FairMQProgOptions.cxx b/fairmq/options/FairMQProgOptions.cxx index 9e12adb7..e2b3a161 100644 --- a/fairmq/options/FairMQProgOptions.cxx +++ b/fairmq/options/FairMQProgOptions.cxx @@ -240,6 +240,7 @@ void FairMQProgOptions::InitOptionDescription() ("port-range-max", po::value()->default_value(32000), "End of the port range for dynamic initialization.") ("print-channels", po::value()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (::)") ("shm-segment-size", po::value()->default_value(2000000000), "Shared memory: size of the shared memory segment (in bytes).") + ("shm-monitor", po::value()->default_value(false), "Shared memory: run monitor daemon.") ("rate", po::value()->default_value(0.), "Rate for conditional run loop (Hz).") ("session", po::value()->default_value("default"), "Session name.") ; diff --git a/fairmq/shmem/FairMQTransportFactorySHM.cxx b/fairmq/shmem/FairMQTransportFactorySHM.cxx index cec0bf30..fa755659 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.cxx +++ b/fairmq/shmem/FairMQTransportFactorySHM.cxx @@ -12,6 +12,7 @@ #include #include +#include #include #include @@ -35,6 +36,7 @@ FairMQ::Transport FairMQTransportFactorySHM::fTransportType = FairMQ::Transport: FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const FairMQProgOptions* config) : FairMQTransportFactory(id) + , fDeviceId(id) , fSessionName("default") , fContext(nullptr) , fHeartbeatThread() @@ -57,11 +59,13 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai int numIoThreads = 1; size_t segmentSize = 2000000000; + bool autolaunchMonitor = false; if (config) { numIoThreads = config->GetValue("io-threads"); fSessionName = config->GetValue("session"); segmentSize = config->GetValue("shm-segment-size"); + autolaunchMonitor = config->GetValue("shm-monitor"); } else { @@ -106,24 +110,27 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai } // start shm monitor - // try - // { - // MonitorStatus* monitorStatus = fManagementSegment.find(bipc::unique_instance).first; - // if (monitorStatus == nullptr) - // { - // LOG(debug) << "no shmmonitor found, starting..."; - // StartMonitor(); - // } - // else - // { - // LOG(debug) << "found shmmonitor."; - // } - // } - // catch (std::exception& e) - // { - // LOG(error) << "Exception during shmmonitor initialization: " << e.what() << ", application will now exit"; - // exit(EXIT_FAILURE); - // } + if (autolaunchMonitor) + { + try + { + MonitorStatus* monitorStatus = fManager->ManagementSegment().find(bipc::unique_instance).first; + if (monitorStatus == nullptr) + { + LOG(debug) << "no shmmonitor found, starting..."; + StartMonitor(); + } + else + { + LOG(debug) << "found shmmonitor."; + } + } + catch (std::exception& e) + { + LOG(error) << "Exception during shmmonitor initialization: " << e.what() << ", application will now exit"; + exit(EXIT_FAILURE); + } + } } } catch(bipc::interprocess_exception& e) @@ -140,35 +147,38 @@ void FairMQTransportFactorySHM::StartMonitor() { int numTries = 0; - if (!bfs::exists(bfs::path("shmmonitor"))) - { - LOG(error) << "Could not find shmmonitor. Is it in the PATH? Monitor not started"; - return; - } + auto env = boost::this_process::environment(); - // TODO: replace with Boost.Process once boost 1.64 is available - int r = system("shmmonitor --self-destruct &"); - LOG(debug) << r; + boost::filesystem::path p = boost::process::search_path("shmmonitor"); - do + if (!p.empty()) { - MonitorStatus* monitorStatus = fManager->ManagementSegment().find(bipc::unique_instance).first; - if (monitorStatus) + boost::process::spawn(p, "-x", "-s", fSessionName, "-d", "-t", "2000", env); + + do { - LOG(debug) << "shmmonitor started"; - break; - } - else - { - this_thread::sleep_for(std::chrono::milliseconds(10)); - if (++numTries > 100) + MonitorStatus* monitorStatus = fManager->ManagementSegment().find(bipc::unique_instance).first; + if (monitorStatus) { - LOG(error) << "Did not get response from shmmonitor after " << 10 * 100 << " milliseconds. Exiting."; - exit(EXIT_FAILURE); + LOG(debug) << "shmmonitor started"; + break; + } + else + { + this_thread::sleep_for(std::chrono::milliseconds(10)); + if (++numTries > 1000) + { + LOG(error) << "Did not get response from shmmonitor after " << 10 * 1000 << " milliseconds. Exiting."; + exit(EXIT_FAILURE); + } } } + while (true); + } + else + { + LOG(WARN) << "could not find shmmonitor in the path"; } - while (true); } void FairMQTransportFactorySHM::SendHeartbeats() @@ -179,9 +189,8 @@ void FairMQTransportFactorySHM::SendHeartbeats() try { bipc::message_queue mq(bipc::open_only, controlQueueName.c_str()); - bool heartbeat = true; bpt::ptime sndTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(100); - if (mq.timed_send(&heartbeat, sizeof(heartbeat), 0, sndTill)) + if (mq.timed_send(fDeviceId.c_str(), fDeviceId.size(), 0, sndTill)) { this_thread::sleep_for(chrono::milliseconds(100)); } diff --git a/fairmq/shmem/FairMQTransportFactorySHM.h b/fairmq/shmem/FairMQTransportFactorySHM.h index b36cc319..00ddeb7b 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.h +++ b/fairmq/shmem/FairMQTransportFactorySHM.h @@ -56,6 +56,7 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory void StartMonitor(); static FairMQ::Transport fTransportType; + std::string fDeviceId; std::string fSessionName; void* fContext; std::thread fHeartbeatThread; diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index 52e02b89..a2b7f5c8 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -30,13 +30,13 @@ namespace bipc = boost::interprocess; namespace bpt = boost::posix_time; using CharAllocator = bipc::allocator; -using String = bipc::basic_string, CharAllocator>; +using String = bipc::basic_string, CharAllocator>; using StringAllocator = bipc::allocator; using StringVector = bipc::vector; namespace { - volatile std::sig_atomic_t gSignalStatus = 0; + volatile sig_atomic_t gSignalStatus = 0; } namespace fair @@ -51,10 +51,12 @@ void signalHandler(int signal) gSignalStatus = signal; } -Monitor::Monitor(const string& sessionName, bool selfDestruct, bool interactive, unsigned int timeoutInMS) +Monitor::Monitor(const string& sessionName, bool selfDestruct, bool interactive, unsigned int timeoutInMS, bool runAsDaemon, bool cleanOnExit) : fSelfDestruct(selfDestruct) , fInteractive(interactive) , fSeenOnce(false) + , fIsDaemon(runAsDaemon) + , fCleanOnExit(cleanOnExit) , fTimeoutInMS(timeoutInMS) , fSessionName(sessionName) , fSegmentName("fmq_shm_" + fSessionName + "_main") @@ -62,9 +64,10 @@ Monitor::Monitor(const string& sessionName, bool selfDestruct, bool interactive, , fControlQueueName("fmq_shm_" + fSessionName + "_control_queue") , fTerminating(false) , fHeartbeatTriggered(false) - , fLastHeartbeat() + , fLastHeartbeat(chrono::high_resolution_clock::now()) , fSignalThread() , fManagementSegment(bipc::open_or_create, fManagementSegmentName.c_str(), 65536) + , fDeviceHeartbeats() { MonitorStatus* monitorStatus = fManagementSegment.find(bipc::unique_instance).first; if (monitorStatus != nullptr) @@ -127,19 +130,21 @@ void Monitor::MonitorHeartbeats() { try { - bipc::message_queue mq(bipc::open_or_create, fControlQueueName.c_str(), 1000, sizeof(bool)); + bipc::message_queue mq(bipc::open_or_create, fControlQueueName.c_str(), 1000, 256); unsigned int priority; bipc::message_queue::size_type recvdSize; + char msg[256] = {0}; while (!fTerminating) { - bool heartbeat; bpt::ptime rcvTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(100); - if (mq.timed_receive(&heartbeat, sizeof(heartbeat), recvdSize, priority, rcvTill)) + if (mq.timed_receive(&msg, sizeof(msg), recvdSize, priority, rcvTill)) { fHeartbeatTriggered = true; fLastHeartbeat = chrono::high_resolution_clock::now(); + string deviceId(msg, recvdSize); + fDeviceHeartbeats[deviceId] = fLastHeartbeat; } else { @@ -186,27 +191,27 @@ void Monitor::Interactive() switch (c) { case 'q': - cout << "[q] --> quitting." << endl; + cout << "\n[q] --> quitting." << endl; fTerminating = true; break; case 'p': - cout << "[p] --> active queues:" << endl; + cout << "\n[p] --> active queues:" << endl; PrintQueues(); break; case 'x': - cout << "[x] --> closing shared memory:" << endl; + cout << "\n[x] --> closing shared memory:" << endl; Cleanup(fSessionName); break; case 'h': - cout << "[h] --> help:" << endl << endl; + cout << "\n[h] --> help:" << endl << endl; PrintHelp(); cout << endl; break; case '\n': - cout << "[\\n] --> invalid input." << endl; + cout << "\n[\\n] --> invalid input." << endl; break; default: - cout << "[" << c << "] --> invalid input." << endl; + cout << "\n[" << c << "] --> invalid input." << endl; break; } @@ -290,7 +295,7 @@ void Monitor::CheckSegment() fHeartbeatTriggered = false; if (fSelfDestruct) { - cout << "self destructing" << endl; + cout << "\nself destructing" << endl; fTerminating = true; } } @@ -327,6 +332,21 @@ void Monitor::CheckSegment() << c << flush; } + + auto now = chrono::high_resolution_clock::now(); + unsigned int duration = chrono::duration_cast(now - fLastHeartbeat).count(); + + if (fIsDaemon && duration > fTimeoutInMS * 2) + { + Cleanup(fSessionName); + fHeartbeatTriggered = false; + if (fSelfDestruct) + { + cout << "\nself destructing" << endl; + fTerminating = true; + } + } + if (fSelfDestruct) { if (fSeenOnce) @@ -352,7 +372,7 @@ void Monitor::Cleanup(const string& sessionName) for (unsigned int i = 1; i <= regionCount; ++i) { RemoveObject("fmq_shm_" + sessionName + "_region_" + to_string(i)); - RemoveQueue(std::string("fmq_shm_" + sessionName + "_region_queue_" + std::to_string(i))); + RemoveQueue(string("fmq_shm_" + sessionName + "_region_queue_" + to_string(i))); } } else @@ -369,12 +389,12 @@ void Monitor::Cleanup(const string& sessionName) RemoveObject("fmq_shm_" + sessionName + "_main"); - boost::interprocess::named_mutex::remove(std::string("fmq_shm_" + sessionName + "_mutex").c_str()); + boost::interprocess::named_mutex::remove(string("fmq_shm_" + sessionName + "_mutex").c_str()); cout << endl; } -void Monitor::RemoveObject(const std::string& name) +void Monitor::RemoveObject(const string& name) { if (bipc::shared_memory_object::remove(name.c_str())) { @@ -386,7 +406,7 @@ void Monitor::RemoveObject(const std::string& name) } } -void Monitor::RemoveQueue(const std::string& name) +void Monitor::RemoveQueue(const string& name) { if (bipc::message_queue::remove(name.c_str())) { @@ -405,7 +425,7 @@ void Monitor::PrintQueues() try { bipc::managed_shared_memory segment(bipc::open_only, fSegmentName.c_str()); - StringVector* queues = segment.find(std::string("fmq_shm_" + fSessionName + "_queues").c_str()).first; + StringVector* queues = segment.find(string("fmq_shm_" + fSessionName + "_queues").c_str()).first; if (queues) { cout << "found " << queues->size() << " queue(s):" << endl; @@ -434,11 +454,18 @@ void Monitor::PrintQueues() { cout << "\tno queues found" << endl; } - catch (std::out_of_range& ie) + catch (out_of_range& ie) { cout << "\tno queues found" << endl; } + cout << "\n --> last heartbeats: " << endl << endl; + auto now = chrono::high_resolution_clock::now(); + for (const auto& h : fDeviceHeartbeats) + { + cout << "\t" << h.first << " : " << chrono::duration(now - h.second).count() << "ms ago." << endl; + } + cout << endl; } @@ -469,6 +496,10 @@ Monitor::~Monitor() { fSignalThread.join(); } + if (fCleanOnExit) + { + Cleanup(fSessionName); + } } } // namespace shmem diff --git a/fairmq/shmem/Monitor.h b/fairmq/shmem/Monitor.h index 4d8c1cf3..d0d0527b 100644 --- a/fairmq/shmem/Monitor.h +++ b/fairmq/shmem/Monitor.h @@ -14,6 +14,7 @@ #include #include #include +#include namespace fair { @@ -25,7 +26,7 @@ namespace shmem class Monitor { public: - Monitor(const std::string& sessionName, bool selfDestruct, bool interactive, unsigned int timeoutInMS); + Monitor(const std::string& sessionName, bool selfDestruct, bool interactive, unsigned int timeoutInMS, bool runAsDaemon, bool cleanOnExit); Monitor(const Monitor&) = delete; Monitor operator=(const Monitor&) = delete; @@ -51,6 +52,8 @@ class Monitor bool fSelfDestruct; // will self-destruct after the memory has been closed bool fInteractive; // running in interactive mode bool fSeenOnce; // true is segment has been opened successfully at least once + bool fIsDaemon; + bool fCleanOnExit; unsigned int fTimeoutInMS; std::string fSessionName; std::string fSegmentName; @@ -61,6 +64,7 @@ class Monitor std::chrono::high_resolution_clock::time_point fLastHeartbeat; std::thread fSignalThread; boost::interprocess::managed_shared_memory fManagementSegment; + std::unordered_map fDeviceHeartbeats; }; } // namespace shmem diff --git a/fairmq/shmem/runMonitor.cxx b/fairmq/shmem/runMonitor.cxx index c3f92610..563b5485 100644 --- a/fairmq/shmem/runMonitor.cxx +++ b/fairmq/shmem/runMonitor.cxx @@ -9,12 +9,60 @@ #include +#include +#include +#include +#include +#include + #include #include using namespace std; using namespace boost::program_options; +static void daemonize() +{ + // already a daemon? + // if (getppid() == 1) return; + + // Fork off the parent process + // pid_t pid = fork(); + // if (pid < 0) exit(1); + + // If we got a good PID, then we can exit the parent process. + // if (pid > 0) exit(0); + + // Change the file mode mask + umask(0); + + // Create a new SID for the child process + if (setsid() < 0) + { + exit(1); + } + + // Change the current working directory. This prevents the current directory from being locked; hence not being able to remove it. + if ((chdir("/")) < 0) + { + exit(1); + } + + // Redirect standard files to /dev/null + if (!freopen("/dev/null", "r", stdin)) + { + cout << "could not redirect stdin to /dev/null" << endl; + } + if (!freopen("/dev/null", "w", stdout)) + { + cout << "could not redirect stdout to /dev/null" << endl; + } + if (!freopen("/dev/null", "w", stderr)) + { + cout << "could not redirect stderr to /dev/null" << endl; + } +} + int main(int argc, char** argv) { try @@ -24,15 +72,19 @@ int main(int argc, char** argv) bool selfDestruct = false; bool interactive = false; unsigned int timeoutInMS; + bool runAsDaemon = false; + bool cleanOnExit = false; options_description desc("Options"); desc.add_options() - ("session", value(&sessionName)->default_value("default"), "Name of the session which to monitor") - ("cleanup", value(&cleanup)->implicit_value(true), "Perform cleanup and quit") - ("self-destruct", value(&selfDestruct)->implicit_value(true), "Quit after first closing of the memory") - ("interactive", value(&interactive)->implicit_value(true), "Interactive run") - ("timeout", value(&timeoutInMS)->default_value(5000), "Heartbeat timeout in milliseconds") - ("help", "Print help"); + ("session,s", value(&sessionName)->default_value("default"), "Name of the session which to monitor") + ("cleanup,c", value(&cleanup)->implicit_value(true), "Perform cleanup and quit") + ("self-destruct,x", value(&selfDestruct)->implicit_value(true), "Quit after first closing of the memory") + ("interactive,i", value(&interactive)->implicit_value(true), "Interactive run") + ("timeout,t", value(&timeoutInMS)->default_value(5000), "Heartbeat timeout in milliseconds") + ("daemonize,d", value(&runAsDaemon)->implicit_value(true), "Daemonize the monitor") + ("clean-on-exit,e", value(&cleanOnExit)->implicit_value(true), "Perform cleanup on exit") + ("help,h", "Print help"); variables_map vm; store(parse_command_line(argc, argv, desc), vm); @@ -47,6 +99,11 @@ int main(int argc, char** argv) sessionName.resize(8, '_'); // shorten the session name, to accommodate for name size limit on some systems (MacOS) + if (runAsDaemon) + { + daemonize(); + } + if (cleanup) { cout << "Cleaning up \"" << sessionName << "\"..." << endl; @@ -57,7 +114,7 @@ int main(int argc, char** argv) cout << "Starting shared memory monitor for session: \"" << sessionName << "\"..." << endl; - fair::mq::shmem::Monitor monitor{sessionName, selfDestruct, interactive, timeoutInMS}; + fair::mq::shmem::Monitor monitor{sessionName, selfDestruct, interactive, timeoutInMS, runAsDaemon, cleanOnExit}; monitor.CatchSignals(); monitor.Run();