From 44a59f25a7397f805dec027d4b9aead83be54650 Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Wed, 20 Sep 2017 01:46:10 +0200 Subject: [PATCH] FairMQ: Move --catch-signals logic to control plugin * Add StealDeviceControl() API to plugin services --- fairmq/FairMQDevice.cxx | 52 ---- fairmq/FairMQDevice.h | 5 - fairmq/Plugin.h | 3 +- fairmq/PluginServices.cxx | 18 +- fairmq/PluginServices.h | 10 +- fairmq/options/FairMQProgOptions.cxx | 6 - fairmq/plugins/Control.cxx | 346 +++++++++++++++--------- fairmq/plugins/Control.h | 7 + fairmq/runFairMQDevice.h | 10 - fairmq/test/plugins/_plugin_manager.cxx | 8 +- 10 files changed, 256 insertions(+), 209 deletions(-) diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index ac54916f..8898ee17 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -7,7 +7,6 @@ ********************************************************************************/ #include -#include // catching system signals #include #include #include @@ -36,13 +35,6 @@ using namespace std; -// function and a wrapper to catch the signals -function sigHandler; -static void CallSignalHandler(int signal) -{ - sigHandler(signal); -} - FairMQDevice::FairMQDevice() : fTransportFactory(nullptr) , fTransports() @@ -58,8 +50,6 @@ FairMQDevice::FairMQDevice() , fNetworkInterface() , fDefaultTransport() , fInitializationTimeoutInS(120) - , fCatchingSignals(false) - , fTerminationRequested(false) , fDataCallbacks(false) , fDeviceCmdSockets() , fMsgInputs() @@ -89,8 +79,6 @@ FairMQDevice::FairMQDevice(const fair::mq::tools::Version version) , fNetworkInterface() , fDefaultTransport() , fInitializationTimeoutInS(120) - , fCatchingSignals(false) - , fTerminationRequested(false) , fDataCallbacks(false) , fDeviceCmdSockets() , fMsgInputs() @@ -105,46 +93,6 @@ FairMQDevice::FairMQDevice(const fair::mq::tools::Version version) { } -void FairMQDevice::CatchSignals() -{ - if (!fCatchingSignals) - { - sigHandler = bind1st(mem_fun(&FairMQDevice::SignalHandler), this); - signal(SIGINT, CallSignalHandler); - signal(SIGTERM, CallSignalHandler); - fCatchingSignals = true; - } -} - -void FairMQDevice::SignalHandler(int signal) -{ - LOG(INFO) << "Caught signal " << signal; - - if (!fTerminationRequested) - { - fTerminationRequested = true; - - ChangeState(STOP); - - ChangeState(RESET_TASK); - WaitForEndOfState(RESET_TASK); - - ChangeState(RESET_DEVICE); - WaitForEndOfState(RESET_DEVICE); - - ChangeState(END); - - // exit(EXIT_FAILURE); - LOG(INFO) << "Exiting."; - } - else - { - LOG(WARN) << "Repeated termination or bad initialization? Aborting."; - abort(); - // exit(EXIT_FAILURE); - } -} - void FairMQDevice::InitWrapper() { if (!fTransportFactory) diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index f66b82a6..98a7ce76 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -556,11 +556,6 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable void CreateOwnConfig(); - /// Signal handler - void SignalHandler(int signal); - bool fCatchingSignals; - std::atomic fTerminationRequested; - bool fDataCallbacks; std::unordered_map fDeviceCmdSockets; ///< Sockets used for the internal unblocking mechanism std::unordered_map fMsgInputs; diff --git a/fairmq/Plugin.h b/fairmq/Plugin.h index 3d0bc120..f83bf506 100644 --- a/fairmq/Plugin.h +++ b/fairmq/Plugin.h @@ -76,11 +76,12 @@ class Plugin auto ToStr(DeviceStateTransition transition) const -> std::string { return fPluginServices->ToStr(transition); } auto GetCurrentDeviceState() const -> DeviceState { return fPluginServices->GetCurrentDeviceState(); } auto TakeDeviceControl() -> void { fPluginServices->TakeDeviceControl(fkName); }; + auto StealDeviceControl() -> void { fPluginServices->StealDeviceControl(fkName); }; auto ReleaseDeviceControl() -> void { fPluginServices->ReleaseDeviceControl(fkName); }; auto ChangeDeviceState(const DeviceStateTransition next) -> void { fPluginServices->ChangeDeviceState(fkName, next); } auto SubscribeToDeviceStateChange(std::function callback) -> void { fPluginServices->SubscribeToDeviceStateChange(fkName, callback); } auto UnsubscribeFromDeviceStateChange() -> void { fPluginServices->UnsubscribeFromDeviceStateChange(fkName); } - auto DeviceTerminated() const -> bool { return fPluginServices->DeviceTerminated(); } + // device config API // see for docs template diff --git a/fairmq/PluginServices.cxx b/fairmq/PluginServices.cxx index d76857ec..0511137c 100644 --- a/fairmq/PluginServices.cxx +++ b/fairmq/PluginServices.cxx @@ -44,6 +44,7 @@ const std::unordered_map Plu {"INIT TASK", DeviceStateTransition::InitTask}, {"RUN", DeviceStateTransition::Run}, {"PAUSE", DeviceStateTransition::Pause}, + {"RESUME", DeviceStateTransition::Resume}, {"STOP", DeviceStateTransition::Stop}, {"RESET TASK", DeviceStateTransition::ResetTask}, {"RESET DEVICE", DeviceStateTransition::ResetDevice}, @@ -55,6 +56,7 @@ const std::unordered_map void { - // lock_guard lock{fDeviceControllerMutex}; - // - // if (!fDeviceController) fDeviceController = controller; + lock_guard lock{fDeviceControllerMutex}; + + if (!fDeviceController) fDeviceController = controller; if (fDeviceController == controller) { @@ -125,7 +128,16 @@ auto PluginServices::TakeDeviceControl(const std::string& controller) -> void "Currently, plugin '", fDeviceController, "' has taken control." )}; } +} +auto PluginServices::StealDeviceControl(const std::string& controller) -> void +{ + lock_guard lock{fDeviceControllerMutex}; + + if (!fDeviceController) + { + fDeviceController = controller; + } } auto PluginServices::ReleaseDeviceControl(const std::string& controller) -> void diff --git a/fairmq/PluginServices.h b/fairmq/PluginServices.h index 112ae868..ba6dd042 100644 --- a/fairmq/PluginServices.h +++ b/fairmq/PluginServices.h @@ -73,6 +73,7 @@ class PluginServices InitTask, Run, Pause, + Resume, Stop, ResetTask, ResetDevice, @@ -118,6 +119,13 @@ class PluginServices auto TakeDeviceControl(const std::string& controller) -> void; struct DeviceControlError : std::runtime_error { using std::runtime_error::runtime_error; }; + /// @brief Become device controller by force + /// @param controller id + /// + /// Take over device controller privileges by force. Does not trigger the ReleaseDeviceControl condition! + /// This function is intended to implement override/emergency control functionality (e.g. device shutdown on SIGINT). + auto StealDeviceControl(const std::string& controller) -> void; + /// @brief Release device controller role /// @param controller id /// @throws fair::mq::PluginServices::DeviceControlError if passed controller id is not the current device controller. @@ -156,8 +164,6 @@ class PluginServices /// @param subscriber id auto UnsubscribeFromDeviceStateChange(const std::string& subscriber) -> void { fDevice->UnsubscribeFromStateChange(subscriber); } - auto DeviceTerminated() const -> bool { return fDevice->Terminated(); } - // Config API /// @brief Set config property diff --git a/fairmq/options/FairMQProgOptions.cxx b/fairmq/options/FairMQProgOptions.cxx index 2c9627a2..aa4a1064 100644 --- a/fairmq/options/FairMQProgOptions.cxx +++ b/fairmq/options/FairMQProgOptions.cxx @@ -83,12 +83,6 @@ void FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool a return; } - if (fVarMap.count("id") == 0) - { - LOG(ERROR) << "Device id not provided, provide with --id"; - exit(EXIT_FAILURE); - } - string verbosity = GetValue("verbosity"); string logFile = GetValue("log-to-file"); bool color = GetValue("log-color"); diff --git a/fairmq/plugins/Control.cxx b/fairmq/plugins/Control.cxx index 648c17a8..8babf320 100644 --- a/fairmq/plugins/Control.cxx +++ b/fairmq/plugins/Control.cxx @@ -10,9 +10,21 @@ #include // for the interactive mode #include // for the interactive mode +#include // catching system signals +#include using namespace std; +namespace +{ + std::function gSignalHandlerClosure; + + extern "C" auto signal_handler(int signal) -> void + { + gSignalHandlerClosure(signal); + } +} + namespace fair { namespace mq @@ -23,6 +35,8 @@ namespace plugins Control::Control(const string name, const Plugin::Version version, const string maintainer, const string homepage, PluginServices* pluginServices) : Plugin(name, version, maintainer, homepage, pluginServices) , fControllerThread() + , fSignalHandlerThread() + , fDeviceTerminationRequested{false} , fEvents() , fEventsMutex() , fNewEvent() @@ -45,9 +59,21 @@ Control::Control(const string name, const Plugin::Version version, const string } else { - LOG(ERROR) << "Unrecognized control mode '" << control << "' requested via command line. " << "Ignoring and falling back to static control mode."; + LOG(ERROR) << "Unrecognized control mode '" << control << "' requested. " << "Ignoring and falling back to static control mode."; fControllerThread = thread(&Control::StaticMode, this); } + + LOG(DEBUG) << "catch-signals: " << GetProperty("catch-signals"); + if (GetProperty("catch-signals") > 0) + { + gSignalHandlerClosure = bind(&Control::SignalHandler, this, placeholders::_1); + signal(SIGINT, signal_handler); + signal(SIGTERM, signal_handler); + } + else + { + LOG(WARN) << "Signal handling (e.g. Ctrl-C) has been deactivated."; + } } catch (PluginServices::DeviceControlError& e) { @@ -58,132 +84,123 @@ Control::Control(const string name, const Plugin::Version version, const string auto ControlPluginProgramOptions() -> Plugin::ProgOptions { - auto pluginOptions = boost::program_options::options_description{"Control (builtin) Plugin"}; + namespace po = boost::program_options; + auto pluginOptions = po::options_description{"Control (builtin) Plugin"}; pluginOptions.add_options() - ("control", boost::program_options::value()->default_value("interactive"), "Control mode, 'static' or 'interactive'"); + ("control", po::value()->default_value("interactive"), "Control mode, 'static' or 'interactive'") + ("catch-signals", po::value()->default_value(1), "Enable signal handling (1/0)."); return pluginOptions; } auto Control::InteractiveMode() -> void { - SubscribeToDeviceStateChange([&](DeviceState newState) + try { + SubscribeToDeviceStateChange([&](DeviceState newState) { - lock_guard lock{fEventsMutex}; - fEvents.push(newState); - } - fNewEvent.notify_one(); - }); + { + lock_guard lock{fEventsMutex}; + fEvents.push(newState); + } + fNewEvent.notify_one(); + }); - ChangeDeviceState(DeviceStateTransition::InitDevice); - while (WaitForNextState() != DeviceState::DeviceReady) {} - ChangeDeviceState(DeviceStateTransition::InitTask); - while (WaitForNextState() != DeviceState::Ready) {} - ChangeDeviceState(DeviceStateTransition::Run); + RunStartupSequence(); - char input; // hold the user console input - pollfd cinfd[1]; - cinfd[0].fd = fileno(stdin); - cinfd[0].events = POLLIN; + char input; // hold the user console input + pollfd cinfd[1]; + cinfd[0].fd = fileno(stdin); + cinfd[0].events = POLLIN; - struct termios t; - tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure - t.c_lflag &= ~ICANON; // disable canonical input - tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings + struct termios t; + tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure + t.c_lflag &= ~ICANON; // disable canonical input + tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings - PrintInteractiveHelp(); + PrintInteractiveHelp(); - bool keepRunning = true; + bool keepRunning = true; - while (keepRunning) - { - if (poll(cinfd, 1, 500)) + while (keepRunning) { - if (DeviceTerminated()) + if (poll(cinfd, 1, 500)) + { + if (fDeviceTerminationRequested) + { + break; + } + + cin >> input; + + switch (input) + { + case 'i': + LOG(INFO) << "[i] init device"; + ChangeDeviceState(DeviceStateTransition::InitDevice); + break; + case 'j': + LOG(INFO) << "[j] init task"; + ChangeDeviceState(DeviceStateTransition::InitTask); + break; + case 'p': + LOG(INFO) << "[p] pause"; + ChangeDeviceState(DeviceStateTransition::Pause); + break; + case 'r': + LOG(INFO) << "[r] run"; + ChangeDeviceState(DeviceStateTransition::Run); + break; + case 's': + LOG(INFO) << "[s] stop"; + ChangeDeviceState(DeviceStateTransition::Stop); + break; + case 't': + LOG(INFO) << "[t] reset task"; + ChangeDeviceState(DeviceStateTransition::ResetTask); + break; + case 'd': + LOG(INFO) << "[d] reset device"; + ChangeDeviceState(DeviceStateTransition::ResetDevice); + break; + case 'h': + LOG(INFO) << "[h] help"; + PrintInteractiveHelp(); + break; + // case 'x': + // LOG(INFO) << "[x] ERROR"; + // ChangeDeviceState(DeviceStateTransition::ERROR_FOUND); + // break; + case 'q': + LOG(INFO) << "[q] end"; + keepRunning = false; + break; + default: + LOG(INFO) << "Invalid input: [" << input << "]"; + PrintInteractiveHelp(); + break; + } + } + + if (fDeviceTerminationRequested) { keepRunning = false; - break; - } - - cin >> input; - - switch (input) - { - case 'i': - LOG(INFO) << "[i] init device"; - ChangeDeviceState(DeviceStateTransition::InitDevice); - break; - case 'j': - LOG(INFO) << "[j] init task"; - ChangeDeviceState(DeviceStateTransition::InitTask); - break; - case 'p': - LOG(INFO) << "[p] pause"; - ChangeDeviceState(DeviceStateTransition::Pause); - break; - case 'r': - LOG(INFO) << "[r] run"; - ChangeDeviceState(DeviceStateTransition::Run); - break; - case 's': - LOG(INFO) << "[s] stop"; - ChangeDeviceState(DeviceStateTransition::Stop); - break; - case 't': - LOG(INFO) << "[t] reset task"; - ChangeDeviceState(DeviceStateTransition::ResetTask); - break; - case 'd': - LOG(INFO) << "[d] reset device"; - ChangeDeviceState(DeviceStateTransition::ResetDevice); - break; - case 'h': - LOG(INFO) << "[h] help"; - PrintInteractiveHelp(); - break; - // case 'x': - // LOG(INFO) << "[x] ERROR"; - // ChangeDeviceState(DeviceStateTransition::ERROR_FOUND); - // break; - case 'q': - LOG(INFO) << "[q] end"; - - ChangeDeviceState(DeviceStateTransition::Stop); - - ChangeDeviceState(DeviceStateTransition::ResetTask); - while (WaitForNextState() != DeviceState::DeviceReady) {} - - ChangeDeviceState(DeviceStateTransition::ResetDevice); - while (WaitForNextState() != DeviceState::Idle) {} - - ChangeDeviceState(DeviceStateTransition::End); - - if (GetCurrentDeviceState() == DeviceState::Exiting) - { - keepRunning = false; - } - - LOG(INFO) << "Exiting."; - break; - default: - LOG(INFO) << "Invalid input: [" << input << "]"; - PrintInteractiveHelp(); - break; } } - if (DeviceTerminated()) + tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure + t.c_lflag |= ICANON; // re-enable canonical input + tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings + + if (!fDeviceTerminationRequested) { - keepRunning = false; + RunShutdownSequence(); } } - - tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure - t.c_lflag |= ICANON; // re-enable canonical input - tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings - - UnsubscribeFromDeviceStateChange(); - ReleaseDeviceControl(); + catch (PluginServices::DeviceControlError& e) + { + // If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else. + LOG(DEBUG) << e.what(); + } } auto Control::PrintInteractiveHelp() -> void @@ -207,42 +224,117 @@ auto Control::WaitForNextState() -> DeviceState auto Control::StaticMode() -> void { - SubscribeToDeviceStateChange([&](DeviceState newState) + try { + SubscribeToDeviceStateChange([&](DeviceState newState) { - lock_guard lock{fEventsMutex}; - fEvents.push(newState); + { + lock_guard lock{fEventsMutex}; + fEvents.push(newState); + } + fNewEvent.notify_one(); + }); + + RunStartupSequence(); + + { + // Wait for next state, which is DeviceState::Ready, + // or for device termination request + unique_lock lock{fEventsMutex}; + while (fEvents.empty() && !fDeviceTerminationRequested) + { + fNewEvent.wait(lock); + } } - fNewEvent.notify_one(); - }); - ChangeDeviceState(DeviceStateTransition::InitDevice); - while (WaitForNextState() != DeviceState::DeviceReady) {} - - ChangeDeviceState(DeviceStateTransition::InitTask); - while (WaitForNextState() != DeviceState::Ready) {} - ChangeDeviceState(DeviceStateTransition::Run); - while (WaitForNextState() != DeviceState::Ready) {} - if (!DeviceTerminated()) + if (!fDeviceTerminationRequested) + { + RunShutdownSequence(); + } + } + catch (PluginServices::DeviceControlError& e) { - ChangeDeviceState(DeviceStateTransition::ResetTask); - while (WaitForNextState() != DeviceState::DeviceReady) {} - ChangeDeviceState(DeviceStateTransition::ResetDevice); - while (WaitForNextState() != DeviceState::Idle) {} - ChangeDeviceState(DeviceStateTransition::End); - while (WaitForNextState() != DeviceState::Exiting) {} + // If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else. + LOG(DEBUG) << e.what(); + } +} + +auto Control::SignalHandler(int signal) -> void +{ + + if (!fDeviceTerminationRequested) + { + fDeviceTerminationRequested = true; + + StealDeviceControl(); + + LOG(INFO) << "Received device shutdown request (signal " << signal << ")."; + LOG(INFO) << "Waiting for graceful device shutdown. Hit Ctrl-C again to abort immediately."; + + fSignalHandlerThread = thread(&Control::RunShutdownSequence, this); + } + else + { + LOG(WARN) << "Received 2nd device shutdown request (signal " << signal << ")."; + LOG(WARN) << "Aborting immediately !"; + abort(); + } +} + +auto Control::RunShutdownSequence() -> void +{ + auto nextState = GetCurrentDeviceState(); + EmptyEventQueue(); + while (nextState != DeviceState::Exiting) + { + switch (nextState) + { + case DeviceState::Idle: + ChangeDeviceState(DeviceStateTransition::End); + break; + case DeviceState::DeviceReady: + ChangeDeviceState(DeviceStateTransition::ResetDevice); + break; + case DeviceState::Ready: + ChangeDeviceState(DeviceStateTransition::ResetTask); + break; + case DeviceState::Running: + ChangeDeviceState(DeviceStateTransition::Stop); + break; + case DeviceState::Paused: + ChangeDeviceState(DeviceStateTransition::Resume); + break; + default: + break; + } + + nextState = WaitForNextState(); } UnsubscribeFromDeviceStateChange(); ReleaseDeviceControl(); } +auto Control::RunStartupSequence() -> void +{ + ChangeDeviceState(DeviceStateTransition::InitDevice); + while (WaitForNextState() != DeviceState::DeviceReady) {} + ChangeDeviceState(DeviceStateTransition::InitTask); + while (WaitForNextState() != DeviceState::Ready) {} + ChangeDeviceState(DeviceStateTransition::Run); + while (WaitForNextState() != DeviceState::Running) {} +} + +auto Control::EmptyEventQueue() -> void +{ + lock_guard lock{fEventsMutex}; + fEvents = queue{}; +} + Control::~Control() { - if (fControllerThread.joinable()) - { - fControllerThread.join(); - } + if (fControllerThread.joinable()) fControllerThread.join(); + if (fSignalHandlerThread.joinable()) fSignalHandlerThread.join(); } } /* namespace plugins */ diff --git a/fairmq/plugins/Control.h b/fairmq/plugins/Control.h index d238744f..85a891aa 100644 --- a/fairmq/plugins/Control.h +++ b/fairmq/plugins/Control.h @@ -16,6 +16,7 @@ #include #include #include +#include namespace fair { @@ -36,11 +37,17 @@ class Control : public Plugin auto PrintInteractiveHelp() -> void; auto StaticMode() -> void; auto WaitForNextState() -> DeviceState; + auto SignalHandler(int signal) -> void; + auto RunShutdownSequence() -> void; + auto RunStartupSequence() -> void; + auto EmptyEventQueue() -> void; std::thread fControllerThread; + std::thread fSignalHandlerThread; std::queue fEvents; std::mutex fEventsMutex; std::condition_variable fNewEvent; + std::atomic fDeviceTerminationRequested; }; /* class Control */ auto ControlPluginProgramOptions() -> Plugin::ProgOptions; diff --git a/fairmq/runFairMQDevice.h b/fairmq/runFairMQDevice.h index 442956d5..897e319b 100644 --- a/fairmq/runFairMQDevice.h +++ b/fairmq/runFairMQDevice.h @@ -95,16 +95,6 @@ int main(int argc, const char** argv) return 0; } - // Handle --catch-signals - if (config.GetValue("catch-signals") > 0) - { - device->CatchSignals(); - } - else - { - LOG(WARN) << "Signal handling (e.g. ctrl+C) has been deactivated via command line argument"; - } - LOG(DEBUG) << "PID: " << getpid(); // Configure device diff --git a/fairmq/test/plugins/_plugin_manager.cxx b/fairmq/test/plugins/_plugin_manager.cxx index da5d4114..32fecc62 100644 --- a/fairmq/test/plugins/_plugin_manager.cxx +++ b/fairmq/test/plugins/_plugin_manager.cxx @@ -67,15 +67,17 @@ TEST(PluginManager, LoadPluginDynamic) TEST(PluginManager, LoadPluginStatic) { - FairMQProgOptions config{}; auto mgr = PluginManager{}; auto device = make_shared(); - mgr.EmplacePluginServices(&config, device); - device->SetTransport("zeromq"); ASSERT_NO_THROW(mgr.LoadPlugin("s:control")); + FairMQProgOptions config{}; + config.SetValue("control", "static"); + config.SetValue("catch-signals", 0); + mgr.EmplacePluginServices(&config, device); + ASSERT_NO_THROW(mgr.InstantiatePlugins()); // check order