monitor update

This commit is contained in:
Alexey Rybalchenko 2018-01-31 14:48:55 +01:00 committed by Mohammad Al-Turany
parent abcc5083f2
commit 13678f9f6d
7 changed files with 174 additions and 71 deletions

View File

@ -892,13 +892,13 @@ void FairMQDevice::SetConfig(FairMQProgOptions& config)
LOG(warn) << "did not insert channel '" << c.first << "', it is already in the device."; LOG(warn) << "did not insert channel '" << c.first << "', it is already in the device.";
} }
} }
fDefaultTransport = config.GetValue<string>("transport");
SetTransport(fDefaultTransport);
fId = config.GetValue<string>("id"); fId = config.GetValue<string>("id");
fNetworkInterface = config.GetValue<string>("network-interface"); fNetworkInterface = config.GetValue<string>("network-interface");
fNumIoThreads = config.GetValue<int>("io-threads"); fNumIoThreads = config.GetValue<int>("io-threads");
fInitializationTimeoutInS = config.GetValue<int>("initialization-timeout"); fInitializationTimeoutInS = config.GetValue<int>("initialization-timeout");
fRate = fConfig->GetValue<float>("rate"); fRate = fConfig->GetValue<float>("rate");
fDefaultTransport = config.GetValue<string>("transport");
SetTransport(fDefaultTransport);
} }
void FairMQDevice::LogSocketRates() void FairMQDevice::LogSocketRates()

View File

@ -240,6 +240,7 @@ void FairMQProgOptions::InitOptionDescription()
("port-range-max", po::value<int >()->default_value(32000), "End of the port range for dynamic initialization.") ("port-range-max", po::value<int >()->default_value(32000), "End of the port range for dynamic initialization.")
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)") ("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t>()->default_value(2000000000), "Shared memory: size of the shared memory segment (in bytes).") ("shm-segment-size", po::value<size_t>()->default_value(2000000000), "Shared memory: size of the shared memory segment (in bytes).")
("shm-monitor", po::value<bool >()->default_value(false), "Shared memory: run monitor daemon.")
("rate", po::value<float >()->default_value(0.), "Rate for conditional run loop (Hz).") ("rate", po::value<float >()->default_value(0.), "Rate for conditional run loop (Hz).")
("session", po::value<string>()->default_value("default"), "Session name.") ("session", po::value<string>()->default_value("default"), "Session name.")
; ;

View File

