From 1a75141fc4040e10ef8103d8dc70af7b788fe916 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 1 Feb 2022 18:55:09 +0100 Subject: [PATCH] shm: allow monitor::ResetContent to cleanup after a crash --- fairmq/UnmanagedRegion.h | 1 + fairmq/shmem/Common.h | 2 + fairmq/shmem/Manager.h | 2 +- fairmq/shmem/Monitor.cxx | 93 +++++++++++++++++++++++++--------- fairmq/shmem/Monitor.h | 17 +++++++ fairmq/shmem/Segment.h | 9 ++-- fairmq/shmem/UnmanagedRegion.h | 21 +++++--- 7 files changed, 108 insertions(+), 37 deletions(-) diff --git a/fairmq/UnmanagedRegion.h b/fairmq/UnmanagedRegion.h index f624e64b..04f9fa8b 100644 --- a/fairmq/UnmanagedRegion.h +++ b/fairmq/UnmanagedRegion.h @@ -133,6 +133,7 @@ struct RegionConfig bool removeOnDestruction = true; /// remove the region on object destruction int creationFlags = 0; /// flags passed to the underlying transport on region creation int64_t userFlags = 0; /// custom flags that have no effect on the transport, but can be retrieved from the region by the user + uint64_t size = 0; /// region size std::string path = ""; /// file path, if the region is backed by a file std::optional id = std::nullopt; /// region id uint32_t linger = 100; /// delay in ms before region destruction to collect outstanding events diff --git a/fairmq/shmem/Common.h b/fairmq/shmem/Common.h index 3da2e605..21d44c8f 100644 --- a/fairmq/shmem/Common.h +++ b/fairmq/shmem/Common.h @@ -28,6 +28,8 @@ namespace fair::mq::shmem { +static constexpr uint64_t kManagementSegmentSize = 6553600; + struct SharedMemoryError : std::runtime_error { using std::runtime_error::runtime_error; }; using SimpleSeqFitSegment = boost::interprocess::basic_managed_shared_memoryGetProperty("shmid", makeShmIdUint64(sessionName)) : makeShmIdUint64(sessionName)) , fShmId(makeShmIdStr(fShmId64)) , fSegmentId(config ? config->GetProperty("shm-segment-id", 0) : 0) - , fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600) + , fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), kManagementSegmentSize) , fShmVoidAlloc(fManagementSegment.get_segment_manager()) , fShmMtx(fManagementSegment.find_or_construct(boost::interprocess::unique_instance)()) , fNumObservedEvents(0) diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index 582cf2ec..1896f60e 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -6,9 +6,10 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include "Monitor.h" #include "Common.h" -#include "UnmanagedRegion.h" +#include "Monitor.h" +#include "Segment.h" +#include #include #include @@ -415,24 +416,28 @@ void Monitor::PrintDebugInfo(const ShmId& shmId __attribute__((unused))) size_t numMessages = 0; - for (const auto& e : *debug) { - numMessages += e.second.size(); - } - LOG(info) << endl << "found " << numMessages << " messages."; - - for (const auto& s : *debug) { - for (const auto& e : s.second) { - using time_point = chrono::system_clock::time_point; - time_point tmpt{chrono::duration_cast(chrono::nanoseconds(e.second.fCreationTime))}; - time_t t = chrono::system_clock::to_time_t(tmpt); - uint64_t ms = e.second.fCreationTime % 1000000; - auto tm = localtime(&t); - LOG(info) << "segment: " << setw(3) << setfill(' ') << s.first - << ", offset: " << setw(12) << setfill(' ') << e.first - << ", size: " << setw(10) << setfill(' ') << e.second.fSize - << ", creator PID: " << e.second.fPid << setfill('0') - << ", at: " << setw(2) << tm->tm_hour << ":" << setw(2) << tm->tm_min << ":" << setw(2) << tm->tm_sec << "." << setw(6) << ms; + if (debug) { + for (const auto& e : *debug) { + numMessages += e.second.size(); } + LOG(info) << endl << "found " << numMessages << " messages."; + + for (const auto& s : *debug) { + for (const auto& e : s.second) { + using time_point = chrono::system_clock::time_point; + time_point tmpt{chrono::duration_cast(chrono::nanoseconds(e.second.fCreationTime))}; + time_t t = chrono::system_clock::to_time_t(tmpt); + uint64_t ms = e.second.fCreationTime % 1000000; + auto tm = localtime(&t); + LOG(info) << "segment: " << setw(3) << setfill(' ') << s.first + << ", offset: " << setw(12) << setfill(' ') << e.first + << ", size: " << setw(10) << setfill(' ') << e.second.fSize + << ", creator PID: " << e.second.fPid << setfill('0') + << ", at: " << setw(2) << tm->tm_hour << ":" << setw(2) << tm->tm_min << ":" << setw(2) << tm->tm_sec << "." << setw(6) << ms; + } + } + } else { + LOG(info) << "no debug data found"; } } catch (bie&) { LOG(info) << "no segments found"; @@ -463,11 +468,16 @@ unordered_map> Monitor::GetDebugInfo(cons result.reserve(debug->size()); - for (const auto& s : *debug) { - result[s.first].reserve(s.second.size()); - for (const auto& e : s.second) { - result[s.first][e.first] = BufferDebugInfo(e.first, e.second.fPid, e.second.fSize, e.second.fCreationTime); + + if (debug) { + for (const auto& s : *debug) { + result[s.first].reserve(s.second.size()); + for (const auto& e : s.second) { + result[s.first][e.first] = BufferDebugInfo(e.first, e.second.fPid, e.second.fSize, e.second.fCreationTime); + } } + } else { + LOG(info) << "no debug data found"; } } catch (bie&) { LOG(info) << "no segments found"; @@ -701,6 +711,43 @@ void Monitor::ResetContent(const SessionId& sessionId, bool verbose /* = true */ ResetContent(shmId, verbose); } +void Monitor::ResetContent(const ShmId& shmIdT, const std::vector& segmentCfgs, const std::vector& regionCfgs, bool verbose /* = true */) +{ + using namespace boost::interprocess; + + std::string shmId = shmIdT.shmId; + std::string managementSegmentName("fmq_" + shmId + "_mng"); + // reset managed segments + ResetContent(shmIdT, verbose); + // delete management segment + Remove(managementSegmentName, verbose); + // recreate management segment + managed_shared_memory mngSegment(create_only, managementSegmentName.c_str(), kManagementSegmentSize); + // fill management segment with segment & region infos + for (const auto& s : segmentCfgs) { + if (s.allocationAlgorithm == "rbtree_best_fit") { + Segment::Register(shmId, s.id, AllocationAlgorithm::rbtree_best_fit); + } else if (s.allocationAlgorithm == "simple_seq_fit") { + Segment::Register(shmId, s.id, AllocationAlgorithm::simple_seq_fit); + } else { + LOG(error) << "Unknown allocation algorithm provided: " << s.allocationAlgorithm; + throw MonitorError("Unknown allocation algorithm provided: " + s.allocationAlgorithm); + } + } + for (const auto& r : regionCfgs) { + fair::mq::shmem::UnmanagedRegion::Register(shmId, r); + } +} + +void Monitor::ResetContent(const SessionId& sessionId, const std::vector& segmentCfgs, const std::vector& regionCfgs, bool verbose /* = true */) +{ + ShmId shmId{makeShmIdStr(sessionId.sessionId)}; + if (verbose) { + cout << "ResetContent called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'" << endl; + } + ResetContent(shmId, segmentCfgs, regionCfgs, verbose); +} + Monitor::~Monitor() { if (fSignalThread.joinable()) { diff --git a/fairmq/shmem/Monitor.h b/fairmq/shmem/Monitor.h index fe8e419e..49483f1d 100644 --- a/fairmq/shmem/Monitor.h +++ b/fairmq/shmem/Monitor.h @@ -8,6 +8,8 @@ #ifndef FAIR_MQ_SHMEM_MONITOR_H_ #define FAIR_MQ_SHMEM_MONITOR_H_ +#include + #include #include @@ -49,6 +51,13 @@ struct BufferDebugInfo uint64_t fCreationTime; }; +struct SegmentConfig +{ + uint16_t id; + uint64_t size; + std::string allocationAlgorithm; +}; + class Monitor { public: @@ -88,6 +97,14 @@ class Monitor /// @param sessionId session id /// Only call this when segment is not in use static void ResetContent(const SessionId& sessionId, bool verbose = true); + /// @brief [EXPERIMENTAL] cleanup the content of the shem segment, without recreating it + /// @param shmId shared memory id + /// Only call this when segment is not in use + static void ResetContent(const ShmId& shmId, const std::vector& segmentCfgs, const std::vector& regionCfgs, bool verbose = true); + /// @brief [EXPERIMENTAL] cleanup the content of the shem segment, without recreating it + /// @param sessionId session id + /// Only call this when segment is not in use + static void ResetContent(const SessionId& sessionId, const std::vector& segmentCfgs, const std::vector& regionCfgs, bool verbose = true); /// @brief Outputs list of messages in shmem (if compiled with FAIRMQ_DEBUG_MODE=ON) /// @param shmId shmem id diff --git a/fairmq/shmem/Segment.h b/fairmq/shmem/Segment.h index b3435ef6..1d5f2e6c 100644 --- a/fairmq/shmem/Segment.h +++ b/fairmq/shmem/Segment.h @@ -26,6 +26,8 @@ static const RBTreeBestFit rbTreeBestFit = RBTreeBestFit(); struct Segment { + friend class Monitor; + Segment(const std::string& shmId, uint16_t id, size_t size, SimpleSeqFit) : fSegment(SimpleSeqFitSegment(boost::interprocess::open_or_create, std::string("fmq_" + shmId + "_m_" + std::to_string(id)).c_str(), @@ -66,15 +68,12 @@ struct Segment static void Register(const std::string& shmId, uint16_t id, AllocationAlgorithm allocAlgo) { using namespace boost::interprocess; - managed_shared_memory mngSegment(open_or_create, std::string("fmq_" + shmId + "_mng").c_str(), 6553600); + managed_shared_memory mngSegment(open_or_create, std::string("fmq_" + shmId + "_mng").c_str(), kManagementSegmentSize); VoidAlloc alloc(mngSegment.get_segment_manager()); Uint16SegmentInfoHashMap* shmSegments = mngSegment.find_or_construct(unique_instance)(alloc); - EventCounter* eventCounter = mngSegment.find(unique_instance).first; - if (!eventCounter) { - eventCounter = mngSegment.construct(unique_instance)(0); - } + EventCounter* eventCounter = mngSegment.find_or_construct(unique_instance)(0); bool newSegmentRegistered = shmSegments->emplace(id, allocAlgo).second; if (newSegmentRegistered) { diff --git a/fairmq/shmem/UnmanagedRegion.h b/fairmq/shmem/UnmanagedRegion.h index 651630ec..7ecf5f29 100644 --- a/fairmq/shmem/UnmanagedRegion.h +++ b/fairmq/shmem/UnmanagedRegion.h @@ -41,6 +41,7 @@ struct UnmanagedRegion { friend class Message; friend class Manager; + friend class Monitor; UnmanagedRegion(const std::string& shmId, uint16_t id, uint64_t size) : UnmanagedRegion(shmId, size, false, makeRegionConfig(id)) @@ -50,6 +51,10 @@ struct UnmanagedRegion : UnmanagedRegion(shmId, size, false, std::move(cfg)) {} + UnmanagedRegion(const std::string& shmId, RegionConfig cfg) + : UnmanagedRegion(shmId, cfg.size, false, std::move(cfg)) + {} + UnmanagedRegion(const std::string& shmId, uint64_t size, bool remote, RegionConfig cfg) : fRemote(remote) , fRemoveOnDestruction(cfg.removeOnDestruction) @@ -66,6 +71,9 @@ struct UnmanagedRegion { using namespace boost::interprocess; + // TODO: refactor this + cfg.size = size; + if (!cfg.path.empty()) { fName = std::string(cfg.path + fName); @@ -119,7 +127,7 @@ struct UnmanagedRegion } if (!remote) { - Register(shmId, cfg, size); + Register(shmId, cfg); } LOG(trace) << "shmem: initialized region: " << fName << " (" << (remote ? "remote" : "local") << ")"; @@ -223,20 +231,17 @@ struct UnmanagedRegion return regionCfg; } - static void Register(const std::string& shmId, RegionConfig& cfg, uint64_t size) + static void Register(const std::string& shmId, const RegionConfig& cfg) { using namespace boost::interprocess; - managed_shared_memory mngSegment(open_or_create, std::string("fmq_" + shmId + "_mng").c_str(), 6553600); + managed_shared_memory mngSegment(open_or_create, std::string("fmq_" + shmId + "_mng").c_str(), kManagementSegmentSize); VoidAlloc alloc(mngSegment.get_segment_manager()); Uint16RegionInfoHashMap* shmRegions = mngSegment.find_or_construct(unique_instance)(alloc); - EventCounter* eventCounter = mngSegment.find(unique_instance).first; - if (!eventCounter) { - eventCounter = mngSegment.construct(unique_instance)(0); - } + EventCounter* eventCounter = mngSegment.find_or_construct(unique_instance)(0); - bool newShmRegionCreated = shmRegions->emplace(cfg.id.value(), RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, size, alloc)).second; + bool newShmRegionCreated = shmRegions->emplace(cfg.id.value(), RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, cfg.size, alloc)).second; if (newShmRegionCreated) { (eventCounter->fCount)++; }