From 539e5602a600b6f834caf74400e23dadb5e71c23 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 19 Jun 2020 13:31:52 +0200 Subject: [PATCH] Expose fair::mq::shmem::Monitor::Cleanup() API --- fairmq/CMakeLists.txt | 2 + fairmq/shmem/Manager.h | 30 ++------------- fairmq/shmem/Monitor.cxx | 73 ++++++++++++++++++++++++------------- fairmq/shmem/Monitor.h | 29 +++++++++++++-- fairmq/shmem/README.md | 27 +++++++++----- fairmq/shmem/Region.h | 8 ++-- fairmq/shmem/runMonitor.cxx | 4 +- 7 files changed, 100 insertions(+), 73 deletions(-) diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index af16d8b6..29c1b534 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -170,6 +170,7 @@ if(BUILD_FAIRMQ) PluginManager.h PluginServices.h runFairMQDevice.h + shmem/Monitor.h ) set(FAIRMQ_PRIVATE_HEADER_FILES @@ -234,6 +235,7 @@ if(BUILD_FAIRMQ) plugins/config/Config.cxx plugins/Control.cxx MemoryResources.cxx + shmem/Monitor.cxx ) if(BUILD_OFI_TRANSPORT) diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index 40c94647..72703bcd 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -17,6 +17,7 @@ #include "Common.h" #include "Region.h" +#include "Monitor.h" #include #include @@ -57,10 +58,8 @@ class Manager Manager(std::string id, std::string deviceId, size_t size, bool throwOnBadAlloc) : fShmId(std::move(id)) , fDeviceId(std::move(deviceId)) - , fSegmentName("fmq_" + fShmId + "_main") - , fManagementSegmentName("fmq_" + fShmId + "_mng") - , fSegment(boost::interprocess::open_or_create, fSegmentName.c_str(), size) - , fManagementSegment(boost::interprocess::open_or_create, fManagementSegmentName.c_str(), 655360) + , fSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_main").c_str(), size) + , fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 655360) , fShmVoidAlloc(fManagementSegment.get_segment_manager()) , fShmMtx(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mtx").c_str()) , fRegionEventsCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str()) @@ -350,22 +349,6 @@ class Manager void IncrementMsgCounter() { ++fMsgCounter; } void DecrementMsgCounter() { --fMsgCounter; } - void RemoveSegments() - { - using namespace boost::interprocess; - if (shared_memory_object::remove(fSegmentName.c_str())) { - LOG(debug) << "Removed '" << fSegmentName << "' segment after the device has stopped."; - } else { - LOG(debug) << "Did not remove " << fSegmentName << " segment after the device stopped. Already removed?"; - } - - if (shared_memory_object::remove(fManagementSegmentName.c_str())) { - LOG(debug) << "Removed '" << fManagementSegmentName << "' segment after the device has stopped."; - } else { - LOG(debug) << "Did not remove '" << fManagementSegmentName << "' segment after the device stopped. Already removed?"; - } - } - void SendHeartbeats() { std::string controlQueueName("fmq_" + fShmId + "_cq"); @@ -411,8 +394,6 @@ class Manager if (fDeviceCounter->fCount == 0) { LOG(debug) << "Last segment user, removing segment."; - - RemoveSegments(); lastRemoved = true; } else { LOG(debug) << "Other segment users present (" << fDeviceCounter->fCount << "), skipping removal."; @@ -422,16 +403,13 @@ class Manager } if (lastRemoved) { - named_mutex::remove(std::string("fmq_" + fShmId + "_mtx").c_str()); - named_condition::remove(std::string("fmq_" + fShmId + "_cv").c_str()); + Monitor::Cleanup(ShmId{fShmId}); } } private: std::string fShmId; std::string fDeviceId; - std::string fSegmentName; - std::string fManagementSegmentName; boost::interprocess::managed_shared_memory fSegment; boost::interprocess::managed_shared_memory fManagementSegment; VoidAlloc fShmVoidAlloc; diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index 4f3b6cc5..b3a58b05 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -64,7 +64,6 @@ Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool , fHeartbeatTriggered(false) , fLastHeartbeat(chrono::high_resolution_clock::now()) , fSignalThread() - , fManagementSegment(bipc::open_or_create, fManagementSegmentName.c_str(), 65536) , fDeviceHeartbeats() { if (!fViewOnly) { @@ -203,7 +202,7 @@ void Monitor::Interactive() case 'x': cout << "\n[x] --> closing shared memory:" << endl; if (!fViewOnly) { - Cleanup(fShmId); + Cleanup(ShmId{fShmId}); } else { cout << "cannot close because in view only mode" << endl; } @@ -288,7 +287,7 @@ void Monitor::CheckSegment() if (fHeartbeatTriggered && duration > fTimeoutInMS) { cout << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning..." << endl; - Cleanup(fShmId); + Cleanup(ShmId{fShmId}); fHeartbeatTriggered = false; if (fSelfDestruct) { cout << "\nself destructing" << endl; @@ -321,7 +320,7 @@ void Monitor::CheckSegment() unsigned int duration = chrono::duration_cast(now - fLastHeartbeat).count(); if (fIsDaemon && duration > fTimeoutInMS * 2) { - Cleanup(fShmId); + Cleanup(ShmId{fShmId}); fHeartbeatTriggered = false; if (fSelfDestruct) { cout << "\nself destructing" << endl; @@ -395,51 +394,52 @@ void Monitor::PrintHelp() void Monitor::RemoveObject(const string& name) { if (bipc::shared_memory_object::remove(name.c_str())) { - cout << "Successfully removed \"" << name << "\"." << endl; + cout << "Successfully removed '" << name << "'." << endl; } else { - cout << "Did not remove \"" << name << "\". Already removed?" << endl; + cout << "Did not remove '" << name << "'. Already removed?" << endl; } } void Monitor::RemoveFileMapping(const string& name) { if (bipc::file_mapping::remove(name.c_str())) { - cout << "Successfully removed \"" << name << "\"." << endl; + cout << "Successfully removed '" << name << "'." << endl; } else { - cout << "Did not remove \"" << name << "\". Already removed?" << endl; + cout << "Did not remove '" << name << "'. Already removed?" << endl; } } void Monitor::RemoveQueue(const string& name) { if (bipc::message_queue::remove(name.c_str())) { - cout << "Successfully removed \"" << name << "\"." << endl; + cout << "Successfully removed '" << name << "'." << endl; } else { - cout << "Did not remove \"" << name << "\". Already removed?" << endl; + cout << "Did not remove '" << name << "'. Already removed?" << endl; } } void Monitor::RemoveMutex(const string& name) { if (bipc::named_mutex::remove(name.c_str())) { - cout << "Successfully removed \"" << name << "\"." << endl; + cout << "Successfully removed '" << name << "'." << endl; } else { - cout << "Did not remove \"" << name << "\". Already removed?" << endl; + cout << "Did not remove '" << name << "'. Already removed?" << endl; } } void Monitor::RemoveCondition(const string& name) { if (bipc::named_condition::remove(name.c_str())) { - cout << "Successfully removed \"" << name << "\"." << endl; + cout << "Successfully removed '" << name << "'." << endl; } else { - cout << "Did not remove \"" << name << "\". Already removed?" << endl; + cout << "Did not remove '" << name << "'. Already removed?" << endl; } } -void Monitor::Cleanup(const string& shmId) +void Monitor::Cleanup(const ShmId& shmId) { - string managementSegmentName("fmq_" + shmId + "_mng"); + cout << "Cleaning up for shared memory id '" << shmId.shmId << "'..." << endl; + string managementSegmentName("fmq_" + shmId.shmId + "_mng"); try { bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str()); RegionCounter* rc = managementSegment.find(bipc::unique_instance).first; @@ -454,20 +454,20 @@ void Monitor::Cleanup(const string& shmId) RegionInfo ri = m->at(i); string path = ri.fPath.c_str(); int flags = ri.fFlags; - cout << "Found RegionInfo with path: '" << path << "', flags: " << flags << "'." << endl; + cout << "Found RegionInfo with path: '" << path << "', flags: " << flags << ", fDestroyed: " << ri.fDestroyed << "." << endl; if (path != "") { - RemoveFileMapping(tools::ToString(path, "fmq_" + shmId + "_rg_" + to_string(i))); + RemoveFileMapping(tools::ToString(path, "fmq_" + shmId.shmId + "_rg_" + to_string(i))); } else { - RemoveObject("fmq_" + shmId + "_rg_" + to_string(i)); + RemoveObject("fmq_" + shmId.shmId + "_rg_" + to_string(i)); } } else { - RemoveObject("fmq_" + shmId + "_rg_" + to_string(i)); + RemoveObject("fmq_" + shmId.shmId + "_rg_" + to_string(i)); } - RemoveQueue(string("fmq_" + shmId + "_rgq_" + to_string(i))); + RemoveQueue(string("fmq_" + shmId.shmId + "_rgq_" + to_string(i))); } } else { - cout << "No region counter found. no regions to cleanup." << endl; + cout << "No region counter found. No regions to cleanup." << endl; } RemoveObject(managementSegmentName.c_str()); @@ -477,20 +477,41 @@ void Monitor::Cleanup(const string& shmId) cout << "Could not locate element in the region map, out of range: " << oor.what() << endl; } - RemoveObject("fmq_" + shmId + "_main"); - RemoveMutex("fmq_" + shmId + "_mtx"); - RemoveCondition("fmq_" + shmId + "_cv"); + RemoveObject("fmq_" + shmId.shmId + "_main"); + RemoveMutex("fmq_" + shmId.shmId + "_mtx"); + RemoveCondition("fmq_" + shmId.shmId + "_cv"); cout << endl; } +void Monitor::Cleanup(const SessionId& sessionId) +{ + ShmId shmId{buildShmIdFromSessionIdAndUserId(sessionId.sessionId)}; + cout << "Cleanup called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'" << endl; + Cleanup(shmId); +} + +void Monitor::CleanupFull(const ShmId& shmId) +{ + Cleanup(shmId); + RemoveMutex("fmq_" + shmId.shmId + "_ms"); + RemoveQueue("fmq_" + shmId.shmId + "_cq"); +} + +void Monitor::CleanupFull(const SessionId& sessionId) +{ + ShmId shmId{buildShmIdFromSessionIdAndUserId(sessionId.sessionId)}; + cout << "Cleanup called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'" << endl; + CleanupFull(shmId); +} + Monitor::~Monitor() { if (fSignalThread.joinable()) { fSignalThread.join(); } if (fCleanOnExit) { - Cleanup(fShmId); + Cleanup(ShmId{fShmId}); } if (!fViewOnly) { RemoveMutex("fmq_" + fShmId + "_ms"); diff --git a/fairmq/shmem/Monitor.h b/fairmq/shmem/Monitor.h index 4492668d..d0ee9add 100644 --- a/fairmq/shmem/Monitor.h +++ b/fairmq/shmem/Monitor.h @@ -8,8 +8,6 @@ #ifndef FAIR_MQ_SHMEM_MONITOR_H_ #define FAIR_MQ_SHMEM_MONITOR_H_ -#include - #include #include #include @@ -24,6 +22,18 @@ namespace mq namespace shmem { +struct SessionId +{ + std::string sessionId; + explicit operator std::string() const { return sessionId; } +}; + +struct ShmId +{ + std::string shmId; + explicit operator std::string() const { return shmId; } +}; + class Monitor { public: @@ -37,7 +47,19 @@ class Monitor void CatchSignals(); void Run(); - static void Cleanup(const std::string& shmId); + /// @brief Cleanup all shared memory artifacts created by devices + /// @param shmId shared memory id + static void Cleanup(const ShmId& shmId); + /// @brief Cleanup all shared memory artifacts created by devices + /// @param sessionId session id + static void Cleanup(const SessionId& sessionId); + /// @brief Cleanup all shared memory artifacts created by devices and monitors + /// @param shmId shared memory id + static void CleanupFull(const ShmId& shmId); + /// @brief Cleanup all shared memory artifacts created by devices and monitors + /// @param sessionId session id + static void CleanupFull(const SessionId& sessionId); + static void RemoveObject(const std::string&); static void RemoveFileMapping(const std::string&); static void RemoveQueue(const std::string&); @@ -70,7 +92,6 @@ class Monitor std::atomic fHeartbeatTriggered; std::chrono::high_resolution_clock::time_point fLastHeartbeat; std::thread fSignalThread; - boost::interprocess::managed_shared_memory fManagementSegment; std::unordered_map fDeviceHeartbeats; }; diff --git a/fairmq/shmem/README.md b/fairmq/shmem/README.md index 8e2264b4..bb7cfe09 100644 --- a/fairmq/shmem/README.md +++ b/fairmq/shmem/README.md @@ -6,6 +6,23 @@ The transport manages shared memory via boost::interprocess library. The transfe Devices track and cleanup shared memory on shutdown. For more information on the current shared memory segment and additional cleanup options, see following section. +# Shared Memory objects / files + +FairMQ Shared Memory currently uses the following names to register shared memory on the system: + +| name | info | created by | used by | +| ------------------------- | ---------------------------------------------- | ------------------ | ------------------------------ | +| `fmq__main` | main segment (user data) | one of the devices | devices | +| `fmq__mng` | management segment (management data) | one of the devices | devices | +| `fmq__mtx` | mutex | one of the devices | devices | +| `fmq__cv` | condition variable | one of the devices | devices with unmanaged regions | +| `fmq__rg_` | unmanaged region(s) | one of the devices | devices with unmanaged regions | +| `fmq__rgq_` | unmanaged region queue(s) | one of the devices | devices with unmanaged regions | +| `fmq__ms` | shmmonitor status | shmmonitor | devices, shmmonitor | +| `fmq__cq` | message queue between transport and shmmonitor | shmmonitor | devices, shmmonitor | + +The shmId is generated out of session id and user id. + ## Shared memory monitor The shared memory monitor tool, supplied with the shared memory transport can be used to monitor shared memory use and automatically cleanup shared memory in case of device crashes. @@ -25,13 +42,3 @@ Without the `--self-destruct` option, the monitor will run continuously, moitori Possible further implementation would be to run the monitor with `--self-destruct` with each topology. The Monitor class can also be used independently from the supplied executable (built from `runMonitor.cxx`), allowing integration on any level. For example invoking the monitor could be a functionality that a device offers. - -FairMQ Shared Memory currently uses following names to register shared memory on the system: - -`fmq__main` - main segment name, used for user data (the shmId is generated out of session id and user id). -`fmq__mng` - management segment name, used for storing management data. -`fmq__cq` - message queue for communicating between shm transport and shm monitor (exists independent of above segments). -`fmq__mtx` - boost::interprocess::named_mutex for management purposes (exists independent of above segments). -`fmq__ms` - shmmonitor status used to signal if it is active or not (exists independent of above segments). -`fmq__rg_` - names of unmanaged regions. -`fmq__rgq_` - names of queues for the unmanaged regions. diff --git a/fairmq/shmem/Region.h b/fairmq/shmem/Region.h index 4408029d..c798ab6a 100644 --- a/fairmq/shmem/Region.h +++ b/fairmq/shmem/Region.h @@ -232,11 +232,11 @@ struct Region } if (boost::interprocess::shared_memory_object::remove(fName.c_str())) { - LOG(debug) << "Region " << fName << " destroyed."; + LOG(debug) << "Region '" << fName << "' destroyed."; } if (boost::interprocess::file_mapping::remove(fName.c_str())) { - LOG(debug) << "File mapping " << fName << " destroyed."; + LOG(debug) << "File mapping '" << fName << "' destroyed."; } if (fFile) { @@ -244,14 +244,14 @@ struct Region } if (boost::interprocess::message_queue::remove(fQueueName.c_str())) { - LOG(debug) << "Region queue " << fQueueName << " destroyed."; + LOG(debug) << "Region queue '" << fQueueName << "' destroyed."; } } else { // LOG(debug) << "shmem: region '" << fName << "' is remote, no cleanup necessary."; LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary"; } - LOG(debug) << "Region " << fName << " (" << (fRemote ? "remote" : "local") << ") destructed."; + LOG(debug) << "Region '" << fName << "' (" << (fRemote ? "remote" : "local") << ") destructed."; } bool fRemote; diff --git a/fairmq/shmem/runMonitor.cxx b/fairmq/shmem/runMonitor.cxx index f7302f81..ac8b3dfa 100644 --- a/fairmq/shmem/runMonitor.cxx +++ b/fairmq/shmem/runMonitor.cxx @@ -111,9 +111,7 @@ int main(int argc, char** argv) } if (cleanup) { - cout << "Cleaning up \"" << shmId << "\"..." << endl; - Monitor::Cleanup(shmId); - Monitor::RemoveQueue("fmq_" + shmId + "_cq"); + Monitor::CleanupFull(ShmId{shmId}); return 0; }