diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 2154305e..9087a965 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -222,7 +222,7 @@ void FairMQDevice::InitWrapper() AttachChannels(uninitializedConnectingChannels); } - Init(); + CallAndHandleError(std::bind(&FairMQDevice::Init, this)); ChangeState(internal_DEVICE_READY); } @@ -428,7 +428,7 @@ void FairMQDevice::InitTaskWrapper() { CallStateChangeCallbacks(INITIALIZING_TASK); - InitTask(); + CallAndHandleError(std::bind(&FairMQDevice::InitTask, this)); ChangeState(internal_READY); } @@ -503,43 +503,46 @@ void FairMQDevice::RunWrapper() t.second->Resume(); } - try + CallAndHandleError([this] { - PreRun(); - - // process either data callbacks or ConditionalRun/Run - if (fDataCallbacks) + try { - // if only one input channel, do lightweight handling without additional polling. - if (fInputChannelKeys.size() == 1 && fChannels.at(fInputChannelKeys.at(0)).size() == 1) - { - HandleSingleChannelInput(); - } - else // otherwise do full handling with polling - { - HandleMultipleChannelInput(); - } - } - else - { - fair::mq::tools::RateLimiter rateLimiter(fRate); + PreRun(); - while (CheckCurrentState(RUNNING) && ConditionalRun()) + // process either data callbacks or ConditionalRun/Run + if (fDataCallbacks) { - if (fRate > 0.001) + // if only one input channel, do lightweight handling without additional polling. + if (fInputChannelKeys.size() == 1 && fChannels.at(fInputChannelKeys.at(0)).size() == 1) { - rateLimiter.maybe_sleep(); + HandleSingleChannelInput(); + } + else // otherwise do full handling with polling + { + HandleMultipleChannelInput(); } } + else + { + fair::mq::tools::RateLimiter rateLimiter(fRate); - Run(); + while (CheckCurrentState(RUNNING) && ConditionalRun()) + { + if (fRate > 0.001) + { + rateLimiter.maybe_sleep(); + } + } + + Run(); + } } - } - catch (const out_of_range& oor) - { - LOG(error) << "out of range: " << oor.what(); - LOG(error) << "incorrect/incomplete channel configuration?"; - } + catch (const out_of_range& oor) + { + LOG(error) << "out of range: " << oor.what(); + LOG(error) << "incorrect/incomplete channel configuration?"; + } + }); // if Run() exited and the state is still RUNNING, transition to READY. if (CheckCurrentState(RUNNING)) @@ -547,7 +550,7 @@ void FairMQDevice::RunWrapper() ChangeState(internal_READY); } - PostRun(); + CallAndHandleError(std::bind(&FairMQDevice::PostRun, this)); rateLogger.join(); } @@ -770,7 +773,7 @@ void FairMQDevice::PauseWrapper() { CallStateChangeCallbacks(PAUSED); - Pause(); + CallAndHandleError(std::bind(&FairMQDevice::Pause, this)); } void FairMQDevice::Pause() @@ -936,7 +939,7 @@ void FairMQDevice::ResetTaskWrapper() { CallStateChangeCallbacks(RESETTING_TASK); - ResetTask(); + CallAndHandleError(std::bind(&FairMQDevice::ResetTask, this)); ChangeState(internal_DEVICE_READY); } @@ -949,7 +952,7 @@ void FairMQDevice::ResetWrapper() { CallStateChangeCallbacks(RESETTING_DEVICE); - Reset(); + CallAndHandleError(std::bind(&FairMQDevice::Reset, this)); ChangeState(internal_IDLE); } @@ -973,6 +976,17 @@ const FairMQChannel& FairMQDevice::GetChannel(const string& channelName, const i return fChannels.at(channelName).at(index); } +void FairMQDevice::CallAndHandleError(std::function callable) +try +{ + callable(); +} +catch(...) +{ + ChangeState(ERROR_FOUND); + throw; +} + void FairMQDevice::Exit() { } diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 3e4aadbe..ceb6b53a 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -462,39 +462,30 @@ class FairMQDevice : public FairMQStateMachine std::string fId; ///< Device ID /// Additional user initialization (can be overloaded in child classes). Prefer to use InitTask(). - /// Executed in a worker thread virtual void Init(); /// Task initialization (can be overloaded in child classes) - /// Executed in a worker thread virtual void InitTask(); /// Runs the device (to be overloaded in child classes) - /// Executed in a worker thread virtual void Run(); /// Called in the RUNNING state once before executing the Run()/ConditionalRun() method - /// Executed in a worker thread virtual void PreRun(); /// Called during RUNNING state repeatedly until it returns false or device state changes - /// Executed in a worker thread virtual bool ConditionalRun(); /// Called in the RUNNING state once after executing the Run()/ConditionalRun() method - /// Executed in a worker thread virtual void PostRun(); /// Handles the PAUSE state - /// Executed in a worker thread virtual void Pause(); /// Resets the user task (to be overloaded in child classes) - /// Executed in a worker thread virtual void ResetTask(); /// Resets the device (can be overloaded in child classes) - /// Executed in a worker thread virtual void Reset(); private: @@ -521,6 +512,9 @@ class FairMQDevice : public FairMQStateMachine /// Handles the Reset() method void ResetWrapper(); + /// Used to call user code and handle uncaught exceptions + void CallAndHandleError(std::function callable); + /// Unblocks blocking channel send/receive calls void Unblock(); diff --git a/fairmq/PluginServices.h b/fairmq/PluginServices.h index 7a27c034..dee322c0 100644 --- a/fairmq/PluginServices.h +++ b/fairmq/PluginServices.h @@ -21,6 +21,7 @@ #include #include #include +#include namespace fair { diff --git a/fairmq/plugins/Control.cxx b/fairmq/plugins/Control.cxx index 578657b1..c7262bd0 100644 --- a/fairmq/plugins/Control.cxx +++ b/fairmq/plugins/Control.cxx @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2017-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -11,6 +11,7 @@ #include // for the interactive mode #include // for the interactive mode #include // catching system signals +#include #include #include @@ -18,11 +19,18 @@ using namespace std; namespace { - std::atomic gSignalStatus(0); + std::atomic gLastSignal(0); + std::atomic gSignalCount(0); extern "C" auto signal_handler(int signal) -> void { - gSignalStatus = signal; + ++gSignalCount; + gLastSignal = signal; + + if (gSignalCount > 1) + { + std::abort(); + } } } @@ -37,14 +45,23 @@ Control::Control(const string& name, const Plugin::Version version, const string : Plugin(name, version, maintainer, homepage, pluginServices) , fControllerThread() , fSignalHandlerThread() - , fShutdownThread() , fEvents() , fEventsMutex() - , fShutdownMutex() + , fControllerMutex() , fNewEvent() - , fDeviceTerminationRequested(false) - , fHasShutdown(false) + , fDeviceShutdownRequested(false) + , fDeviceHasShutdown(false) + , fPluginShutdownRequested(false) { + SubscribeToDeviceStateChange([&](DeviceState newState) + { + { + lock_guard lock{fEventsMutex}; + fEvents.push(newState); + } + fNewEvent.notify_one(); + }); + try { TakeDeviceControl(); @@ -97,123 +114,121 @@ auto ControlPluginProgramOptions() -> Plugin::ProgOptions } auto Control::InteractiveMode() -> void +try { - try + RunStartupSequence(); + + char input; // hold the user console input + pollfd cinfd[1]; + cinfd[0].fd = fileno(stdin); + cinfd[0].events = POLLIN; + + struct termios t; + tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure + t.c_lflag &= ~ICANON; // disable canonical input + t.c_lflag &= ~ECHO; // do not echo input chars + tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings + + PrintInteractiveHelp(); + + bool keepRunning = true; + + while (keepRunning) { - RunStartupSequence(); - - char input; // hold the user console input - pollfd cinfd[1]; - cinfd[0].fd = fileno(stdin); - cinfd[0].events = POLLIN; - - struct termios t; - tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure - t.c_lflag &= ~ICANON; // disable canonical input - t.c_lflag &= ~ECHO; // do not echo input chars - tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings - - PrintInteractiveHelp(); - - bool keepRunning = true; - - while (keepRunning) + if (poll(cinfd, 1, 500)) { - if (poll(cinfd, 1, 500)) + if (fDeviceShutdownRequested) { - if (fDeviceTerminationRequested) - { - break; - } - - cin >> input; - - switch (input) - { - case 'i': - LOG(info) << "\n\n --> [i] init device\n"; - ChangeDeviceState(DeviceStateTransition::InitDevice); - break; - case 'j': - LOG(info) << "\n\n --> [j] init task\n"; - ChangeDeviceState(DeviceStateTransition::InitTask); - break; - case 'p': - LOG(info) << "\n\n --> [p] pause\n"; - ChangeDeviceState(DeviceStateTransition::Pause); - break; - case 'r': - LOG(info) << "\n\n --> [r] run\n"; - ChangeDeviceState(DeviceStateTransition::Run); - break; - case 's': - LOG(info) << "\n\n --> [s] stop\n"; - ChangeDeviceState(DeviceStateTransition::Stop); - break; - case 't': - LOG(info) << "\n\n --> [t] reset task\n"; - ChangeDeviceState(DeviceStateTransition::ResetTask); - break; - case 'd': - LOG(info) << "\n\n --> [d] reset device\n"; - ChangeDeviceState(DeviceStateTransition::ResetDevice); - break; - case 'k': - LOG(info) << "\n\n --> [k] increase log severity\n"; - CycleLogConsoleSeverityUp(); - break; - case 'l': - LOG(info) << "\n\n --> [l] decrease log severity\n"; - CycleLogConsoleSeverityDown(); - break; - case 'n': - LOG(info) << "\n\n --> [n] increase log verbosity\n"; - CycleLogVerbosityUp(); - break; - case 'm': - LOG(info) << "\n\n --> [m] decrease log verbosity\n"; - CycleLogVerbosityDown(); - break; - case 'h': - LOG(info) << "\n\n --> [h] help\n"; - PrintInteractiveHelp(); - break; - // case 'x': - // LOG(info) << "\n\n --> [x] ERROR\n"; - // ChangeDeviceState(DeviceStateTransition::ERROR_FOUND); - // break; - case 'q': - LOG(info) << "\n\n --> [q] end\n"; - keepRunning = false; - break; - default: - LOG(info) << "Invalid input: [" << input << "]"; - PrintInteractiveHelp(); - break; - } + break; } - if (fDeviceTerminationRequested) + cin >> input; + + switch (input) { - keepRunning = false; + case 'i': + LOG(info) << "\n\n --> [i] init device\n"; + ChangeDeviceState(DeviceStateTransition::InitDevice); + break; + case 'j': + LOG(info) << "\n\n --> [j] init task\n"; + ChangeDeviceState(DeviceStateTransition::InitTask); + break; + case 'p': + LOG(info) << "\n\n --> [p] pause\n"; + ChangeDeviceState(DeviceStateTransition::Pause); + break; + case 'r': + LOG(info) << "\n\n --> [r] run\n"; + ChangeDeviceState(DeviceStateTransition::Run); + break; + case 's': + LOG(info) << "\n\n --> [s] stop\n"; + ChangeDeviceState(DeviceStateTransition::Stop); + break; + case 't': + LOG(info) << "\n\n --> [t] reset task\n"; + ChangeDeviceState(DeviceStateTransition::ResetTask); + break; + case 'd': + LOG(info) << "\n\n --> [d] reset device\n"; + ChangeDeviceState(DeviceStateTransition::ResetDevice); + break; + case 'k': + LOG(info) << "\n\n --> [k] increase log severity\n"; + CycleLogConsoleSeverityUp(); + break; + case 'l': + LOG(info) << "\n\n --> [l] decrease log severity\n"; + CycleLogConsoleSeverityDown(); + break; + case 'n': + LOG(info) << "\n\n --> [n] increase log verbosity\n"; + CycleLogVerbosityUp(); + break; + case 'm': + LOG(info) << "\n\n --> [m] decrease log verbosity\n"; + CycleLogVerbosityDown(); + break; + case 'h': + LOG(info) << "\n\n --> [h] help\n"; + PrintInteractiveHelp(); + break; + // case 'x': + // LOG(info) << "\n\n --> [x] ERROR\n"; + // ChangeDeviceState(DeviceStateTransition::ERROR_FOUND); + // break; + case 'q': + LOG(info) << "\n\n --> [q] end\n"; + keepRunning = false; + break; + default: + LOG(info) << "Invalid input: [" << input << "]"; + PrintInteractiveHelp(); + break; } } - tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure - t.c_lflag |= ICANON; // re-enable canonical input - t.c_lflag |= ECHO; // echo input chars - tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings - - if (!fDeviceTerminationRequested) + if (fDeviceShutdownRequested) { - RunShutdownSequence(); + break; } } - catch (PluginServices::DeviceControlError& e) - { - // If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else. - LOG(debug) << e.what(); - } + + tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure + t.c_lflag |= ICANON; // re-enable canonical input + t.c_lflag |= ECHO; // echo input chars + tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings + + RunShutdownSequence(); +} +catch (PluginServices::DeviceControlError& e) +{ + // If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else. + LOG(debug) << e.what(); +} +catch (DeviceErrorState&) +{ } auto Control::PrintInteractiveHelp() -> void @@ -234,137 +249,119 @@ auto Control::WaitForNextState() -> DeviceState } auto result = fEvents.front(); + + if (result == DeviceState::Error) + { + throw DeviceErrorState("Controlled device transitioned to error state."); + } + fEvents.pop(); + return result; } auto Control::StaticMode() -> void +try { - try + RunStartupSequence(); + { - RunStartupSequence(); - + // Wait for next state, which is DeviceState::Ready, + // or for device shutdown request (Ctrl-C) + unique_lock lock{fEventsMutex}; + while (fEvents.empty() && !fDeviceShutdownRequested) { - // Wait for next state, which is DeviceState::Ready, - // or for device termination request - unique_lock lock{fEventsMutex}; - while (fEvents.empty() && !fDeviceTerminationRequested) - { - fNewEvent.wait(lock); - } - } - - if (!fDeviceTerminationRequested) - { - RunShutdownSequence(); + fNewEvent.wait_for(lock, chrono::milliseconds(50)); } } - catch (PluginServices::DeviceControlError& e) - { - // If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else. - LOG(debug) << e.what(); - } + + RunShutdownSequence(); +} +catch (PluginServices::DeviceControlError& e) +{ + // If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else. + LOG(debug) << e.what(); +} +catch (DeviceErrorState&) +{ } auto Control::SignalHandler() -> void { - while (true) + while (gSignalCount == 0 && !fPluginShutdownRequested) { - if (gSignalStatus != 0 && !fHasShutdown) - { - LOG(info) << "Received device shutdown request (signal " << gSignalStatus << ")."; - LOG(info) << "Waiting for graceful device shutdown. Hit Ctrl-C again to abort immediately."; - - if (!fDeviceTerminationRequested) - { - fDeviceTerminationRequested = true; - gSignalStatus = 0; - fShutdownThread = thread(&Control::HandleShutdownSignal, this); - } - else - { - LOG(warn) << "Received 2nd device shutdown request (signal " << gSignalStatus << ")."; - LOG(warn) << "Aborting immediately!"; - abort(); - } - } - else if (fHasShutdown) - { - break; - } - this_thread::sleep_for(chrono::milliseconds(100)); } -} -auto Control::HandleShutdownSignal() -> void -{ - StealDeviceControl(); - - UnsubscribeFromDeviceStateChange(); // In case, static or interactive mode have subscribed already - SubscribeToDeviceStateChange([&](DeviceState newState) + if (!fPluginShutdownRequested) { - { - lock_guard lock{fEventsMutex}; - fEvents.push(newState); - } - fNewEvent.notify_one(); - }); + LOG(info) << "Received device shutdown request (signal " << gLastSignal << ")."; + LOG(info) << "Waiting for graceful device shutdown. Hit Ctrl-C again to abort immediately."; - RunShutdownSequence(); + // Signal and wait for controller thread, if we are controller + fDeviceShutdownRequested = true; + { + unique_lock lock(fControllerMutex); + if (fControllerThread.joinable()) fControllerThread.join(); + } + + if (!fDeviceHasShutdown) + { + // Take over control and attempt graceful shutdown + StealDeviceControl(); + try + { + RunShutdownSequence(); + } + catch (PluginServices::DeviceControlError& e) + { + LOG(info) << "Graceful device shutdown failed: " << e.what() << " If hanging, hit Ctrl-C again to abort immediately."; + } + catch (...) + { + LOG(info) << "Graceful device shutdown failed. If hanging, hit Ctrl-C again to abort immediately."; + } + } + } } auto Control::RunShutdownSequence() -> void { - lock_guard lock(fShutdownMutex); - if (!fHasShutdown) + auto nextState = GetCurrentDeviceState(); + EmptyEventQueue(); + while (nextState != DeviceState::Exiting) { - auto nextState = GetCurrentDeviceState(); - EmptyEventQueue(); - while (nextState != DeviceState::Exiting) + switch (nextState) { - switch (nextState) - { - case DeviceState::Idle: - ChangeDeviceState(DeviceStateTransition::End); - break; - case DeviceState::DeviceReady: - ChangeDeviceState(DeviceStateTransition::ResetDevice); - break; - case DeviceState::Ready: - ChangeDeviceState(DeviceStateTransition::ResetTask); - break; - case DeviceState::Running: - ChangeDeviceState(DeviceStateTransition::Stop); - break; - case DeviceState::Paused: - ChangeDeviceState(DeviceStateTransition::Resume); - break; - default: - // ignore other states - break; - } - - nextState = WaitForNextState(); + case DeviceState::Idle: + ChangeDeviceState(DeviceStateTransition::End); + break; + case DeviceState::DeviceReady: + ChangeDeviceState(DeviceStateTransition::ResetDevice); + break; + case DeviceState::Ready: + ChangeDeviceState(DeviceStateTransition::ResetTask); + break; + case DeviceState::Running: + ChangeDeviceState(DeviceStateTransition::Stop); + break; + case DeviceState::Paused: + ChangeDeviceState(DeviceStateTransition::Resume); + break; + default: + // ignore other states + break; } - fHasShutdown = true; - UnsubscribeFromDeviceStateChange(); - ReleaseDeviceControl(); + nextState = WaitForNextState(); } + + fDeviceHasShutdown = true; + ReleaseDeviceControl(); } auto Control::RunStartupSequence() -> void { - SubscribeToDeviceStateChange([&](DeviceState newState) - { - { - lock_guard lock{fEventsMutex}; - fEvents.push(newState); - } - fNewEvent.notify_one(); - }); - ChangeDeviceState(DeviceStateTransition::InitDevice); while (WaitForNextState() != DeviceState::DeviceReady) {} ChangeDeviceState(DeviceStateTransition::InitTask); @@ -381,9 +378,16 @@ auto Control::EmptyEventQueue() -> void Control::~Control() { - if (fControllerThread.joinable()) fControllerThread.join(); + // Notify threads to exit + fPluginShutdownRequested = true; + + { + unique_lock lock(fControllerMutex); + if (fControllerThread.joinable()) fControllerThread.join(); + } if (fSignalHandlerThread.joinable()) fSignalHandlerThread.join(); - if (fShutdownThread.joinable()) fShutdownThread.join(); + + UnsubscribeFromDeviceStateChange(); } } /* namespace plugins */ diff --git a/fairmq/plugins/Control.h b/fairmq/plugins/Control.h index 802deaac..0815d39b 100644 --- a/fairmq/plugins/Control.h +++ b/fairmq/plugins/Control.h @@ -18,6 +18,7 @@ #include #include #include +#include namespace fair { @@ -35,24 +36,25 @@ class Control : public Plugin private: auto InteractiveMode() -> void; - auto PrintInteractiveHelp() -> void; + static auto PrintInteractiveHelp() -> void; auto StaticMode() -> void; auto WaitForNextState() -> DeviceState; auto SignalHandler() -> void; - auto HandleShutdownSignal() -> void; auto RunShutdownSequence() -> void; auto RunStartupSequence() -> void; auto EmptyEventQueue() -> void; std::thread fControllerThread; std::thread fSignalHandlerThread; - std::thread fShutdownThread; std::queue fEvents; std::mutex fEventsMutex; - std::mutex fShutdownMutex; + std::mutex fControllerMutex; std::condition_variable fNewEvent; - std::atomic fDeviceTerminationRequested; - std::atomic fHasShutdown; + std::atomic fDeviceShutdownRequested; + std::atomic fDeviceHasShutdown; + std::atomic fPluginShutdownRequested; + + struct DeviceErrorState : std::runtime_error { using std::runtime_error::runtime_error; }; }; /* class Control */ auto ControlPluginProgramOptions() -> Plugin::ProgOptions;