Shm monitor: fix startup race and enable view-only mode

This commit is contained in:
Alexey Rybalchenko 2019-11-13 12:37:48 +01:00 committed by Dennis Klein
parent 2c6f436858
commit 2ac8f98178
8 changed files with 200 additions and 197 deletions

View File

@ -73,15 +73,6 @@ struct RegionCounter
std::atomic<uint64_t> fCount;
};
// Status record for the shared memory monitor.
// A default-constructed instance always reports the monitor as active.
struct MonitorStatus
{
    // Set to true on construction.
    bool fActive = true;

    // Kept user-provided so the type is still constructed the same way as before.
    MonitorStatus() {}
};
struct MetaHeader
{
size_t fSize;

View File

@ -77,11 +77,12 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const fai
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
}
if (autolaunchMonitor) {
Manager::StartMonitor(fShmId);
}
fManager = fair::mq::tools::make_unique<Manager>(fShmId, segmentSize);
if (autolaunchMonitor) {
fManager->StartMonitor();
}
} catch (bipc::interprocess_exception& e) {
LOG(error) << "Could not initialize shared memory transport: " << e.what();
throw runtime_error(fair::mq::tools::ToString("Could not initialize shared memory transport: ", e.what()));

View File

@ -15,6 +15,7 @@
#include <boost/filesystem.hpp>
using namespace std;
using bie = ::boost::interprocess::interprocess_exception;
namespace bipc = ::boost::interprocess;
namespace bfs = ::boost::filesystem;
@ -53,22 +54,13 @@ Manager::Manager(const std::string& id, size_t size)
}
}
// Returns a mutable reference to the fSegment member (a managed shared memory object).
bipc::managed_shared_memory& Manager::Segment()
{
return fSegment;
}
// Returns a mutable reference to the fManagementSegment member (a managed shared memory object).
bipc::managed_shared_memory& Manager::ManagementSegment()
{
return fManagementSegment;
}
void Manager::StartMonitor()
void Manager::StartMonitor(const std::string& id)
{
try {
MonitorStatus* monitorStatus = fManagementSegment.find<MonitorStatus>(bipc::unique_instance).first;
if (monitorStatus == nullptr) {
LOG(debug) << "no fairmq-shmmonitor found, starting...";
bipc::named_mutex monitorStatus(bipc::open_only, string("fmq_" + id + "_ms").c_str());
LOG(debug) << "Found fairmq-shmmonitor for shared memory id " << id;
} catch (bie&) {
LOG(debug) << "no fairmq-shmmonitor found for shared memory id " << id << ", starting...";
auto env = boost::this_process::environment();
vector<bfs::path> ownPath = boost::this_process::path();
@ -80,14 +72,14 @@ void Manager::StartMonitor()
bfs::path p = boost::process::search_path("fairmq-shmmonitor", ownPath);
if (!p.empty()) {
boost::process::spawn(p, "-x", "--shmid", fShmId, "-d", "-t", "2000", env);
boost::process::spawn(p, "-x", "--shmid", id, "-d", "-t", "2000", env);
int numTries = 0;
do {
monitorStatus = fManagementSegment.find<MonitorStatus>(bipc::unique_instance).first;
if (monitorStatus) {
LOG(debug) << "fairmq-shmmonitor started";
try {
bipc::named_mutex monitorStatus(bipc::open_only, string("fmq_" + id + "_ms").c_str());
LOG(debug) << "Started fairmq-shmmonitor for shared memory id " << id;
break;
} else {
} catch (bie&) {
this_thread::sleep_for(chrono::milliseconds(10));
if (++numTries > 1000) {
LOG(error) << "Did not get response from fairmq-shmmonitor after " << 10 * 1000 << " milliseconds. Exiting.";
@ -98,12 +90,6 @@ void Manager::StartMonitor()
} else {
LOG(warn) << "could not find fairmq-shmmonitor in the path";
}
} else {
LOG(debug) << "found fairmq-shmmonitor.";
}
} catch (std::exception& e) {
LOG(error) << "Exception during fairmq-shmmonitor initialization: " << e.what() << ", application will now exit";
exit(EXIT_FAILURE);
}
}
@ -174,7 +160,7 @@ Region* Manager::GetRemoteRegion(const uint64_t id)
auto r = fRegions.emplace(id, fair::mq::tools::make_unique<Region>(*this, id, 0, true, nullptr, path, flags));
return r.first->second.get();
} catch (bipc::interprocess_exception& e) {
} catch (bie& e) {
LOG(warn) << "Could not get remote region for id: " << id;
return nullptr;
}

View File

@ -52,10 +52,10 @@ class Manager
~Manager();
boost::interprocess::managed_shared_memory& Segment();
boost::interprocess::managed_shared_memory& ManagementSegment();
boost::interprocess::managed_shared_memory& Segment() { return fSegment; }
boost::interprocess::managed_shared_memory& ManagementSegment() { return fManagementSegment; }
void StartMonitor();
static void StartMonitor(const std::string&);
static void Interrupt();
static void Resume();

View File

@ -25,6 +25,7 @@
#include <poll.h>
using namespace std;
using bie = ::boost::interprocess::interprocess_exception;
namespace bipc = ::boost::interprocess;
namespace bpt = ::boost::posix_time;
@ -45,11 +46,12 @@ void signalHandler(int signal)
gSignalStatus = signal;
}
Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, unsigned int timeoutInMS, bool runAsDaemon, bool cleanOnExit)
Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool viewOnly, unsigned int timeoutInMS, bool runAsDaemon, bool cleanOnExit)
: fSelfDestruct(selfDestruct)
, fInteractive(interactive)
, fSeenOnce(false)
, fViewOnly(viewOnly)
, fIsDaemon(runAsDaemon)
, fSeenOnce(false)
, fCleanOnExit(cleanOnExit)
, fTimeoutInMS(timeoutInMS)
, fShmId(shmId)
@ -63,14 +65,14 @@ Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, unsig
, fManagementSegment(bipc::open_or_create, fManagementSegmentName.c_str(), 65536)
, fDeviceHeartbeats()
{
MonitorStatus* monitorStatus = fManagementSegment.find<MonitorStatus>(bipc::unique_instance).first;
if (monitorStatus != nullptr) {
cout << "fairmq-shmmonitor already started or not properly exited. Try `fairmq-shmmonitor --cleanup`" << endl;
exit(EXIT_FAILURE);
if (!fViewOnly) {
try {
bipc::named_mutex monitorStatus(bipc::create_only, string("fmq_" + fShmId + "_ms").c_str());
} catch (bie&) {
cout << "fairmq-shmmonitor for shared memory id " << fShmId << " already started or not properly exited. Try `fairmq-shmmonitor --cleanup --shmid " << fShmId << "`" << endl;
throw DaemonPresent(tools::ToString("fairmq-shmmonitor for shared memory id ", fShmId, " already started or not properly exited."));
}
}
fManagementSegment.construct<MonitorStatus>(bipc::unique_instance)();
RemoveQueue(fControlQueueName);
}
void Monitor::CatchSignals()
@ -97,7 +99,11 @@ void Monitor::SignalMonitor()
void Monitor::Run()
{
thread heartbeatThread(&Monitor::MonitorHeartbeats, this);
thread heartbeatThread;
if (!fViewOnly) {
RemoveQueue(fControlQueueName);
heartbeatThread = thread(&Monitor::MonitorHeartbeats, this);
}
if (fInteractive) {
Interactive();
@ -108,8 +114,10 @@ void Monitor::Run()
}
}
if (!fViewOnly) {
heartbeatThread.join();
}
}
void Monitor::MonitorHeartbeats()
{
@ -131,13 +139,34 @@ void Monitor::MonitorHeartbeats()
// cout << "control queue timeout" << endl;
}
}
} catch (bipc::interprocess_exception& ie) {
} catch (bie& ie) {
cout << ie.what() << endl;
}
RemoveQueue(fControlQueueName);
}
// Scope guard for the terminal attached to stdin.
//
// On construction it switches the terminal to non-canonical, no-echo input so that
// single key presses can be read without waiting for a newline. On destruction it
// puts back exactly the attributes that were in effect before, instead of forcing
// ICANON and ECHO on regardless of the previous state.
//
// If stdin is not a terminal (tcgetattr fails), the guard does nothing at all.
struct TerminalConfig
{
    TerminalConfig()
        : fOriginal()
        , fChanged(false)
    {
        // Remember the current settings; bail out if stdin has no terminal attributes.
        if (tcgetattr(STDIN_FILENO, &fOriginal) != 0) {
            return;
        }

        termios modified = fOriginal;
        modified.c_lflag &= ~ICANON; // disable canonical input
        modified.c_lflag &= ~ECHO;   // do not echo input chars
        // Only restore later if the new settings were actually applied.
        fChanged = (tcsetattr(STDIN_FILENO, TCSANOW, &modified) == 0);
    }

    // The guard owns the "restore" duty; copying it would restore twice.
    TerminalConfig(const TerminalConfig&) = delete;
    TerminalConfig& operator=(const TerminalConfig&) = delete;

    ~TerminalConfig()
    {
        if (fChanged) {
            tcsetattr(STDIN_FILENO, TCSANOW, &fOriginal); // put the saved settings back
        }
    }

  private:
    termios fOriginal; // attributes captured in the constructor
    bool fChanged;     // true only if the constructor modified the terminal
};
void Monitor::Interactive()
{
char c;
@ -145,11 +174,7 @@ void Monitor::Interactive()
cinfd[0].fd = fileno(stdin);
cinfd[0].events = POLLIN;
struct termios t;
tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure
t.c_lflag &= ~ICANON; // disable canonical input
t.c_lflag &= ~ECHO; // do not echo input chars
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
TerminalConfig tcfg;
cout << endl;
PrintHelp();
@ -175,7 +200,11 @@ void Monitor::Interactive()
break;
case 'x':
cout << "\n[x] --> closing shared memory:" << endl;
if (!fViewOnly) {
Cleanup(fShmId);
} else {
cout << "cannot close because in view only mode" << endl;
}
break;
case 'h':
cout << "\n[h] --> help:" << endl << endl;
@ -207,11 +236,6 @@ void Monitor::Interactive()
cout << "\r";
}
}
tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure
t.c_lflag |= ICANON; // re-enable canonical input
t.c_lflag |= ECHO; // echo input chars
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
}
void Monitor::CheckSegment()
@ -250,10 +274,12 @@ void Monitor::CheckSegment()
unsigned int numDevices = 0;
if (fInteractive) {
fair::mq::shmem::DeviceCounter* dc = managementSegment.find<fair::mq::shmem::DeviceCounter>(bipc::unique_instance).first;
if (dc) {
numDevices = dc->fCount;
}
}
auto now = chrono::high_resolution_clock::now();
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
@ -273,28 +299,20 @@ void Monitor::CheckSegment()
<< setw(18) << fSegmentName << " | "
<< setw(10) << segment.get_size() << " | "
<< setw(10) << segment.get_free_memory() << " | "
// << setw(15) << segment.all_memory_deallocated() << " | "
<< setw(2) << segment.check_sanity() << " | "
// << setw(10) << segment.get_num_named_objects() << " | "
<< setw(10) << numDevices << " | "
// << setw(10) << segment.get_num_unique_objects() << " |"
<< setw(10) << duration << " |"
<< c
<< flush;
<< setw(8) << numDevices << " | "
<< setw(10) << (fViewOnly ? "view only" : to_string(duration)) << " |"
<< c << flush;
}
} catch (bipc::interprocess_exception& ie) {
} catch (bie&) {
fHeartbeatTriggered = false;
if (fInteractive) {
cout << "| "
<< setw(18) << "-" << " | "
<< setw(10) << "-" << " | "
<< setw(10) << "-" << " | "
// << setw(15) << "-" << " | "
<< setw(2) << "-" << " | "
<< setw(8) << "-" << " | "
<< setw(10) << "-" << " |"
<< setw(10) << "-" << " |"
<< c
<< flush;
<< c << flush;
}
auto now = chrono::high_resolution_clock::now();
@ -318,50 +336,60 @@ void Monitor::CheckSegment()
}
}
void Monitor::Cleanup(const string& shmId)
void Monitor::PrintQueues()
{
string managementSegmentName("fmq_" + shmId + "_mng");
cout << '\n';
try {
bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str());
RegionCounter* rc = managementSegment.find<RegionCounter>(bipc::unique_instance).first;
if (rc) {
cout << "Region counter found: " << rc->fCount << endl;
uint64_t regionCount = rc->fCount;
bipc::managed_shared_memory segment(bipc::open_only, fSegmentName.c_str());
StrVector* queues = segment.find<StrVector>(string("fmq_" + fShmId + "_qs").c_str()).first;
if (queues) {
cout << "found " << queues->size() << " queue(s):" << endl;
Uint64RegionInfoMap* m = managementSegment.find<Uint64RegionInfoMap>(bipc::unique_instance).first;
for (uint64_t i = 1; i <= regionCount; ++i) {
if (m != nullptr) {
RegionInfo ri = m->at(i);
string path = ri.fPath.c_str();
int flags = ri.fFlags;
cout << "Found RegionInfo with path: '" << path << "', flags: " << flags << "'." << endl;
if (path != "") {
RemoveFileMapping(tools::ToString(path, "fmq_" + shmId + "_rg_" + to_string(i)));
for (const auto& queue : *queues) {
string name(queue.c_str());
cout << '\t' << name << " : ";
atomic<int>* queueSize = segment.find<atomic<int>>(name.c_str()).first;
if (queueSize) {
cout << *queueSize << " messages" << endl;
} else {
RemoveObject("fmq_" + shmId + "_rg_" + to_string(i));
cout << "\tqueue does not have a queue size entry." << endl;
}
}
} else {
RemoveObject("fmq_" + shmId + "_rg_" + to_string(i));
cout << "\tno queues found" << endl;
}
} catch (bie&) {
cout << "\tno queues found" << endl;
} catch (out_of_range&) {
cout << "\tno queues found" << endl;
}
RemoveQueue(string("fmq_" + shmId + "_rgq_" + to_string(i)));
cout << "\n --> last heartbeats: " << endl << endl;
auto now = chrono::high_resolution_clock::now();
for (const auto& h : fDeviceHeartbeats) {
cout << "\t" << h.first << " : " << chrono::duration<double, milli>(now - h.second).count() << "ms ago." << endl;
}
} else {
cout << "No region counter found. no regions to cleanup." << endl;
}
RemoveObject(managementSegmentName.c_str());
} catch (bipc::interprocess_exception& ie) {
cout << "Did not find '" << managementSegmentName << "' shared memory segment. No regions to cleanup." << endl;
}
RemoveObject("fmq_" + shmId + "_main");
RemoveMutex("fmq_" + shmId + "_mtx");
cout << endl;
}
// Writes the header row of the status table to stdout: five right-aligned column
// titles separated by " | ", opened with "| " and closed with " |", followed by a
// newline and a flush.
void Monitor::PrintHeader()
{
    struct Column
    {
        int width;
        const char* title;
    };

    const Column columns[] = {
        {18, "name"},
        {10, "size"},
        {10, "free"},
        {8, "devices"},
        {10, "last hb"},
    };
    const Column* const lastColumn = columns + (sizeof(columns) / sizeof(columns[0])) - 1;

    cout << "| ";
    for (const Column& column : columns) {
        // the final column is closed without the trailing space
        const char* separator = (&column == lastColumn) ? " |" : " | ";
        cout << setw(column.width) << column.title << separator;
    }
    cout << endl;
}
// Writes the one-line summary of the interactive key bindings to stdout and flushes.
void Monitor::PrintHelp()
{
    static const char* const controlsLine = "controls: [x] close memory, [p] print queues, [h] help, [q] quit.";
    cout << controlsLine << endl;
}
void Monitor::RemoveObject(const string& name)
{
if (bipc::shared_memory_object::remove(name.c_str())) {
@ -398,73 +426,61 @@ void Monitor::RemoveMutex(const string& name)
}
}
void Monitor::PrintQueues()
void Monitor::Cleanup(const string& shmId)
{
cout << '\n';
string managementSegmentName("fmq_" + shmId + "_mng");
try {
bipc::managed_shared_memory segment(bipc::open_only, fSegmentName.c_str());
StrVector* queues = segment.find<StrVector>(string("fmq_" + fShmId + "_qs").c_str()).first;
if (queues) {
cout << "found " << queues->size() << " queue(s):" << endl;
bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str());
RegionCounter* rc = managementSegment.find<RegionCounter>(bipc::unique_instance).first;
if (rc) {
cout << "Region counter found: " << rc->fCount << endl;
uint64_t regionCount = rc->fCount;
for (const auto& queue : *queues) {
string name(queue.c_str());
cout << '\t' << name << " : ";
atomic<int>* queueSize = segment.find<atomic<int>>(name.c_str()).first;
if (queueSize) {
cout << *queueSize << " messages" << endl;
Uint64RegionInfoMap* m = managementSegment.find<Uint64RegionInfoMap>(bipc::unique_instance).first;
for (uint64_t i = 1; i <= regionCount; ++i) {
if (m != nullptr) {
RegionInfo ri = m->at(i);
string path = ri.fPath.c_str();
int flags = ri.fFlags;
cout << "Found RegionInfo with path: '" << path << "', flags: " << flags << "'." << endl;
if (path != "") {
RemoveFileMapping(tools::ToString(path, "fmq_" + shmId + "_rg_" + to_string(i)));
} else {
cout << "\tqueue does not have a queue size entry." << endl;
}
RemoveObject("fmq_" + shmId + "_rg_" + to_string(i));
}
} else {
cout << "\tno queues found" << endl;
}
} catch (bipc::interprocess_exception& ie) {
cout << "\tno queues found" << endl;
} catch (out_of_range& ie) {
cout << "\tno queues found" << endl;
RemoveObject("fmq_" + shmId + "_rg_" + to_string(i));
}
cout << "\n --> last heartbeats: " << endl << endl;
auto now = chrono::high_resolution_clock::now();
for (const auto& h : fDeviceHeartbeats) {
cout << "\t" << h.first << " : " << chrono::duration<double, milli>(now - h.second).count() << "ms ago." << endl;
RemoveQueue(string("fmq_" + shmId + "_rgq_" + to_string(i)));
}
} else {
cout << "No region counter found. no regions to cleanup." << endl;
}
RemoveObject(managementSegmentName.c_str());
} catch (bie&) {
cout << "Did not find '" << managementSegmentName << "' shared memory segment. No regions to cleanup." << endl;
}
RemoveObject("fmq_" + shmId + "_main");
RemoveMutex("fmq_" + shmId + "_mtx");
cout << endl;
}
// Writes the header row of the status table to stdout and flushes.
// Each active column title is right-aligned with setw and wrapped in the escape
// sequences "\033[01;32m" ... "\033[0m" (presumably bold green / reset on ANSI
// terminals - confirm). The commented-out lines inside the expression are
// disabled columns and produce no output.
void Monitor::PrintHeader()
{
cout << "| "
<< "\033[01;32m" << setw(18) << "name" << "\033[0m" << " | "
<< "\033[01;32m" << setw(10) << "size" << "\033[0m" << " | "
<< "\033[01;32m" << setw(10) << "free" << "\033[0m" << " | "
// << "\033[01;32m" << setw(15) << "all deallocated" << "\033[0m" << " | "
<< "\033[01;32m" << setw(2) << "ok" << "\033[0m" << " | "
// << "\033[01;32m" << setw(10) << "# named" << "\033[0m" << " | "
<< "\033[01;32m" << setw(10) << "# devices" << "\033[0m" << " | "
// << "\033[01;32m" << setw(10) << "# unique" << "\033[0m" << " |"
<< "\033[01;32m" << setw(10) << "ms since" << "\033[0m" << " |"
<< endl;
}
// Writes the one-line summary of the interactive key bindings
// ([x], [p], [h], [q]) to stdout and flushes.
void Monitor::PrintHelp()
{
cout << "controls: [x] close memory, [p] print queues, [h] help, [q] quit." << endl;
}
// Tears the monitor down: releases its status entry, waits for the signal thread,
// optionally cleans up the shared memory of this id, and removes the status mutex.
Monitor::~Monitor()
{
// Drop the MonitorStatus unique instance from the management segment.
fManagementSegment.destroy<MonitorStatus>(bipc::unique_instance);
// Join the signal thread only if it is joinable.
if (fSignalThread.joinable()) {
fSignalThread.join();
}
// Clean-on-exit mode: run the cleanup for this shared memory id.
if (fCleanOnExit) {
Cleanup(fShmId);
}
// A view-only monitor leaves the "fmq_<shmId>_ms" mutex untouched.
if (!fViewOnly) {
RemoveMutex("fmq_" + fShmId + "_ms");
}
}
} // namespace shmem

View File

@ -14,6 +14,7 @@
#include <chrono>
#include <atomic>
#include <string>
#include <stdexcept>
#include <unordered_map>
namespace fair
@ -26,22 +27,24 @@ namespace shmem
class Monitor
{
public:
Monitor(const std::string& shmId, bool selfDestruct, bool interactive, unsigned int timeoutInMS, bool runAsDaemon, bool cleanOnExit);
Monitor(const std::string& shmId, bool selfDestruct, bool interactive, bool viewOnly, unsigned int timeoutInMS, bool runAsDaemon, bool cleanOnExit);
Monitor(const Monitor&) = delete;
Monitor operator=(const Monitor&) = delete;
virtual ~Monitor();
void CatchSignals();
void Run();
virtual ~Monitor();
static void Cleanup(const std::string& shmId);
static void RemoveObject(const std::string&);
static void RemoveFileMapping(const std::string&);
static void RemoveQueue(const std::string&);
static void RemoveMutex(const std::string&);
// Thrown by the Monitor constructor when the status mutex for the shared memory id
// cannot be created (a monitor is already running or did not exit properly).
struct DaemonPresent : std::runtime_error { using std::runtime_error::runtime_error; };
private:
void PrintHeader();
void PrintHelp();
@ -53,8 +56,9 @@ class Monitor
bool fSelfDestruct; // will self-destruct after the memory has been closed
bool fInteractive; // running in interactive mode
bool fSeenOnce; // true if the segment has been opened successfully at least once
bool fViewOnly; // view only mode
bool fIsDaemon;
bool fSeenOnce; // true if the segment has been opened successfully at least once
bool fCleanOnExit;
unsigned int fTimeoutInMS;
std::string fShmId;

View File

@ -32,5 +32,6 @@ FairMQ Shared Memory currently uses following names to register shared memory on
`fmq_<shmId>_mng` - management segment name, used for storing management data.
`fmq_<shmId>_cq` - message queue for communicating between shm transport and shm monitor (exists independent of above segments).
`fmq_<shmId>_mtx` - boost::interprocess::named_mutex for management purposes (exists independent of above segments).
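`fmq_<shmId>_ms` - boost::interprocess::named_mutex used as the shmmonitor status: it exists while a (non-view-only) shmmonitor is active for this shmId and is removed when that monitor exits (exists independent of above segments).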
`fmq_<shmId>_rg_<index>` - names of unmanaged regions.
`fmq_<shmId>_rgq_<index>` - names of queues for the unmanaged regions.

View File

@ -74,17 +74,19 @@ int main(int argc, char** argv)
bool cleanup = false;
bool selfDestruct = false;
bool interactive = false;
bool viewOnly = false;
unsigned int timeoutInMS = 5000;
bool runAsDaemon = false;
bool cleanOnExit = false;
options_description desc("Options");
desc.add_options()
("session,s", value<string>(&sessionName)->default_value("default"), "session id which to monitor")
("shmid", value<string>(&shmId)->default_value(""), "Shmem Id to monitor (if not provided, it is generated out of session id and user id)")
("session,s" , value<string>(&sessionName)->default_value("default"), "Session id")
("shmid" , value<string>(&shmId)->default_value(""), "Shmem id (if not provided, it is generated out of session id and user id)")
("cleanup,c" , value<bool>(&cleanup)->implicit_value(true), "Perform cleanup and quit")
("self-destruct,x", value<bool>(&selfDestruct)->implicit_value(true), "Quit after first closing of the memory")
("interactive,i" , value<bool>(&interactive)->implicit_value(true), "Interactive run")
("view,v" , value<bool>(&viewOnly)->implicit_value(true), "Run in view only mode")
("timeout,t" , value<unsigned int>(&timeoutInMS)->default_value(5000), "Heartbeat timeout in milliseconds")
("daemonize,d" , value<bool>(&runAsDaemon)->implicit_value(true), "Daemonize the monitor")
("clean-on-exit,e", value<bool>(&cleanOnExit)->implicit_value(true), "Perform cleanup on exit")
@ -117,10 +119,12 @@ int main(int argc, char** argv)
cout << "Starting shared memory monitor for session: \"" << sessionName << "\" (shmId: " << shmId << ")..." << endl;
Monitor monitor{shmId, selfDestruct, interactive, timeoutInMS, runAsDaemon, cleanOnExit};
Monitor monitor(shmId, selfDestruct, interactive, viewOnly, timeoutInMS, runAsDaemon, cleanOnExit);
monitor.CatchSignals();
monitor.Run();
} catch (Monitor::DaemonPresent& dp) {
return 0;
} catch (exception& e) {
cerr << "Unhandled Exception reached the top of main: " << e.what() << ", application will now exit" << endl;
return 2;