diff --git a/examples/region/keep-alive.cxx b/examples/region/keep-alive.cxx index 41a7f8a5..2509295b 100644 --- a/examples/region/keep-alive.cxx +++ b/examples/region/keep-alive.cxx @@ -6,10 +6,9 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ #include -#include -#include #include - +#include +#include #include #include @@ -17,9 +16,8 @@ #include #include -#include - #include +#include #include #include #include @@ -27,65 +25,117 @@ using namespace std; using namespace boost::program_options; -namespace -{ - volatile sig_atomic_t gStopping = 0; -} +namespace { +volatile sig_atomic_t gStopping = 0; +volatile sig_atomic_t gResetContent = 0; +} // namespace -void signalHandler(int /* signal */) -{ - gStopping = 1; -} +void signalHandler(int /* signal */) { gStopping = 1; } + +void resetContentHandler(int /* signal */) { gResetContent = 1; } struct ShmManager { - ShmManager(uint64_t _shmId, const vector& _segments, const vector& _regions) + ShmManager(uint64_t _shmId, const vector& _segments, const vector& _regions, bool zero = true) : shmId(fair::mq::shmem::makeShmIdStr(_shmId)) + { + LOG(info) << "Starting ShmManager for shmId: " << shmId; + LOG(info) << "Performing full reset..."; + FullReset(); + LOG(info) << "Done."; + LOG(info) << "Adding managed segments..."; + AddSegments(_segments, zero); + LOG(info) << "Done."; + LOG(info) << "Adding unmanaged regions..."; + AddRegions(_regions, zero); + LOG(info) << "Done."; + LOG(info) << "Shared memory is ready for use."; + } + + void AddSegments(const vector& _segments, bool zero) { for (const auto& s : _segments) { - vector segmentConf; - boost::algorithm::split(segmentConf, s, boost::algorithm::is_any_of(",")); - if (segmentConf.size() != 2) { - LOG(error) << "incorrect format for --segments. Expecting pairs of ,."; + vector conf; + boost::algorithm::split(conf, s, boost::algorithm::is_any_of(",")); + if (conf.size() != 3) { + LOG(error) << "incorrect format for --segments. Expecting pairs of ,."; fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId}); - throw runtime_error("incorrect format for --segments. Expecting pairs of ,."); + throw runtime_error("incorrect format for --segments. Expecting pairs of ,,."); } - uint16_t id = stoi(segmentConf.at(0)); - uint64_t size = stoull(segmentConf.at(1)); + uint16_t id = stoi(conf.at(0)); + uint64_t size = stoull(conf.at(1)); + segmentCfgs.emplace_back(fair::mq::shmem::SegmentConfig{id, size, "rbtree_best_fit"}); + auto ret = segments.emplace(id, fair::mq::shmem::Segment(shmId, id, size, fair::mq::shmem::rbTreeBestFit)); fair::mq::shmem::Segment& segment = ret.first->second; - LOG(info) << "Created segment " << id << " of size " << segment.GetSize() << ", starting at " << segment.GetData() << ". Locking..."; + LOG(info) << "Created segment " << id << " of size " << segment.GetSize() + << ", starting at " << segment.GetData() << ". Locking..."; segment.Lock(); LOG(info) << "Done."; - LOG(info) << "Zeroing..."; - segment.Zero(); - LOG(info) << "Done."; - } - - for (const auto& r : _regions) { - vector regionConf; - boost::algorithm::split(regionConf, r, boost::algorithm::is_any_of(",")); - if (regionConf.size() != 2) { - LOG(error) << "incorrect format for --regions. Expecting pairs of ,."; - fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId}); - throw runtime_error("incorrect format for --regions. Expecting pairs of ,."); + if (zero) { + LOG(info) << "Zeroing..."; + segment.Zero(); + LOG(info) << "Done."; } - uint16_t id = stoi(regionConf.at(0)); - uint64_t size = stoull(regionConf.at(1)); + } + } + + void AddRegions(const vector& _regions, bool zero) + { + for (const auto& r : _regions) { + vector conf; + boost::algorithm::split(conf, r, boost::algorithm::is_any_of(",")); + if (conf.size() != 3) { + LOG(error) << "incorrect format for --regions. Expecting pairs of ,,."; + fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId}); + throw runtime_error("incorrect format for --regions. Expecting pairs of ,,."); + } + uint16_t id = stoi(conf.at(0)); + uint64_t size = stoull(conf.at(1)); + fair::mq::RegionConfig cfg; + cfg.id = id; + cfg.size = size; + regionCfgs.push_back(cfg); + auto ret = regions.emplace(id, make_unique(shmId, id, size)); fair::mq::shmem::UnmanagedRegion& region = *(ret.first->second); - LOG(info) << "Created unamanged region " << id << " of size " << region.GetSize() << ", starting at " << region.GetData() << ". Locking..."; + LOG(info) << "Created unamanged region " << id << " of size " << region.GetSize() + << ", starting at " << region.GetData() << ". Locking..."; region.Lock(); LOG(info) << "Done."; - LOG(info) << "Zeroing..."; - region.Zero(); - LOG(info) << "Done."; + if (zero) { + LOG(info) << "Zeroing..."; + region.Zero(); + LOG(info) << "Done."; + } } } + bool CheckPresence() + { + for (const auto& sc : segmentCfgs) { + if (!(fair::mq::shmem::Monitor::SegmentIsPresent(fair::mq::shmem::ShmId{shmId}, sc.id))) { + return false; + } + } + for (const auto& rc : regionCfgs) { + if (!(fair::mq::shmem::Monitor::RegionIsPresent(fair::mq::shmem::ShmId{shmId}, rc.id.value()))) { + return false; + } + } + return true; + } + void ResetContent() { - fair::mq::shmem::Monitor::ResetContent(fair::mq::shmem::ShmId{shmId}); + fair::mq::shmem::Monitor::ResetContent(fair::mq::shmem::ShmId{shmId}, segmentCfgs, regionCfgs); + } + + void FullReset() + { + segments.clear(); + regions.clear(); + fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId}); } ~ShmManager() @@ -97,6 +147,8 @@ struct ShmManager std::string shmId; map segments; map> regions; + std::vector segmentCfgs; + std::vector regionCfgs; }; int main(int argc, char** argv) @@ -105,8 +157,11 @@ int main(int argc, char** argv) signal(SIGINT, signalHandler); signal(SIGTERM, signalHandler); + signal(SIGUSR1, resetContentHandler); try { + bool nozero = false; + bool checkPresence = true; uint64_t shmId = 0; vector segments; vector regions; @@ -114,8 +169,10 @@ int main(int argc, char** argv) options_description desc("Options"); desc.add_options() ("shmid", value(&shmId)->required(), "Shm id") - ("segments", value>(&segments)->multitoken()->composing(), "Segments, as , , , ...") - ("regions", value>(®ions)->multitoken()->composing(), "Regions, as , , , ...") + ("segments", value>(&segments)->multitoken()->composing(), "Segments, as ,, ,, ,, ... (numaid: -2 disabled, -1 interleave, >=0 node)") + ("regions", value>(®ions)->multitoken()->composing(), "Regions, as , ,, ,, ...") + ("nozero", value(&nozero)->default_value(false)->implicit_value(true), "Do not zero segments after initialization") + ("check-presence", value(&checkPresence)->default_value(true)->implicit_value(true), "Check periodically if configured segments/regions are still present, and cleanup and leave if they are not") ("help,h", "Print help"); variables_map vm; @@ -128,15 +185,35 @@ int main(int argc, char** argv) notify(vm); - ShmManager shmManager(shmId, segments, regions); + ShmManager shmManager(shmId, segments, regions, !nozero); - while (!gStopping) { - std::this_thread::sleep_for(std::chrono::milliseconds(50)); + std::thread resetContentThread([&shmManager]() { + while (!gStopping) { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + if (gResetContent == 1) { + LOG(info) << "Resetting content for shmId " << shmManager.shmId; + shmManager.ResetContent(); + gResetContent = 0; + LOG(info) << "Done resetting content for shmId " << shmManager.shmId; + } + } + }); + + if (checkPresence) { + while (!gStopping) { + if (shmManager.CheckPresence() == false) { + LOG(error) << "Failed to find segments, exiting."; + gStopping = true; + } + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } } + resetContentThread.join(); + LOG(info) << "stopping."; } catch (exception& e) { - LOG(error) << "Unhandled Exception reached the top of main: " << e.what() << ", application will now exit"; + LOG(error) << "Exception reached the top of main: " << e.what() << ", exiting"; return 2; } diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index 6c472e5e..1dffd977 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -533,6 +534,88 @@ unsigned long Monitor::GetFreeMemory(const SessionId& sessionId, uint16_t segmen return GetFreeMemory(shmId, segmentId); } +bool Monitor::SegmentIsPresent(const ShmId& shmId, uint16_t segmentId) +{ + using namespace boost::interprocess; + try { + bipc::managed_shared_memory managementSegment(bipc::open_read_only, std::string("fmq_" + shmId.shmId + "_mng").c_str()); + Uint16SegmentInfoHashMap* shmSegments = managementSegment.find(unique_instance).first; + + if (!shmSegments) { + LOG(error) << "Found management segment, but could not locate segment info"; + return false; + } + + auto it = shmSegments->find(segmentId); + if (it != shmSegments->end()) { + try { + if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) { + RBTreeBestFitSegment segment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + std::to_string(segmentId)).c_str()); + } else { + SimpleSeqFitSegment segment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + std::to_string(segmentId)).c_str()); + } + } catch (bie&) { + LOG(error) << "Could not find segment with id '" << segmentId << "' for shmId '" << shmId.shmId << "'"; + return false; + } + } else { + LOG(error) << "Could not find segment info for segment id '" << segmentId << "' for shmId '" << shmId.shmId << "'"; + return false; + } + } catch (bie&) { + LOG(error) << "Could not find management segment for shmid '" << shmId.shmId << "'"; + return false; + } + + return true; +} +bool Monitor::SegmentIsPresent(const SessionId& sessionId, uint16_t segmentId) +{ + ShmId shmId{makeShmIdStr(sessionId.sessionId)}; + return SegmentIsPresent(shmId, segmentId); +} + +bool Monitor::RegionIsPresent(const ShmId& shmId, uint16_t regionId) +{ + using namespace boost::interprocess; + try { + bipc::managed_shared_memory managementSegment(bipc::open_read_only, std::string("fmq_" + shmId.shmId + "_mng").c_str()); + Uint16RegionInfoHashMap* shmRegions = managementSegment.find(bipc::unique_instance).first; + + if (!shmRegions) { + LOG(error) << "Found management segment, but could not locate region info"; + return false; + } + + std::string regionFileName("fmq_" + shmId.shmId + "_rg_" + to_string(regionId)); + + auto it = shmRegions->find(regionId); + if (it != shmRegions->end()) { + try { + if (it->second.fPath.empty()) { + shared_memory_object object(open_only, regionFileName.c_str(), read_only); + } + } catch (bie&) { + LOG(error) << "Could not find region with id '" << regionId << "' for shmId '" << shmId.shmId << "'"; + return false; + } + } else { + LOG(error) << "Could not find region info for region id '" << regionId << "' for shmId '" << shmId.shmId << "'"; + return false; + } + } catch (bie&) { + LOG(error) << "Could not find management segment for shmid '" << shmId.shmId << "'"; + return false; + } + + return true; +} +bool Monitor::RegionIsPresent(const SessionId& sessionId, uint16_t regionId) +{ + ShmId shmId{makeShmIdStr(sessionId.sessionId)}; + return RegionIsPresent(shmId, regionId); +} + void Monitor::PrintHelp() { LOG(info) << "controls: [x] close memory, " diff --git a/fairmq/shmem/Monitor.h b/fairmq/shmem/Monitor.h index 49483f1d..39f34c34 100644 --- a/fairmq/shmem/Monitor.h +++ b/fairmq/shmem/Monitor.h @@ -119,7 +119,7 @@ class Monitor /// @param sessionId session id static std::unordered_map> GetDebugInfo(const SessionId& sessionId); /// @brief Returns the amount of free memory in the specified segment - /// @param sessionId shmem id + /// @param shmId shmem id /// @param segmentId segment id /// @throws MonitorError static unsigned long GetFreeMemory(const ShmId& shmId, uint16_t segmentId); @@ -128,6 +128,23 @@ class Monitor /// @param segmentId segment id /// @throws MonitorError static unsigned long GetFreeMemory(const SessionId& sessionId, uint16_t segmentId); + /// @brief Checks if a given segment can be opened + /// @param shmId shmem id + /// @param segmentId segment id + static bool SegmentIsPresent(const ShmId& shmId, uint16_t segmentId); + /// @brief Checks if a given segment can be opened + /// @param sessionId session id + /// @param segmentId segment id + static bool SegmentIsPresent(const SessionId& sessionId, uint16_t segmentId); + /// @brief Checks if a given region can be opened + /// @param shmId shmem id + /// @param regionId region id + static bool RegionIsPresent(const ShmId& shmId, uint16_t regionId); + /// @brief Checks if a given region can be opened + /// @param sessionId session id + /// @param regionId region id + static bool RegionIsPresent(const SessionId& sessionId, uint16_t regionId); + static bool PrintShm(const ShmId& shmId); static void ListAll(const std::string& path);