shm: Monitor: Add region/segment presence check function

This commit is contained in:
Alexey Rybalchenko 2022-06-22 10:36:04 +02:00
parent 2500771689
commit 97f12baeb7
3 changed files with 225 additions and 48 deletions

View File

@ -6,10 +6,9 @@
* copied verbatim in the file "LICENSE" * * copied verbatim in the file "LICENSE" *
********************************************************************************/ ********************************************************************************/
#include <fairmq/shmem/Common.h> #include <fairmq/shmem/Common.h>
#include <fairmq/shmem/UnmanagedRegion.h>
#include <fairmq/shmem/Segment.h>
#include <fairmq/shmem/Monitor.h> #include <fairmq/shmem/Monitor.h>
#include <fairmq/shmem/Segment.h>
#include <fairmq/shmem/UnmanagedRegion.h>
#include <fairmq/tools/Unique.h> #include <fairmq/tools/Unique.h>
#include <fairlogger/Logger.h> #include <fairlogger/Logger.h>
@ -17,9 +16,8 @@
#include <boost/algorithm/string.hpp> #include <boost/algorithm/string.hpp>
#include <boost/program_options.hpp> #include <boost/program_options.hpp>
#include <csignal>
#include <chrono> #include <chrono>
#include <csignal>
#include <map> #include <map>
#include <string> #include <string>
#include <thread> #include <thread>
@ -27,65 +25,117 @@
using namespace std; using namespace std;
using namespace boost::program_options; using namespace boost::program_options;
namespace namespace {
{ volatile sig_atomic_t gStopping = 0;
volatile sig_atomic_t gStopping = 0; volatile sig_atomic_t gResetContent = 0;
} } // namespace
void signalHandler(int /* signal */) void signalHandler(int /* signal */) { gStopping = 1; }
{
gStopping = 1; void resetContentHandler(int /* signal */) { gResetContent = 1; }
}
struct ShmManager struct ShmManager
{ {
ShmManager(uint64_t _shmId, const vector<string>& _segments, const vector<string>& _regions) ShmManager(uint64_t _shmId, const vector<string>& _segments, const vector<string>& _regions, bool zero = true)
: shmId(fair::mq::shmem::makeShmIdStr(_shmId)) : shmId(fair::mq::shmem::makeShmIdStr(_shmId))
{ {
for (const auto& s : _segments) { LOG(info) << "Starting ShmManager for shmId: " << shmId;
vector<string> segmentConf; LOG(info) << "Performing full reset...";
boost::algorithm::split(segmentConf, s, boost::algorithm::is_any_of(",")); FullReset();
if (segmentConf.size() != 2) { LOG(info) << "Done.";
LOG(error) << "incorrect format for --segments. Expecting pairs of <id>,<size>."; LOG(info) << "Adding managed segments...";
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId}); AddSegments(_segments, zero);
throw runtime_error("incorrect format for --segments. Expecting pairs of <id>,<size>."); LOG(info) << "Done.";
LOG(info) << "Adding unmanaged regions...";
AddRegions(_regions, zero);
LOG(info) << "Done.";
LOG(info) << "Shared memory is ready for use.";
} }
uint16_t id = stoi(segmentConf.at(0));
uint64_t size = stoull(segmentConf.at(1)); void AddSegments(const vector<string>& _segments, bool zero)
{
for (const auto& s : _segments) {
vector<string> conf;
boost::algorithm::split(conf, s, boost::algorithm::is_any_of(","));
if (conf.size() != 3) {
LOG(error) << "incorrect format for --segments. Expecting pairs of <id>,<size><numaid>.";
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
throw runtime_error("incorrect format for --segments. Expecting pairs of <id>,<size>,<numaid>.");
}
uint16_t id = stoi(conf.at(0));
uint64_t size = stoull(conf.at(1));
segmentCfgs.emplace_back(fair::mq::shmem::SegmentConfig{id, size, "rbtree_best_fit"});
auto ret = segments.emplace(id, fair::mq::shmem::Segment(shmId, id, size, fair::mq::shmem::rbTreeBestFit)); auto ret = segments.emplace(id, fair::mq::shmem::Segment(shmId, id, size, fair::mq::shmem::rbTreeBestFit));
fair::mq::shmem::Segment& segment = ret.first->second; fair::mq::shmem::Segment& segment = ret.first->second;
LOG(info) << "Created segment " << id << " of size " << segment.GetSize() << ", starting at " << segment.GetData() << ". Locking..."; LOG(info) << "Created segment " << id << " of size " << segment.GetSize()
<< ", starting at " << segment.GetData() << ". Locking...";
segment.Lock(); segment.Lock();
LOG(info) << "Done."; LOG(info) << "Done.";
if (zero) {
LOG(info) << "Zeroing..."; LOG(info) << "Zeroing...";
segment.Zero(); segment.Zero();
LOG(info) << "Done."; LOG(info) << "Done.";
} }
for (const auto& r : _regions) {
vector<string> regionConf;
boost::algorithm::split(regionConf, r, boost::algorithm::is_any_of(","));
if (regionConf.size() != 2) {
LOG(error) << "incorrect format for --regions. Expecting pairs of <id>,<size>.";
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
throw runtime_error("incorrect format for --regions. Expecting pairs of <id>,<size>.");
} }
uint16_t id = stoi(regionConf.at(0)); }
uint64_t size = stoull(regionConf.at(1));
void AddRegions(const vector<string>& _regions, bool zero)
{
for (const auto& r : _regions) {
vector<string> conf;
boost::algorithm::split(conf, r, boost::algorithm::is_any_of(","));
if (conf.size() != 3) {
LOG(error) << "incorrect format for --regions. Expecting pairs of <id>,<size>,<numaid>.";
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
throw runtime_error("incorrect format for --regions. Expecting pairs of <id>,<size>,<numaid>.");
}
uint16_t id = stoi(conf.at(0));
uint64_t size = stoull(conf.at(1));
fair::mq::RegionConfig cfg;
cfg.id = id;
cfg.size = size;
regionCfgs.push_back(cfg);
auto ret = regions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(shmId, id, size)); auto ret = regions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(shmId, id, size));
fair::mq::shmem::UnmanagedRegion& region = *(ret.first->second); fair::mq::shmem::UnmanagedRegion& region = *(ret.first->second);
LOG(info) << "Created unamanged region " << id << " of size " << region.GetSize() << ", starting at " << region.GetData() << ". Locking..."; LOG(info) << "Created unamanged region " << id << " of size " << region.GetSize()
<< ", starting at " << region.GetData() << ". Locking...";
region.Lock(); region.Lock();
LOG(info) << "Done."; LOG(info) << "Done.";
if (zero) {
LOG(info) << "Zeroing..."; LOG(info) << "Zeroing...";
region.Zero(); region.Zero();
LOG(info) << "Done."; LOG(info) << "Done.";
} }
} }
}
bool CheckPresence()
{
for (const auto& sc : segmentCfgs) {
if (!(fair::mq::shmem::Monitor::SegmentIsPresent(fair::mq::shmem::ShmId{shmId}, sc.id))) {
return false;
}
}
for (const auto& rc : regionCfgs) {
if (!(fair::mq::shmem::Monitor::RegionIsPresent(fair::mq::shmem::ShmId{shmId}, rc.id.value()))) {
return false;
}
}
return true;
}
void ResetContent() void ResetContent()
{ {
fair::mq::shmem::Monitor::ResetContent(fair::mq::shmem::ShmId{shmId}); fair::mq::shmem::Monitor::ResetContent(fair::mq::shmem::ShmId{shmId}, segmentCfgs, regionCfgs);
}
void FullReset()
{
segments.clear();
regions.clear();
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
} }
~ShmManager() ~ShmManager()
@ -97,6 +147,8 @@ struct ShmManager
std::string shmId; std::string shmId;
map<uint16_t, fair::mq::shmem::Segment> segments; map<uint16_t, fair::mq::shmem::Segment> segments;
map<uint16_t, unique_ptr<fair::mq::shmem::UnmanagedRegion>> regions; map<uint16_t, unique_ptr<fair::mq::shmem::UnmanagedRegion>> regions;
std::vector<fair::mq::shmem::SegmentConfig> segmentCfgs;
std::vector<fair::mq::RegionConfig> regionCfgs;
}; };
int main(int argc, char** argv) int main(int argc, char** argv)
@ -105,8 +157,11 @@ int main(int argc, char** argv)
signal(SIGINT, signalHandler); signal(SIGINT, signalHandler);
signal(SIGTERM, signalHandler); signal(SIGTERM, signalHandler);
signal(SIGUSR1, resetContentHandler);
try { try {
bool nozero = false;
bool checkPresence = true;
uint64_t shmId = 0; uint64_t shmId = 0;
vector<string> segments; vector<string> segments;
vector<string> regions; vector<string> regions;
@ -114,8 +169,10 @@ int main(int argc, char** argv)
options_description desc("Options"); options_description desc("Options");
desc.add_options() desc.add_options()
("shmid", value<uint64_t>(&shmId)->required(), "Shm id") ("shmid", value<uint64_t>(&shmId)->required(), "Shm id")
("segments", value<vector<string>>(&segments)->multitoken()->composing(), "Segments, as <id>,<size> <id>,<size> <id>,<size> ...") ("segments", value<vector<string>>(&segments)->multitoken()->composing(), "Segments, as <id>,<size>,<numaid> <id>,<size>,<numaid> <id>,<size>,<numaid> ... (numaid: -2 disabled, -1 interleave, >=0 node)")
("regions", value<vector<string>>(&regions)->multitoken()->composing(), "Regions, as <id>,<size> <id>,<size> <id>,<size> ...") ("regions", value<vector<string>>(&regions)->multitoken()->composing(), "Regions, as <id>,<size> <id>,<size>,<numaid> <id>,<size>,<numaid> ...")
("nozero", value<bool>(&nozero)->default_value(false)->implicit_value(true), "Do not zero segments after initialization")
("check-presence", value<bool>(&checkPresence)->default_value(true)->implicit_value(true), "Check periodically if configured segments/regions are still present, and cleanup and leave if they are not")
("help,h", "Print help"); ("help,h", "Print help");
variables_map vm; variables_map vm;
@ -128,15 +185,35 @@ int main(int argc, char** argv)
notify(vm); notify(vm);
ShmManager shmManager(shmId, segments, regions); ShmManager shmManager(shmId, segments, regions, !nozero);
std::thread resetContentThread([&shmManager]() {
while (!gStopping) { while (!gStopping) {
std::this_thread::sleep_for(std::chrono::milliseconds(50)); std::this_thread::sleep_for(std::chrono::milliseconds(50));
if (gResetContent == 1) {
LOG(info) << "Resetting content for shmId " << shmManager.shmId;
shmManager.ResetContent();
gResetContent = 0;
LOG(info) << "Done resetting content for shmId " << shmManager.shmId;
} }
}
});
if (checkPresence) {
while (!gStopping) {
if (shmManager.CheckPresence() == false) {
LOG(error) << "Failed to find segments, exiting.";
gStopping = true;
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
resetContentThread.join();
LOG(info) << "stopping."; LOG(info) << "stopping.";
} catch (exception& e) { } catch (exception& e) {
LOG(error) << "Unhandled Exception reached the top of main: " << e.what() << ", application will now exit"; LOG(error) << "Exception reached the top of main: " << e.what() << ", exiting";
return 2; return 2;
} }

View File

@ -23,6 +23,7 @@
#include <boost/interprocess/ipc/message_queue.hpp> #include <boost/interprocess/ipc/message_queue.hpp>
#include <csignal> #include <csignal>
#include <cstdio>
#include <iostream> #include <iostream>
#include <iomanip> #include <iomanip>
#include <chrono> #include <chrono>
@ -533,6 +534,88 @@ unsigned long Monitor::GetFreeMemory(const SessionId& sessionId, uint16_t segmen
return GetFreeMemory(shmId, segmentId); return GetFreeMemory(shmId, segmentId);
} }
bool Monitor::SegmentIsPresent(const ShmId& shmId, uint16_t segmentId)
{
using namespace boost::interprocess;
try {
bipc::managed_shared_memory managementSegment(bipc::open_read_only, std::string("fmq_" + shmId.shmId + "_mng").c_str());
Uint16SegmentInfoHashMap* shmSegments = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first;
if (!shmSegments) {
LOG(error) << "Found management segment, but could not locate segment info";
return false;
}
auto it = shmSegments->find(segmentId);
if (it != shmSegments->end()) {
try {
if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
RBTreeBestFitSegment segment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + std::to_string(segmentId)).c_str());
} else {
SimpleSeqFitSegment segment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + std::to_string(segmentId)).c_str());
}
} catch (bie&) {
LOG(error) << "Could not find segment with id '" << segmentId << "' for shmId '" << shmId.shmId << "'";
return false;
}
} else {
LOG(error) << "Could not find segment info for segment id '" << segmentId << "' for shmId '" << shmId.shmId << "'";
return false;
}
} catch (bie&) {
LOG(error) << "Could not find management segment for shmid '" << shmId.shmId << "'";
return false;
}
return true;
}
bool Monitor::SegmentIsPresent(const SessionId& sessionId, uint16_t segmentId)
{
ShmId shmId{makeShmIdStr(sessionId.sessionId)};
return SegmentIsPresent(shmId, segmentId);
}
bool Monitor::RegionIsPresent(const ShmId& shmId, uint16_t regionId)
{
using namespace boost::interprocess;
try {
bipc::managed_shared_memory managementSegment(bipc::open_read_only, std::string("fmq_" + shmId.shmId + "_mng").c_str());
Uint16RegionInfoHashMap* shmRegions = managementSegment.find<Uint16RegionInfoHashMap>(bipc::unique_instance).first;
if (!shmRegions) {
LOG(error) << "Found management segment, but could not locate region info";
return false;
}
std::string regionFileName("fmq_" + shmId.shmId + "_rg_" + to_string(regionId));
auto it = shmRegions->find(regionId);
if (it != shmRegions->end()) {
try {
if (it->second.fPath.empty()) {
shared_memory_object object(open_only, regionFileName.c_str(), read_only);
}
} catch (bie&) {
LOG(error) << "Could not find region with id '" << regionId << "' for shmId '" << shmId.shmId << "'";
return false;
}
} else {
LOG(error) << "Could not find region info for region id '" << regionId << "' for shmId '" << shmId.shmId << "'";
return false;
}
} catch (bie&) {
LOG(error) << "Could not find management segment for shmid '" << shmId.shmId << "'";
return false;
}
return true;
}
bool Monitor::RegionIsPresent(const SessionId& sessionId, uint16_t regionId)
{
ShmId shmId{makeShmIdStr(sessionId.sessionId)};
return RegionIsPresent(shmId, regionId);
}
void Monitor::PrintHelp() void Monitor::PrintHelp()
{ {
LOG(info) << "controls: [x] close memory, " LOG(info) << "controls: [x] close memory, "

View File

@ -119,7 +119,7 @@ class Monitor
/// @param sessionId session id /// @param sessionId session id
static std::unordered_map<uint16_t, std::vector<BufferDebugInfo>> GetDebugInfo(const SessionId& sessionId); static std::unordered_map<uint16_t, std::vector<BufferDebugInfo>> GetDebugInfo(const SessionId& sessionId);
/// @brief Returns the amount of free memory in the specified segment /// @brief Returns the amount of free memory in the specified segment
/// @param sessionId shmem id /// @param shmId shmem id
/// @param segmentId segment id /// @param segmentId segment id
/// @throws MonitorError /// @throws MonitorError
static unsigned long GetFreeMemory(const ShmId& shmId, uint16_t segmentId); static unsigned long GetFreeMemory(const ShmId& shmId, uint16_t segmentId);
@ -128,6 +128,23 @@ class Monitor
/// @param segmentId segment id /// @param segmentId segment id
/// @throws MonitorError /// @throws MonitorError
static unsigned long GetFreeMemory(const SessionId& sessionId, uint16_t segmentId); static unsigned long GetFreeMemory(const SessionId& sessionId, uint16_t segmentId);
/// @brief Checks if a given segment can be opened
/// @param shmId shmem id
/// @param segmentId segment id
static bool SegmentIsPresent(const ShmId& shmId, uint16_t segmentId);
/// @brief Checks if a given segment can be opened
/// @param sessionId session id
/// @param segmentId segment id
static bool SegmentIsPresent(const SessionId& sessionId, uint16_t segmentId);
/// @brief Checks if a given region can be opened
/// @param shmId shmem id
/// @param regionId region id
static bool RegionIsPresent(const ShmId& shmId, uint16_t regionId);
/// @brief Checks if a given region can be opened
/// @param sessionId session id
/// @param regionId region id
static bool RegionIsPresent(const SessionId& sessionId, uint16_t regionId);
static bool PrintShm(const ShmId& shmId); static bool PrintShm(const ShmId& shmId);
static void ListAll(const std::string& path); static void ListAll(const std::string& path);