@ -12,6 +12,7 @@
#include <zmq.h> #include <zmq.h>
#include <boost/version.hpp> #include <boost/version.hpp>
#include <boost/process.hpp>
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
#include <boost/interprocess/managed_shared_memory.hpp> #include <boost/interprocess/managed_shared_memory.hpp>
@ -35,6 +36,7 @@ FairMQ::Transport FairMQTransportFactorySHM::fTransportType = FairMQ::Transport:
FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const FairMQProgOptions* config) FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const FairMQProgOptions* config)
: FairMQTransportFactory(id) : FairMQTransportFactory(id)
, fDeviceId(id)
, fSessionName("default") , fSessionName("default")
, fContext(nullptr) , fContext(nullptr)
, fHeartbeatThread() , fHeartbeatThread()
@ -57,11 +59,13 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai
int numIoThreads = 1; int numIoThreads = 1;
size_t segmentSize = 2000000000; size_t segmentSize = 2000000000;
bool autolaunchMonitor = false;
if (config) if (config)
{ {
numIoThreads = config->GetValue<int>("io-threads"); numIoThreads = config->GetValue<int>("io-threads");
fSessionName = config->GetValue<string>("session"); fSessionName = config->GetValue<string>("session");
segmentSize = config->GetValue<size_t>("shm-segment-size"); segmentSize = config->GetValue<size_t>("shm-segment-size");
autolaunchMonitor = config->GetValue<bool>("shm-monitor");
} }
else else
{ {
@ -106,24 +110,27 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai
} }
// start shm monitor // start shm monitor
// try if (autolaunchMonitor)
// { {
// MonitorStatus* monitorStatus = fManagementSegment.find<MonitorStatus>(bipc::unique_instance).first; try
// if (monitorStatus == nullptr) {
// { MonitorStatus* monitorStatus = fManager->ManagementSegment().find<MonitorStatus>(bipc::unique_instance).first;
// LOG(debug) << "no shmmonitor found, starting..."; if (monitorStatus == nullptr)
// StartMonitor(); {
// } LOG(debug) << "no shmmonitor found, starting...";
// else StartMonitor();
// { }
// LOG(debug) << "found shmmonitor."; else
// } {
// } LOG(debug) << "found shmmonitor.";
// catch (std::exception& e) }
// { }
// LOG(error) << "Exception during shmmonitor initialization: " << e.what() << ", application will now exit"; catch (std::exception& e)
// exit(EXIT_FAILURE); {
// } LOG(error) << "Exception during shmmonitor initialization: " << e.what() << ", application will now exit";
exit(EXIT_FAILURE);
}
}
} }
} }
catch(bipc::interprocess_exception& e) catch(bipc::interprocess_exception& e)
@ -140,15 +147,13 @@ void FairMQTransportFactorySHM::StartMonitor()
{ {
int numTries = 0; int numTries = 0;
if (!bfs::exists(bfs::path("shmmonitor"))) auto env = boost::this_process::environment();
{
LOG(error) << "Could not find shmmonitor. Is it in the PATH? Monitor not started";
return;
}
// TODO: replace with Boost.Process once boost 1.64 is available boost::filesystem::path p = boost::process::search_path("shmmonitor");
int r = system("shmmonitor --self-destruct &");
LOG(debug) << r; if (!p.empty())
{
boost::process::spawn(p, "-x", "-s", fSessionName, "-d", "-t", "2000", env);
do do
{ {
@ -161,14 +166,19 @@ void FairMQTransportFactorySHM::StartMonitor()
else else
{ {
this_thread::sleep_for(std::chrono::milliseconds(10)); this_thread::sleep_for(std::chrono::milliseconds(10));
if (++numTries > 100) if (++numTries > 1000)
{ {
LOG(error) << "Did not get response from shmmonitor after " << 10 * 100 << " milliseconds. Exiting."; LOG(error) << "Did not get response from shmmonitor after " << 10 * 1000 << " milliseconds. Exiting.";
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} }
} }
while (true); while (true);
}
else
{
LOG(WARN) << "could not find shmmonitor in the path";
}
} }
void FairMQTransportFactorySHM::SendHeartbeats() void FairMQTransportFactorySHM::SendHeartbeats()
@ -179,9 +189,8 @@ void FairMQTransportFactorySHM::SendHeartbeats()
try try
{ {
bipc::message_queue mq(bipc::open_only, controlQueueName.c_str()); bipc::message_queue mq(bipc::open_only, controlQueueName.c_str());
bool heartbeat = true;
bpt::ptime sndTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(100); bpt::ptime sndTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(100);
if (mq.timed_send(&heartbeat, sizeof(heartbeat), 0, sndTill)) if (mq.timed_send(fDeviceId.c_str(), fDeviceId.size(), 0, sndTill))
{ {
this_thread::sleep_for(chrono::milliseconds(100)); this_thread::sleep_for(chrono::milliseconds(100));
} }

View File

@ -56,6 +56,7 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory
void StartMonitor(); void StartMonitor();
static FairMQ::Transport fTransportType; static FairMQ::Transport fTransportType;
std::string fDeviceId;
std::string fSessionName; std::string fSessionName;
void* fContext; void* fContext;
std::thread fHeartbeatThread; std::thread fHeartbeatThread;

View File

@ -30,13 +30,13 @@ namespace bipc = boost::interprocess;
namespace bpt = boost::posix_time; namespace bpt = boost::posix_time;
using CharAllocator = bipc::allocator<char, bipc::managed_shared_memory::segment_manager>; using CharAllocator = bipc::allocator<char, bipc::managed_shared_memory::segment_manager>;
using String = bipc::basic_string<char, std::char_traits<char>, CharAllocator>; using String = bipc::basic_string<char, char_traits<char>, CharAllocator>;
using StringAllocator = bipc::allocator<String, bipc::managed_shared_memory::segment_manager>; using StringAllocator = bipc::allocator<String, bipc::managed_shared_memory::segment_manager>;
using StringVector = bipc::vector<String, StringAllocator>; using StringVector = bipc::vector<String, StringAllocator>;
namespace namespace
{ {
volatile std::sig_atomic_t gSignalStatus = 0; volatile sig_atomic_t gSignalStatus = 0;
} }
namespace fair namespace fair
@ -51,10 +51,12 @@ void signalHandler(int signal)
gSignalStatus = signal; gSignalStatus = signal;
} }
Monitor::Monitor(const string& sessionName, bool selfDestruct, bool interactive, unsigned int timeoutInMS) Monitor::Monitor(const string& sessionName, bool selfDestruct, bool interactive, unsigned int timeoutInMS, bool runAsDaemon, bool cleanOnExit)
: fSelfDestruct(selfDestruct) : fSelfDestruct(selfDestruct)
, fInteractive(interactive) , fInteractive(interactive)
, fSeenOnce(false) , fSeenOnce(false)
, fIsDaemon(runAsDaemon)
, fCleanOnExit(cleanOnExit)
, fTimeoutInMS(timeoutInMS) , fTimeoutInMS(timeoutInMS)
, fSessionName(sessionName) , fSessionName(sessionName)
, fSegmentName("fmq_shm_" + fSessionName + "_main") , fSegmentName("fmq_shm_" + fSessionName + "_main")
@ -62,9 +64,10 @@ Monitor::Monitor(const string& sessionName, bool selfDestruct, bool interactive,
, fControlQueueName("fmq_shm_" + fSessionName + "_control_queue") , fControlQueueName("fmq_shm_" + fSessionName + "_control_queue")
, fTerminating(false) , fTerminating(false)
, fHeartbeatTriggered(false) , fHeartbeatTriggered(false)
, fLastHeartbeat() , fLastHeartbeat(chrono::high_resolution_clock::now())
, fSignalThread() , fSignalThread()
, fManagementSegment(bipc::open_or_create, fManagementSegmentName.c_str(), 65536) , fManagementSegment(bipc::open_or_create, fManagementSegmentName.c_str(), 65536)
, fDeviceHeartbeats()
{ {
MonitorStatus* monitorStatus = fManagementSegment.find<MonitorStatus>(bipc::unique_instance).first; MonitorStatus* monitorStatus = fManagementSegment.find<MonitorStatus>(bipc::unique_instance).first;
if (monitorStatus != nullptr) if (monitorStatus != nullptr)
@ -127,19 +130,21 @@ void Monitor::MonitorHeartbeats()
{ {
try try
{ {
bipc::message_queue mq(bipc::open_or_create, fControlQueueName.c_str(), 1000, sizeof(bool)); bipc::message_queue mq(bipc::open_or_create, fControlQueueName.c_str(), 1000, 256);
unsigned int priority; unsigned int priority;
bipc::message_queue::size_type recvdSize; bipc::message_queue::size_type recvdSize;
char msg[256] = {0};
while (!fTerminating) while (!fTerminating)
{ {
bool heartbeat;
bpt::ptime rcvTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(100); bpt::ptime rcvTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(100);
if (mq.timed_receive(&heartbeat, sizeof(heartbeat), recvdSize, priority, rcvTill)) if (mq.timed_receive(&msg, sizeof(msg), recvdSize, priority, rcvTill))
{ {
fHeartbeatTriggered = true; fHeartbeatTriggered = true;
fLastHeartbeat = chrono::high_resolution_clock::now(); fLastHeartbeat = chrono::high_resolution_clock::now();
string deviceId(msg, recvdSize);
fDeviceHeartbeats[deviceId] = fLastHeartbeat;
} }
else else
{ {
@ -186,27 +191,27 @@ void Monitor::Interactive()
switch (c) switch (c)
{ {
case 'q': case 'q':
cout << "[q] --> quitting." << endl; cout << "\n[q] --> quitting." << endl;
fTerminating = true; fTerminating = true;
break; break;
case 'p': case 'p':
cout << "[p] --> active queues:" << endl; cout << "\n[p] --> active queues:" << endl;
PrintQueues(); PrintQueues();
break; break;
case 'x': case 'x':
cout << "[x] --> closing shared memory:" << endl; cout << "\n[x] --> closing shared memory:" << endl;
Cleanup(fSessionName); Cleanup(fSessionName);
break; break;
case 'h': case 'h':
cout << "[h] --> help:" << endl << endl; cout << "\n[h] --> help:" << endl << endl;
PrintHelp(); PrintHelp();
cout << endl; cout << endl;
break; break;
case '\n': case '\n':
cout << "[\\n] --> invalid input." << endl; cout << "\n[\\n] --> invalid input." << endl;
break; break;
default: default:
cout << "[" << c << "] --> invalid input." << endl; cout << "\n[" << c << "] --> invalid input." << endl;
break; break;
} }
@ -290,7 +295,7 @@ void Monitor::CheckSegment()
fHeartbeatTriggered = false; fHeartbeatTriggered = false;
if (fSelfDestruct) if (fSelfDestruct)
{ {
cout << "self destructing" << endl; cout << "\nself destructing" << endl;
fTerminating = true; fTerminating = true;
} }
} }
@ -327,6 +332,21 @@ void Monitor::CheckSegment()
<< c << c
<< flush; << flush;
} }
auto now = chrono::high_resolution_clock::now();
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
if (fIsDaemon && duration > fTimeoutInMS * 2)
{
Cleanup(fSessionName);
fHeartbeatTriggered = false;
if (fSelfDestruct)
{
cout << "\nself destructing" << endl;
fTerminating = true;
}
}
if (fSelfDestruct) if (fSelfDestruct)
{ {
if (fSeenOnce) if (fSeenOnce)
@ -352,7 +372,7 @@ void Monitor::Cleanup(const string& sessionName)
for (unsigned int i = 1; i <= regionCount; ++i) for (unsigned int i = 1; i <= regionCount; ++i)
{ {
RemoveObject("fmq_shm_" + sessionName + "_region_" + to_string(i)); RemoveObject("fmq_shm_" + sessionName + "_region_" + to_string(i));
RemoveQueue(std::string("fmq_shm_" + sessionName + "_region_queue_" + std::to_string(i))); RemoveQueue(string("fmq_shm_" + sessionName + "_region_queue_" + to_string(i)));
} }
} }
else else
@ -369,12 +389,12 @@ void Monitor::Cleanup(const string& sessionName)
RemoveObject("fmq_shm_" + sessionName + "_main"); RemoveObject("fmq_shm_" + sessionName + "_main");
boost::interprocess::named_mutex::remove(std::string("fmq_shm_" + sessionName + "_mutex").c_str()); boost::interprocess::named_mutex::remove(string("fmq_shm_" + sessionName + "_mutex").c_str());
cout << endl; cout << endl;
} }
void Monitor::RemoveObject(const std::string& name) void Monitor::RemoveObject(const string& name)
{ {
if (bipc::shared_memory_object::remove(name.c_str())) if (bipc::shared_memory_object::remove(name.c_str()))
{ {
@ -386,7 +406,7 @@ void Monitor::RemoveObject(const std::string& name)
} }
} }
void Monitor::RemoveQueue(const std::string& name) void Monitor::RemoveQueue(const string& name)
{ {
if (bipc::message_queue::remove(name.c_str())) if (bipc::message_queue::remove(name.c_str()))
{ {
@ -405,7 +425,7 @@ void Monitor::PrintQueues()
try try
{ {
bipc::managed_shared_memory segment(bipc::open_only, fSegmentName.c_str()); bipc::managed_shared_memory segment(bipc::open_only, fSegmentName.c_str());
StringVector* queues = segment.find<StringVector>(std::string("fmq_shm_" + fSessionName + "_queues").c_str()).first; StringVector* queues = segment.find<StringVector>(string("fmq_shm_" + fSessionName + "_queues").c_str()).first;
if (queues) if (queues)
{ {
cout << "found " << queues->size() << " queue(s):" << endl; cout << "found " << queues->size() << " queue(s):" << endl;
@ -434,11 +454,18 @@ void Monitor::PrintQueues()
{ {
cout << "\tno queues found" << endl; cout << "\tno queues found" << endl;
} }
catch (std::out_of_range& ie) catch (out_of_range& ie)
{ {
cout << "\tno queues found" << endl; cout << "\tno queues found" << endl;
} }
cout << "\n --> last heartbeats: " << endl << endl;
auto now = chrono::high_resolution_clock::now();
for (const auto& h : fDeviceHeartbeats)
{
cout << "\t" << h.first << " : " << chrono::duration<double, milli>(now - h.second).count() << "ms ago." << endl;
}
cout << endl; cout << endl;
} }
@ -469,6 +496,10 @@ Monitor::~Monitor()
{ {
fSignalThread.join(); fSignalThread.join();
} }
if (fCleanOnExit)
{
Cleanup(fSessionName);
}
} }
} // namespace shmem } // namespace shmem

View File

@ -14,6 +14,7 @@
#include <chrono> #include <chrono>
#include <atomic> #include <atomic>
#include <string> #include <string>
#include <unordered_map>
namespace fair namespace fair
{ {
@ -25,7 +26,7 @@ namespace shmem
class Monitor class Monitor
{ {
public: public:
Monitor(const std::string& sessionName, bool selfDestruct, bool interactive, unsigned int timeoutInMS); Monitor(const std::string& sessionName, bool selfDestruct, bool interactive, unsigned int timeoutInMS, bool runAsDaemon, bool cleanOnExit);
Monitor(const Monitor&) = delete; Monitor(const Monitor&) = delete;
Monitor operator=(const Monitor&) = delete; Monitor operator=(const Monitor&) = delete;
@ -51,6 +52,8 @@ class Monitor
bool fSelfDestruct; // will self-destruct after the memory has been closed bool fSelfDestruct; // will self-destruct after the memory has been closed
bool fInteractive; // running in interactive mode bool fInteractive; // running in interactive mode
bool fSeenOnce; // true is segment has been opened successfully at least once bool fSeenOnce; // true is segment has been opened successfully at least once
bool fIsDaemon;
bool fCleanOnExit;
unsigned int fTimeoutInMS; unsigned int fTimeoutInMS;
std::string fSessionName; std::string fSessionName;
std::string fSegmentName; std::string fSegmentName;
@ -61,6 +64,7 @@ class Monitor
std::chrono::high_resolution_clock::time_point fLastHeartbeat; std::chrono::high_resolution_clock::time_point fLastHeartbeat;
std::thread fSignalThread; std::thread fSignalThread;
boost::interprocess::managed_shared_memory fManagementSegment; boost::interprocess::managed_shared_memory fManagementSegment;
std::unordered_map<std::string, std::chrono::high_resolution_clock::time_point> fDeviceHeartbeats;
}; };
} // namespace shmem } // namespace shmem

View File

@ -9,12 +9,60 @@
#include <boost/program_options.hpp> #include <boost/program_options.hpp>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <iostream> #include <iostream>
#include <string> #include <string>
using namespace std; using namespace std;
using namespace boost::program_options; using namespace boost::program_options;
/// Turn the current process into a background daemon-style process.
///
/// Steps performed, in order:
///   1. reset the file mode creation mask to 0,
///   2. create a new session via setsid(),
///   3. change the working directory to "/",
///   4. re-open stdin, stdout and stderr on /dev/null.
///
/// A failure of setsid() or chdir() terminates the process with exit code 1
/// after printing the reason to stderr. A failure to redirect one of the
/// standard streams is reported but is not fatal.
static void daemonize()
{
    // The classic "fork and let the parent exit" step is disabled here:
    //   if (getppid() == 1) return;   // already a daemon?
    //   pid_t pid = fork();
    //   if (pid < 0) exit(1);         // fork failed
    //   if (pid > 0) exit(0);         // parent exits, child carries on
    // NOTE(review): presumably the launcher already starts this process
    // detached, which is why no fork happens here - confirm. Without the
    // fork, setsid() below fails when the caller is a process group leader
    // (e.g. when started directly from an interactive shell).

    // Reset the file mode creation mask.
    umask(0);

    // Create a new session for this process.
    if (setsid() < 0)
    {
        // stderr is still attached at this point, so say why we are exiting.
        perror("daemonize: setsid() failed");
        exit(1);
    }

    // Move to the root directory so the daemon does not keep the directory
    // it was started from busy (which could prevent that directory from
    // being removed).
    if (chdir("/") < 0)
    {
        perror("daemonize: chdir(\"/\") failed");
        exit(1);
    }

    // Redirect the standard streams to /dev/null.
    // Failures for stdin/stdout are reported on stderr: a failed freopen()
    // leaves the affected stream closed, so stdout cannot be used to report
    // its own redirection failure.
    if (!freopen("/dev/null", "r", stdin))
    {
        std::cerr << "could not redirect stdin to /dev/null" << std::endl;
    }
    if (!freopen("/dev/null", "w", stdout))
    {
        std::cerr << "could not redirect stdout to /dev/null" << std::endl;
    }
    if (!freopen("/dev/null", "w", stderr))
    {
        // Best effort only: stderr is unusable now and stdout already points
        // to /dev/null (or is closed), so this message is normally discarded.
        std::cout << "could not redirect stderr to /dev/null" << std::endl;
    }
}
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
try try
@ -24,15 +72,19 @@ int main(int argc, char** argv)
bool selfDestruct = false; bool selfDestruct = false;
bool interactive = false; bool interactive = false;
unsigned int timeoutInMS; unsigned int timeoutInMS;
bool runAsDaemon = false;
bool cleanOnExit = false;
options_description desc("Options"); options_description desc("Options");
desc.add_options() desc.add_options()
("session", value<string>(&sessionName)->default_value("default"), "Name of the session which to monitor") ("session,s", value<string>(&sessionName)->default_value("default"), "Name of the session which to monitor")
("cleanup", value<bool>(&cleanup)->implicit_value(true), "Perform cleanup and quit") ("cleanup,c", value<bool>(&cleanup)->implicit_value(true), "Perform cleanup and quit")
("self-destruct", value<bool>(&selfDestruct)->implicit_value(true), "Quit after first closing of the memory") ("self-destruct,x", value<bool>(&selfDestruct)->implicit_value(true), "Quit after first closing of the memory")
("interactive", value<bool>(&interactive)->implicit_value(true), "Interactive run") ("interactive,i", value<bool>(&interactive)->implicit_value(true), "Interactive run")
("timeout", value<unsigned int>(&timeoutInMS)->default_value(5000), "Heartbeat timeout in milliseconds") ("timeout,t", value<unsigned int>(&timeoutInMS)->default_value(5000), "Heartbeat timeout in milliseconds")
("help", "Print help"); ("daemonize,d", value<bool>(&runAsDaemon)->implicit_value(true), "Daemonize the monitor")
("clean-on-exit,e", value<bool>(&cleanOnExit)->implicit_value(true), "Perform cleanup on exit")
("help,h", "Print help");
variables_map vm; variables_map vm;
store(parse_command_line(argc, argv, desc), vm); store(parse_command_line(argc, argv, desc), vm);
@ -47,6 +99,11 @@ int main(int argc, char** argv)
sessionName.resize(8, '_'); // shorten the session name, to accommodate for name size limit on some systems (MacOS) sessionName.resize(8, '_'); // shorten the session name, to accommodate for name size limit on some systems (MacOS)
if (runAsDaemon)
{
daemonize();
}
if (cleanup) if (cleanup)
{ {
cout << "Cleaning up \"" << sessionName << "\"..." << endl; cout << "Cleaning up \"" << sessionName << "\"..." << endl;
@ -57,7 +114,7 @@ int main(int argc, char** argv)
cout << "Starting shared memory monitor for session: \"" << sessionName << "\"..." << endl; cout << "Starting shared memory monitor for session: \"" << sessionName << "\"..." << endl;
fair::mq::shmem::Monitor monitor{sessionName, selfDestruct, interactive, timeoutInMS}; fair::mq::shmem::Monitor monitor{sessionName, selfDestruct, interactive, timeoutInMS, runAsDaemon, cleanOnExit};
monitor.CatchSignals(); monitor.CatchSignals();
monitor.Run(); monitor.Run();