From f05ed326a6dacb15f3afc9c6a1e1e27c24a56ba9 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 24 May 2017 14:09:37 +0200 Subject: [PATCH] Update FairMQShmMonitor --- fairmq/shmem/FairMQShmDeviceCounter.h | 1 + fairmq/shmem/FairMQShmMonitor.cxx | 191 +++++++++++++++----------- fairmq/shmem/FairMQShmMonitor.h | 13 +- fairmq/shmem/runFairMQShmMonitor.cxx | 58 ++++++-- 4 files changed, 172 insertions(+), 91 deletions(-) diff --git a/fairmq/shmem/FairMQShmDeviceCounter.h b/fairmq/shmem/FairMQShmDeviceCounter.h index e5827544..fa589704 100644 --- a/fairmq/shmem/FairMQShmDeviceCounter.h +++ b/fairmq/shmem/FairMQShmDeviceCounter.h @@ -22,6 +22,7 @@ struct DeviceCounter DeviceCounter(unsigned int c) : count(c) {} + std::atomic count; }; diff --git a/fairmq/shmem/FairMQShmMonitor.cxx b/fairmq/shmem/FairMQShmMonitor.cxx index 0087c835..f12412f6 100644 --- a/fairmq/shmem/FairMQShmMonitor.cxx +++ b/fairmq/shmem/FairMQShmMonitor.cxx @@ -39,8 +39,12 @@ namespace mq namespace shmem { -Monitor::Monitor(const string& segmentName) - : fSegmentName(segmentName) +Monitor::Monitor(const string& segmentName, bool selfDestruct, bool interactive, unsigned int timeoutInMS) + : fSelfDestruct(selfDestruct) + , fInteractive(interactive) + , fSeenOnce(false) + , fTimeoutInMS(timeoutInMS) + , fSegmentName(segmentName) , fTerminating(false) , fHeartbeatTriggered(false) , fLastHeartbeat() @@ -56,36 +60,24 @@ Monitor::Monitor(const string& segmentName) } } -void Monitor::PrintHeader() +void Monitor::Run() { - cout << "| " - << "\033[01;32m" << setw(18) << "name" << "\033[0m" << " | " - << "\033[01;32m" << setw(10) << "size" << "\033[0m" << " | " - << "\033[01;32m" << setw(10) << "free" << "\033[0m" << " | " - // << "\033[01;32m" << setw(15) << "all deallocated" << "\033[0m" << " | " - << "\033[01;32m" << setw(2) << "ok" << "\033[0m" << " | " - // << "\033[01;32m" << setw(10) << "# named" << "\033[0m" << " | " - << "\033[01;32m" << setw(10) << "# devices" << "\033[0m" << " | " - // << "\033[01;32m" << setw(10) << "# unique" << "\033[0m" << " |" - << "\033[01;32m" << setw(10) << "ms since" << "\033[0m" << " |" - << endl; -} + thread heartbeatThread(&Monitor::MonitorHeartbeats, this); -void Monitor::PrintHelp() -{ - cout << "controls: [x] close memory, [p] print queues, [h] help, [q] quit." << endl; -} - -void Monitor::CloseMemory() -{ - if (bipc::shared_memory_object::remove(fSegmentName.c_str())) + if (fInteractive) { - cout << "Successfully removed shared memory \"" << fSegmentName.c_str() << "\"." << endl; + Interactive(); } else { - cout << "Did not remove shared memory. Already removed?" << endl; + while (!fTerminating) + { + this_thread::sleep_for(chrono::milliseconds(100)); + CheckSegment(); + } } + + heartbeatThread.join(); } void Monitor::MonitorHeartbeats() @@ -119,7 +111,7 @@ void Monitor::MonitorHeartbeats() if (bipc::message_queue::remove("fairmq_shmem_control_queue")) { - cout << "successfully removed control queue" << endl; + // cout << "successfully removed control queue" << endl; } else { @@ -127,10 +119,8 @@ void Monitor::MonitorHeartbeats() } } -void Monitor::Run() +void Monitor::Interactive() { - thread heartbeatThread(&Monitor::MonitorHeartbeats, this); - char input; pollfd inputFd[1]; inputFd[0].fd = fileno(stdin); @@ -164,7 +154,7 @@ void Monitor::Run() break; case 'x': cout << "[x] --> closing shared memory:" << endl; - CloseMemory(); + Cleanup(fSegmentName); break; case 'h': cout << "[h] --> help:" << endl << endl; @@ -189,47 +179,53 @@ void Monitor::Run() CheckSegment(); - cout << "\r"; + if (!fTerminating) + { + cout << "\r"; + } } tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure t.c_lflag |= ICANON; // re-enable canonical input tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings - - heartbeatThread.join(); } void Monitor::CheckSegment() { static uint64_t counter = 0; - char c = '#'; - int mod = counter++ % 5; - switch (mod) + + if (fInteractive) { - case 0: - c = '-'; - break; - case 1: - c = '\\'; - break; - case 2: - c = '|'; - break; - case 3: - c = '-'; - break; - case 4: - c = '/'; - break; - default: - break; + int mod = counter++ % 5; + switch (mod) + { + case 0: + c = '-'; + break; + case 1: + c = '\\'; + break; + case 2: + c = '|'; + break; + case 3: + c = '-'; + break; + case 4: + c = '/'; + break; + default: + break; + } } try { bipc::managed_shared_memory segment(bipc::open_only, fSegmentName.c_str()); + fSeenOnce = true; + unsigned int numDevices = 0; pair result = segment.find(bipc::unique_instance); @@ -241,39 +237,58 @@ void Monitor::CheckSegment() auto now = chrono::high_resolution_clock::now(); unsigned int duration = chrono::duration_cast(now - fLastHeartbeat).count(); - if (fHeartbeatTriggered && duration > 5000) + if (fHeartbeatTriggered && duration > fTimeoutInMS) { - cout << "no heartbeats since over 5 seconds, cleaning..." << endl; - CloseMemory(); + cout << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning..." << endl; + Cleanup(fSegmentName); fHeartbeatTriggered = false; + if (fSelfDestruct) + { + cout << "self destructing" << endl; + fTerminating = true; + } } - cout << "| " - << setw(18) << fSegmentName << " | " - << setw(10) << segment.get_size() << " | " - << setw(10) << segment.get_free_memory() << " | " - // << setw(15) << segment.all_memory_deallocated() << " | " - << setw(2) << segment.check_sanity() << " | " - // << setw(10) << segment.get_num_named_objects() << " | " - << setw(10) << numDevices << " | " - // << setw(10) << segment.get_num_unique_objects() << " |" - << setw(10) << duration << " |" - << c - << flush; + if (fInteractive) + { + cout << "| " + << setw(18) << fSegmentName << " | " + << setw(10) << segment.get_size() << " | " + << setw(10) << segment.get_free_memory() << " | " + // << setw(15) << segment.all_memory_deallocated() << " | " + << setw(2) << segment.check_sanity() << " | " + // << setw(10) << segment.get_num_named_objects() << " | " + << setw(10) << numDevices << " | " + // << setw(10) << segment.get_num_unique_objects() << " |" + << setw(10) << duration << " |" + << c + << flush; + } } catch (bipc::interprocess_exception& ie) { fHeartbeatTriggered = false; - cout << "| " - << setw(18) << "-" << " | " - << setw(10) << "-" << " | " - << setw(10) << "-" << " | " - // << setw(15) << "-" << " | " - << setw(2) << "-" << " | " - << setw(10) << "-" << " | " - << setw(10) << "-" << " |" - << c - << flush; + if (fInteractive) + { + cout << "| " + << setw(18) << "-" << " | " + << setw(10) << "-" << " | " + << setw(10) << "-" << " | " + // << setw(15) << "-" << " | " + << setw(2) << "-" << " | " + << setw(10) << "-" << " | " + << setw(10) << "-" << " |" + << c + << flush; + } + if (fSelfDestruct) + { + if (fSeenOnce) + { + cout << "self destructing" << endl; + fTerminating = true; + } + } } } @@ -333,6 +348,26 @@ void Monitor::PrintQueues() cout << endl; } +void Monitor::PrintHeader() +{ + cout << "| " + << "\033[01;32m" << setw(18) << "name" << "\033[0m" << " | " + << "\033[01;32m" << setw(10) << "size" << "\033[0m" << " | " + << "\033[01;32m" << setw(10) << "free" << "\033[0m" << " | " + // << "\033[01;32m" << setw(15) << "all deallocated" << "\033[0m" << " | " + << "\033[01;32m" << setw(2) << "ok" << "\033[0m" << " | " + // << "\033[01;32m" << setw(10) << "# named" << "\033[0m" << " | " + << "\033[01;32m" << setw(10) << "# devices" << "\033[0m" << " | " + // << "\033[01;32m" << setw(10) << "# unique" << "\033[0m" << " |" + << "\033[01;32m" << setw(10) << "ms since" << "\033[0m" << " |" + << endl; +} + +void Monitor::PrintHelp() +{ + cout << "controls: [x] close memory, [p] print queues, [h] help, [q] quit." << endl; +} + } // namespace shmem } // namespace mq } // namespace fair diff --git a/fairmq/shmem/FairMQShmMonitor.h b/fairmq/shmem/FairMQShmMonitor.h index cc89153c..de5c273e 100644 --- a/fairmq/shmem/FairMQShmMonitor.h +++ b/fairmq/shmem/FairMQShmMonitor.h @@ -23,7 +23,8 @@ namespace shmem class Monitor { public: - Monitor(const std::string& segmentName); + Monitor(const std::string& segmentName, bool selfDestruct, bool interactive, unsigned int timeoutInMS); + Monitor(const Monitor&) = delete; Monitor operator=(const Monitor&) = delete; @@ -31,16 +32,20 @@ class Monitor virtual ~Monitor() {} + static void Cleanup(const std::string& segmentName); + private: void PrintHeader(); void PrintHelp(); void PrintQueues(); - void CloseMemory(); void MonitorHeartbeats(); void CheckSegment(); + void Interactive(); - static void Cleanup(const std::string& segmentName); - + bool fSelfDestruct; // will self-destruct after the memory has been closed + bool fInteractive; // running in interactive mode + bool fSeenOnce; // true is segment has been opened successfully at least once + unsigned int fTimeoutInMS; std::string fSegmentName; std::atomic fTerminating; std::atomic fHeartbeatTriggered; diff --git a/fairmq/shmem/runFairMQShmMonitor.cxx b/fairmq/shmem/runFairMQShmMonitor.cxx index 50d1612c..9d341677 100644 --- a/fairmq/shmem/runFairMQShmMonitor.cxx +++ b/fairmq/shmem/runFairMQShmMonitor.cxx @@ -7,22 +7,62 @@ ********************************************************************************/ #include "FairMQShmMonitor.h" +#include + #include #include +using namespace std; +using namespace boost::program_options; + int main(int argc, char** argv) { - std::string segmentName = "fairmq_shmem_main"; - - if (argc == 2) + try { - segmentName = argv[1]; + string segmentName; + bool cleanup = false; + bool selfDestruct = false; + bool interactive = false; + unsigned int timeoutInMS; + + options_description desc("Options"); + desc.add_options() + ("segment-name", value(&segmentName)->default_value("fairmq_shmem_main"), "Name of the shared memory segment") + ("cleanup", value(&cleanup)->implicit_value(true), "Perform cleanup and quit") + ("self-destruct", value(&selfDestruct)->implicit_value(true), "Quit after first closing of the memory") + ("interactive", value(&interactive)->implicit_value(true), "Interactive run") + ("timeout", value(&timeoutInMS)->default_value(5000), "Heartbeat timeout in milliseconds") + ("help", "Print help"); + + variables_map vm; + store(parse_command_line(argc, argv, desc), vm); + + if (vm.count("help")) + { + cout << "FairMQ Shared Memory Monitor" << endl << desc << endl; + return 0; + } + + notify(vm); + + if (cleanup) + { + cout << "Cleaning up \"" << segmentName << "\"..." << endl; + fair::mq::shmem::Monitor::Cleanup(segmentName); + return 0; + } + + cout << "Starting monitor for shared memory segment: \"" << segmentName << "\"..." << endl; + + fair::mq::shmem::Monitor monitor{segmentName, selfDestruct, interactive, timeoutInMS}; + + monitor.Run(); + } + catch (exception& e) + { + cerr << "Unhandled Exception reached the top of main: " << e.what() << ", application will now exit" << endl; + return 2; } - std::cout << "Looking for shared memory segment \"" << segmentName << "\"..." << std::endl; - - fair::mq::shmem::Monitor monitor{segmentName}; - - monitor.Run(); return 0; }