mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
Resolve hanging process in case of uncaught exception
This commit is contained in:
parent
e1f555bc05
commit
1aab354a5d
|
@ -222,7 +222,7 @@ void FairMQDevice::InitWrapper()
|
||||||
AttachChannels(uninitializedConnectingChannels);
|
AttachChannels(uninitializedConnectingChannels);
|
||||||
}
|
}
|
||||||
|
|
||||||
Init();
|
CallAndHandleError(std::bind(&FairMQDevice::Init, this));
|
||||||
|
|
||||||
ChangeState(internal_DEVICE_READY);
|
ChangeState(internal_DEVICE_READY);
|
||||||
}
|
}
|
||||||
|
@ -428,7 +428,7 @@ void FairMQDevice::InitTaskWrapper()
|
||||||
{
|
{
|
||||||
CallStateChangeCallbacks(INITIALIZING_TASK);
|
CallStateChangeCallbacks(INITIALIZING_TASK);
|
||||||
|
|
||||||
InitTask();
|
CallAndHandleError(std::bind(&FairMQDevice::InitTask, this));
|
||||||
|
|
||||||
ChangeState(internal_READY);
|
ChangeState(internal_READY);
|
||||||
}
|
}
|
||||||
|
@ -503,43 +503,46 @@ void FairMQDevice::RunWrapper()
|
||||||
t.second->Resume();
|
t.second->Resume();
|
||||||
}
|
}
|
||||||
|
|
||||||
try
|
CallAndHandleError([this]
|
||||||
{
|
{
|
||||||
PreRun();
|
try
|
||||||
|
|
||||||
// process either data callbacks or ConditionalRun/Run
|
|
||||||
if (fDataCallbacks)
|
|
||||||
{
|
{
|
||||||
// if only one input channel, do lightweight handling without additional polling.
|
PreRun();
|
||||||
if (fInputChannelKeys.size() == 1 && fChannels.at(fInputChannelKeys.at(0)).size() == 1)
|
|
||||||
{
|
|
||||||
HandleSingleChannelInput();
|
|
||||||
}
|
|
||||||
else // otherwise do full handling with polling
|
|
||||||
{
|
|
||||||
HandleMultipleChannelInput();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
fair::mq::tools::RateLimiter rateLimiter(fRate);
|
|
||||||
|
|
||||||
while (CheckCurrentState(RUNNING) && ConditionalRun())
|
// process either data callbacks or ConditionalRun/Run
|
||||||
|
if (fDataCallbacks)
|
||||||
{
|
{
|
||||||
if (fRate > 0.001)
|
// if only one input channel, do lightweight handling without additional polling.
|
||||||
|
if (fInputChannelKeys.size() == 1 && fChannels.at(fInputChannelKeys.at(0)).size() == 1)
|
||||||
{
|
{
|
||||||
rateLimiter.maybe_sleep();
|
HandleSingleChannelInput();
|
||||||
|
}
|
||||||
|
else // otherwise do full handling with polling
|
||||||
|
{
|
||||||
|
HandleMultipleChannelInput();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
fair::mq::tools::RateLimiter rateLimiter(fRate);
|
||||||
|
|
||||||
Run();
|
while (CheckCurrentState(RUNNING) && ConditionalRun())
|
||||||
|
{
|
||||||
|
if (fRate > 0.001)
|
||||||
|
{
|
||||||
|
rateLimiter.maybe_sleep();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Run();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
catch (const out_of_range& oor)
|
||||||
catch (const out_of_range& oor)
|
{
|
||||||
{
|
LOG(error) << "out of range: " << oor.what();
|
||||||
LOG(error) << "out of range: " << oor.what();
|
LOG(error) << "incorrect/incomplete channel configuration?";
|
||||||
LOG(error) << "incorrect/incomplete channel configuration?";
|
}
|
||||||
}
|
});
|
||||||
|
|
||||||
// if Run() exited and the state is still RUNNING, transition to READY.
|
// if Run() exited and the state is still RUNNING, transition to READY.
|
||||||
if (CheckCurrentState(RUNNING))
|
if (CheckCurrentState(RUNNING))
|
||||||
|
@ -547,7 +550,7 @@ void FairMQDevice::RunWrapper()
|
||||||
ChangeState(internal_READY);
|
ChangeState(internal_READY);
|
||||||
}
|
}
|
||||||
|
|
||||||
PostRun();
|
CallAndHandleError(std::bind(&FairMQDevice::PostRun, this));
|
||||||
|
|
||||||
rateLogger.join();
|
rateLogger.join();
|
||||||
}
|
}
|
||||||
|
@ -770,7 +773,7 @@ void FairMQDevice::PauseWrapper()
|
||||||
{
|
{
|
||||||
CallStateChangeCallbacks(PAUSED);
|
CallStateChangeCallbacks(PAUSED);
|
||||||
|
|
||||||
Pause();
|
CallAndHandleError(std::bind(&FairMQDevice::Pause, this));
|
||||||
}
|
}
|
||||||
|
|
||||||
void FairMQDevice::Pause()
|
void FairMQDevice::Pause()
|
||||||
|
@ -936,7 +939,7 @@ void FairMQDevice::ResetTaskWrapper()
|
||||||
{
|
{
|
||||||
CallStateChangeCallbacks(RESETTING_TASK);
|
CallStateChangeCallbacks(RESETTING_TASK);
|
||||||
|
|
||||||
ResetTask();
|
CallAndHandleError(std::bind(&FairMQDevice::ResetTask, this));
|
||||||
|
|
||||||
ChangeState(internal_DEVICE_READY);
|
ChangeState(internal_DEVICE_READY);
|
||||||
}
|
}
|
||||||
|
@ -949,7 +952,7 @@ void FairMQDevice::ResetWrapper()
|
||||||
{
|
{
|
||||||
CallStateChangeCallbacks(RESETTING_DEVICE);
|
CallStateChangeCallbacks(RESETTING_DEVICE);
|
||||||
|
|
||||||
Reset();
|
CallAndHandleError(std::bind(&FairMQDevice::Reset, this));
|
||||||
|
|
||||||
ChangeState(internal_IDLE);
|
ChangeState(internal_IDLE);
|
||||||
}
|
}
|
||||||
|
@ -973,6 +976,17 @@ const FairMQChannel& FairMQDevice::GetChannel(const string& channelName, const i
|
||||||
return fChannels.at(channelName).at(index);
|
return fChannels.at(channelName).at(index);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void FairMQDevice::CallAndHandleError(std::function<void()> callable)
|
||||||
|
try
|
||||||
|
{
|
||||||
|
callable();
|
||||||
|
}
|
||||||
|
catch(...)
|
||||||
|
{
|
||||||
|
ChangeState(ERROR_FOUND);
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
|
||||||
void FairMQDevice::Exit()
|
void FairMQDevice::Exit()
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
|
@ -462,39 +462,30 @@ class FairMQDevice : public FairMQStateMachine
|
||||||
std::string fId; ///< Device ID
|
std::string fId; ///< Device ID
|
||||||
|
|
||||||
/// Additional user initialization (can be overloaded in child classes). Prefer to use InitTask().
|
/// Additional user initialization (can be overloaded in child classes). Prefer to use InitTask().
|
||||||
/// Executed in a worker thread
|
|
||||||
virtual void Init();
|
virtual void Init();
|
||||||
|
|
||||||
/// Task initialization (can be overloaded in child classes)
|
/// Task initialization (can be overloaded in child classes)
|
||||||
/// Executed in a worker thread
|
|
||||||
virtual void InitTask();
|
virtual void InitTask();
|
||||||
|
|
||||||
/// Runs the device (to be overloaded in child classes)
|
/// Runs the device (to be overloaded in child classes)
|
||||||
/// Executed in a worker thread
|
|
||||||
virtual void Run();
|
virtual void Run();
|
||||||
|
|
||||||
/// Called in the RUNNING state once before executing the Run()/ConditionalRun() method
|
/// Called in the RUNNING state once before executing the Run()/ConditionalRun() method
|
||||||
/// Executed in a worker thread
|
|
||||||
virtual void PreRun();
|
virtual void PreRun();
|
||||||
|
|
||||||
/// Called during RUNNING state repeatedly until it returns false or device state changes
|
/// Called during RUNNING state repeatedly until it returns false or device state changes
|
||||||
/// Executed in a worker thread
|
|
||||||
virtual bool ConditionalRun();
|
virtual bool ConditionalRun();
|
||||||
|
|
||||||
/// Called in the RUNNING state once after executing the Run()/ConditionalRun() method
|
/// Called in the RUNNING state once after executing the Run()/ConditionalRun() method
|
||||||
/// Executed in a worker thread
|
|
||||||
virtual void PostRun();
|
virtual void PostRun();
|
||||||
|
|
||||||
/// Handles the PAUSE state
|
/// Handles the PAUSE state
|
||||||
/// Executed in a worker thread
|
|
||||||
virtual void Pause();
|
virtual void Pause();
|
||||||
|
|
||||||
/// Resets the user task (to be overloaded in child classes)
|
/// Resets the user task (to be overloaded in child classes)
|
||||||
/// Executed in a worker thread
|
|
||||||
virtual void ResetTask();
|
virtual void ResetTask();
|
||||||
|
|
||||||
/// Resets the device (can be overloaded in child classes)
|
/// Resets the device (can be overloaded in child classes)
|
||||||
/// Executed in a worker thread
|
|
||||||
virtual void Reset();
|
virtual void Reset();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -521,6 +512,9 @@ class FairMQDevice : public FairMQStateMachine
|
||||||
/// Handles the Reset() method
|
/// Handles the Reset() method
|
||||||
void ResetWrapper();
|
void ResetWrapper();
|
||||||
|
|
||||||
|
/// Used to call user code and handle uncaught exceptions
|
||||||
|
void CallAndHandleError(std::function<void()> callable);
|
||||||
|
|
||||||
/// Unblocks blocking channel send/receive calls
|
/// Unblocks blocking channel send/receive calls
|
||||||
void Unblock();
|
void Unblock();
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
|
#include <stdexcept>
|
||||||
|
|
||||||
namespace fair
|
namespace fair
|
||||||
{
|
{
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/********************************************************************************
|
/********************************************************************************
|
||||||
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
* Copyright (C) 2017-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||||
* *
|
* *
|
||||||
* This software is distributed under the terms of the *
|
* This software is distributed under the terms of the *
|
||||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||||
|
@ -11,6 +11,7 @@
|
||||||
#include <termios.h> // for the interactive mode
|
#include <termios.h> // for the interactive mode
|
||||||
#include <poll.h> // for the interactive mode
|
#include <poll.h> // for the interactive mode
|
||||||
#include <csignal> // catching system signals
|
#include <csignal> // catching system signals
|
||||||
|
#include <cstdlib>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
|
|
||||||
|
@ -18,11 +19,18 @@ using namespace std;
|
||||||
|
|
||||||
namespace
|
namespace
|
||||||
{
|
{
|
||||||
std::atomic<sig_atomic_t> gSignalStatus(0);
|
std::atomic<sig_atomic_t> gLastSignal(0);
|
||||||
|
std::atomic<int> gSignalCount(0);
|
||||||
|
|
||||||
extern "C" auto signal_handler(int signal) -> void
|
extern "C" auto signal_handler(int signal) -> void
|
||||||
{
|
{
|
||||||
gSignalStatus = signal;
|
++gSignalCount;
|
||||||
|
gLastSignal = signal;
|
||||||
|
|
||||||
|
if (gSignalCount > 1)
|
||||||
|
{
|
||||||
|
std::abort();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,14 +45,23 @@ Control::Control(const string& name, const Plugin::Version version, const string
|
||||||
: Plugin(name, version, maintainer, homepage, pluginServices)
|
: Plugin(name, version, maintainer, homepage, pluginServices)
|
||||||
, fControllerThread()
|
, fControllerThread()
|
||||||
, fSignalHandlerThread()
|
, fSignalHandlerThread()
|
||||||
, fShutdownThread()
|
|
||||||
, fEvents()
|
, fEvents()
|
||||||
, fEventsMutex()
|
, fEventsMutex()
|
||||||
, fShutdownMutex()
|
, fControllerMutex()
|
||||||
, fNewEvent()
|
, fNewEvent()
|
||||||
, fDeviceTerminationRequested(false)
|
, fDeviceShutdownRequested(false)
|
||||||
, fHasShutdown(false)
|
, fDeviceHasShutdown(false)
|
||||||
|
, fPluginShutdownRequested(false)
|
||||||
{
|
{
|
||||||
|
SubscribeToDeviceStateChange([&](DeviceState newState)
|
||||||
|
{
|
||||||
|
{
|
||||||
|
lock_guard<mutex> lock{fEventsMutex};
|
||||||
|
fEvents.push(newState);
|
||||||
|
}
|
||||||
|
fNewEvent.notify_one();
|
||||||
|
});
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
TakeDeviceControl();
|
TakeDeviceControl();
|
||||||
|
@ -97,123 +114,121 @@ auto ControlPluginProgramOptions() -> Plugin::ProgOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
auto Control::InteractiveMode() -> void
|
auto Control::InteractiveMode() -> void
|
||||||
|
try
|
||||||
{
|
{
|
||||||
try
|
RunStartupSequence();
|
||||||
|
|
||||||
|
char input; // hold the user console input
|
||||||
|
pollfd cinfd[1];
|
||||||
|
cinfd[0].fd = fileno(stdin);
|
||||||
|
cinfd[0].events = POLLIN;
|
||||||
|
|
||||||
|
struct termios t;
|
||||||
|
tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure
|
||||||
|
t.c_lflag &= ~ICANON; // disable canonical input
|
||||||
|
t.c_lflag &= ~ECHO; // do not echo input chars
|
||||||
|
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
|
||||||
|
|
||||||
|
PrintInteractiveHelp();
|
||||||
|
|
||||||
|
bool keepRunning = true;
|
||||||
|
|
||||||
|
while (keepRunning)
|
||||||
{
|
{
|
||||||
RunStartupSequence();
|
if (poll(cinfd, 1, 500))
|
||||||
|
|
||||||
char input; // hold the user console input
|
|
||||||
pollfd cinfd[1];
|
|
||||||
cinfd[0].fd = fileno(stdin);
|
|
||||||
cinfd[0].events = POLLIN;
|
|
||||||
|
|
||||||
struct termios t;
|
|
||||||
tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure
|
|
||||||
t.c_lflag &= ~ICANON; // disable canonical input
|
|
||||||
t.c_lflag &= ~ECHO; // do not echo input chars
|
|
||||||
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
|
|
||||||
|
|
||||||
PrintInteractiveHelp();
|
|
||||||
|
|
||||||
bool keepRunning = true;
|
|
||||||
|
|
||||||
while (keepRunning)
|
|
||||||
{
|
{
|
||||||
if (poll(cinfd, 1, 500))
|
if (fDeviceShutdownRequested)
|
||||||
{
|
{
|
||||||
if (fDeviceTerminationRequested)
|
break;
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
cin >> input;
|
|
||||||
|
|
||||||
switch (input)
|
|
||||||
{
|
|
||||||
case 'i':
|
|
||||||
LOG(info) << "\n\n --> [i] init device\n";
|
|
||||||
ChangeDeviceState(DeviceStateTransition::InitDevice);
|
|
||||||
break;
|
|
||||||
case 'j':
|
|
||||||
LOG(info) << "\n\n --> [j] init task\n";
|
|
||||||
ChangeDeviceState(DeviceStateTransition::InitTask);
|
|
||||||
break;
|
|
||||||
case 'p':
|
|
||||||
LOG(info) << "\n\n --> [p] pause\n";
|
|
||||||
ChangeDeviceState(DeviceStateTransition::Pause);
|
|
||||||
break;
|
|
||||||
case 'r':
|
|
||||||
LOG(info) << "\n\n --> [r] run\n";
|
|
||||||
ChangeDeviceState(DeviceStateTransition::Run);
|
|
||||||
break;
|
|
||||||
case 's':
|
|
||||||
LOG(info) << "\n\n --> [s] stop\n";
|
|
||||||
ChangeDeviceState(DeviceStateTransition::Stop);
|
|
||||||
break;
|
|
||||||
case 't':
|
|
||||||
LOG(info) << "\n\n --> [t] reset task\n";
|
|
||||||
ChangeDeviceState(DeviceStateTransition::ResetTask);
|
|
||||||
break;
|
|
||||||
case 'd':
|
|
||||||
LOG(info) << "\n\n --> [d] reset device\n";
|
|
||||||
ChangeDeviceState(DeviceStateTransition::ResetDevice);
|
|
||||||
break;
|
|
||||||
case 'k':
|
|
||||||
LOG(info) << "\n\n --> [k] increase log severity\n";
|
|
||||||
CycleLogConsoleSeverityUp();
|
|
||||||
break;
|
|
||||||
case 'l':
|
|
||||||
LOG(info) << "\n\n --> [l] decrease log severity\n";
|
|
||||||
CycleLogConsoleSeverityDown();
|
|
||||||
break;
|
|
||||||
case 'n':
|
|
||||||
LOG(info) << "\n\n --> [n] increase log verbosity\n";
|
|
||||||
CycleLogVerbosityUp();
|
|
||||||
break;
|
|
||||||
case 'm':
|
|
||||||
LOG(info) << "\n\n --> [m] decrease log verbosity\n";
|
|
||||||
CycleLogVerbosityDown();
|
|
||||||
break;
|
|
||||||
case 'h':
|
|
||||||
LOG(info) << "\n\n --> [h] help\n";
|
|
||||||
PrintInteractiveHelp();
|
|
||||||
break;
|
|
||||||
// case 'x':
|
|
||||||
// LOG(info) << "\n\n --> [x] ERROR\n";
|
|
||||||
// ChangeDeviceState(DeviceStateTransition::ERROR_FOUND);
|
|
||||||
// break;
|
|
||||||
case 'q':
|
|
||||||
LOG(info) << "\n\n --> [q] end\n";
|
|
||||||
keepRunning = false;
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
LOG(info) << "Invalid input: [" << input << "]";
|
|
||||||
PrintInteractiveHelp();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fDeviceTerminationRequested)
|
cin >> input;
|
||||||
|
|
||||||
|
switch (input)
|
||||||
{
|
{
|
||||||
keepRunning = false;
|
case 'i':
|
||||||
|
LOG(info) << "\n\n --> [i] init device\n";
|
||||||
|
ChangeDeviceState(DeviceStateTransition::InitDevice);
|
||||||
|
break;
|
||||||
|
case 'j':
|
||||||
|
LOG(info) << "\n\n --> [j] init task\n";
|
||||||
|
ChangeDeviceState(DeviceStateTransition::InitTask);
|
||||||
|
break;
|
||||||
|
case 'p':
|
||||||
|
LOG(info) << "\n\n --> [p] pause\n";
|
||||||
|
ChangeDeviceState(DeviceStateTransition::Pause);
|
||||||
|
break;
|
||||||
|
case 'r':
|
||||||
|
LOG(info) << "\n\n --> [r] run\n";
|
||||||
|
ChangeDeviceState(DeviceStateTransition::Run);
|
||||||
|
break;
|
||||||
|
case 's':
|
||||||
|
LOG(info) << "\n\n --> [s] stop\n";
|
||||||
|
ChangeDeviceState(DeviceStateTransition::Stop);
|
||||||
|
break;
|
||||||
|
case 't':
|
||||||
|
LOG(info) << "\n\n --> [t] reset task\n";
|
||||||
|
ChangeDeviceState(DeviceStateTransition::ResetTask);
|
||||||
|
break;
|
||||||
|
case 'd':
|
||||||
|
LOG(info) << "\n\n --> [d] reset device\n";
|
||||||
|
ChangeDeviceState(DeviceStateTransition::ResetDevice);
|
||||||
|
break;
|
||||||
|
case 'k':
|
||||||
|
LOG(info) << "\n\n --> [k] increase log severity\n";
|
||||||
|
CycleLogConsoleSeverityUp();
|
||||||
|
break;
|
||||||
|
case 'l':
|
||||||
|
LOG(info) << "\n\n --> [l] decrease log severity\n";
|
||||||
|
CycleLogConsoleSeverityDown();
|
||||||
|
break;
|
||||||
|
case 'n':
|
||||||
|
LOG(info) << "\n\n --> [n] increase log verbosity\n";
|
||||||
|
CycleLogVerbosityUp();
|
||||||
|
break;
|
||||||
|
case 'm':
|
||||||
|
LOG(info) << "\n\n --> [m] decrease log verbosity\n";
|
||||||
|
CycleLogVerbosityDown();
|
||||||
|
break;
|
||||||
|
case 'h':
|
||||||
|
LOG(info) << "\n\n --> [h] help\n";
|
||||||
|
PrintInteractiveHelp();
|
||||||
|
break;
|
||||||
|
// case 'x':
|
||||||
|
// LOG(info) << "\n\n --> [x] ERROR\n";
|
||||||
|
// ChangeDeviceState(DeviceStateTransition::ERROR_FOUND);
|
||||||
|
// break;
|
||||||
|
case 'q':
|
||||||
|
LOG(info) << "\n\n --> [q] end\n";
|
||||||
|
keepRunning = false;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
LOG(info) << "Invalid input: [" << input << "]";
|
||||||
|
PrintInteractiveHelp();
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure
|
if (fDeviceShutdownRequested)
|
||||||
t.c_lflag |= ICANON; // re-enable canonical input
|
|
||||||
t.c_lflag |= ECHO; // echo input chars
|
|
||||||
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
|
|
||||||
|
|
||||||
if (!fDeviceTerminationRequested)
|
|
||||||
{
|
{
|
||||||
RunShutdownSequence();
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (PluginServices::DeviceControlError& e)
|
|
||||||
{
|
tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure
|
||||||
// If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else.
|
t.c_lflag |= ICANON; // re-enable canonical input
|
||||||
LOG(debug) << e.what();
|
t.c_lflag |= ECHO; // echo input chars
|
||||||
}
|
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
|
||||||
|
|
||||||
|
RunShutdownSequence();
|
||||||
|
}
|
||||||
|
catch (PluginServices::DeviceControlError& e)
|
||||||
|
{
|
||||||
|
// If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else.
|
||||||
|
LOG(debug) << e.what();
|
||||||
|
}
|
||||||
|
catch (DeviceErrorState&)
|
||||||
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
auto Control::PrintInteractiveHelp() -> void
|
auto Control::PrintInteractiveHelp() -> void
|
||||||
|
@ -234,137 +249,119 @@ auto Control::WaitForNextState() -> DeviceState
|
||||||
}
|
}
|
||||||
|
|
||||||
auto result = fEvents.front();
|
auto result = fEvents.front();
|
||||||
|
|
||||||
|
if (result == DeviceState::Error)
|
||||||
|
{
|
||||||
|
throw DeviceErrorState("Controlled device transitioned to error state.");
|
||||||
|
}
|
||||||
|
|
||||||
fEvents.pop();
|
fEvents.pop();
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto Control::StaticMode() -> void
|
auto Control::StaticMode() -> void
|
||||||
|
try
|
||||||
{
|
{
|
||||||
try
|
RunStartupSequence();
|
||||||
|
|
||||||
{
|
{
|
||||||
RunStartupSequence();
|
// Wait for next state, which is DeviceState::Ready,
|
||||||
|
// or for device shutdown request (Ctrl-C)
|
||||||
|
unique_lock<mutex> lock{fEventsMutex};
|
||||||
|
while (fEvents.empty() && !fDeviceShutdownRequested)
|
||||||
{
|
{
|
||||||
// Wait for next state, which is DeviceState::Ready,
|
fNewEvent.wait_for(lock, chrono::milliseconds(50));
|
||||||
// or for device termination request
|
|
||||||
unique_lock<mutex> lock{fEventsMutex};
|
|
||||||
while (fEvents.empty() && !fDeviceTerminationRequested)
|
|
||||||
{
|
|
||||||
fNewEvent.wait(lock);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!fDeviceTerminationRequested)
|
|
||||||
{
|
|
||||||
RunShutdownSequence();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (PluginServices::DeviceControlError& e)
|
|
||||||
{
|
RunShutdownSequence();
|
||||||
// If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else.
|
}
|
||||||
LOG(debug) << e.what();
|
catch (PluginServices::DeviceControlError& e)
|
||||||
}
|
{
|
||||||
|
// If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else.
|
||||||
|
LOG(debug) << e.what();
|
||||||
|
}
|
||||||
|
catch (DeviceErrorState&)
|
||||||
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
auto Control::SignalHandler() -> void
|
auto Control::SignalHandler() -> void
|
||||||
{
|
{
|
||||||
while (true)
|
while (gSignalCount == 0 && !fPluginShutdownRequested)
|
||||||
{
|
{
|
||||||
if (gSignalStatus != 0 && !fHasShutdown)
|
|
||||||
{
|
|
||||||
LOG(info) << "Received device shutdown request (signal " << gSignalStatus << ").";
|
|
||||||
LOG(info) << "Waiting for graceful device shutdown. Hit Ctrl-C again to abort immediately.";
|
|
||||||
|
|
||||||
if (!fDeviceTerminationRequested)
|
|
||||||
{
|
|
||||||
fDeviceTerminationRequested = true;
|
|
||||||
gSignalStatus = 0;
|
|
||||||
fShutdownThread = thread(&Control::HandleShutdownSignal, this);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
LOG(warn) << "Received 2nd device shutdown request (signal " << gSignalStatus << ").";
|
|
||||||
LOG(warn) << "Aborting immediately!";
|
|
||||||
abort();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (fHasShutdown)
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
this_thread::sleep_for(chrono::milliseconds(100));
|
this_thread::sleep_for(chrono::milliseconds(100));
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
auto Control::HandleShutdownSignal() -> void
|
if (!fPluginShutdownRequested)
|
||||||
{
|
|
||||||
StealDeviceControl();
|
|
||||||
|
|
||||||
UnsubscribeFromDeviceStateChange(); // In case, static or interactive mode have subscribed already
|
|
||||||
SubscribeToDeviceStateChange([&](DeviceState newState)
|
|
||||||
{
|
{
|
||||||
{
|
LOG(info) << "Received device shutdown request (signal " << gLastSignal << ").";
|
||||||
lock_guard<mutex> lock{fEventsMutex};
|
LOG(info) << "Waiting for graceful device shutdown. Hit Ctrl-C again to abort immediately.";
|
||||||
fEvents.push(newState);
|
|
||||||
}
|
|
||||||
fNewEvent.notify_one();
|
|
||||||
});
|
|
||||||
|
|
||||||
RunShutdownSequence();
|
// Signal and wait for controller thread, if we are controller
|
||||||
|
fDeviceShutdownRequested = true;
|
||||||
|
{
|
||||||
|
unique_lock<mutex> lock(fControllerMutex);
|
||||||
|
if (fControllerThread.joinable()) fControllerThread.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!fDeviceHasShutdown)
|
||||||
|
{
|
||||||
|
// Take over control and attempt graceful shutdown
|
||||||
|
StealDeviceControl();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
RunShutdownSequence();
|
||||||
|
}
|
||||||
|
catch (PluginServices::DeviceControlError& e)
|
||||||
|
{
|
||||||
|
LOG(info) << "Graceful device shutdown failed: " << e.what() << " If hanging, hit Ctrl-C again to abort immediately.";
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
LOG(info) << "Graceful device shutdown failed. If hanging, hit Ctrl-C again to abort immediately.";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto Control::RunShutdownSequence() -> void
|
auto Control::RunShutdownSequence() -> void
|
||||||
{
|
{
|
||||||
lock_guard<mutex> lock(fShutdownMutex);
|
auto nextState = GetCurrentDeviceState();
|
||||||
if (!fHasShutdown)
|
EmptyEventQueue();
|
||||||
|
while (nextState != DeviceState::Exiting)
|
||||||
{
|
{
|
||||||
auto nextState = GetCurrentDeviceState();
|
switch (nextState)
|
||||||
EmptyEventQueue();
|
|
||||||
while (nextState != DeviceState::Exiting)
|
|
||||||
{
|
{
|
||||||
switch (nextState)
|
case DeviceState::Idle:
|
||||||
{
|
ChangeDeviceState(DeviceStateTransition::End);
|
||||||
case DeviceState::Idle:
|
break;
|
||||||
ChangeDeviceState(DeviceStateTransition::End);
|
case DeviceState::DeviceReady:
|
||||||
break;
|
ChangeDeviceState(DeviceStateTransition::ResetDevice);
|
||||||
case DeviceState::DeviceReady:
|
break;
|
||||||
ChangeDeviceState(DeviceStateTransition::ResetDevice);
|
case DeviceState::Ready:
|
||||||
break;
|
ChangeDeviceState(DeviceStateTransition::ResetTask);
|
||||||
case DeviceState::Ready:
|
break;
|
||||||
ChangeDeviceState(DeviceStateTransition::ResetTask);
|
case DeviceState::Running:
|
||||||
break;
|
ChangeDeviceState(DeviceStateTransition::Stop);
|
||||||
case DeviceState::Running:
|
break;
|
||||||
ChangeDeviceState(DeviceStateTransition::Stop);
|
case DeviceState::Paused:
|
||||||
break;
|
ChangeDeviceState(DeviceStateTransition::Resume);
|
||||||
case DeviceState::Paused:
|
break;
|
||||||
ChangeDeviceState(DeviceStateTransition::Resume);
|
default:
|
||||||
break;
|
// ignore other states
|
||||||
default:
|
break;
|
||||||
// ignore other states
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
nextState = WaitForNextState();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fHasShutdown = true;
|
nextState = WaitForNextState();
|
||||||
UnsubscribeFromDeviceStateChange();
|
|
||||||
ReleaseDeviceControl();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fDeviceHasShutdown = true;
|
||||||
|
ReleaseDeviceControl();
|
||||||
}
|
}
|
||||||
|
|
||||||
auto Control::RunStartupSequence() -> void
|
auto Control::RunStartupSequence() -> void
|
||||||
{
|
{
|
||||||
SubscribeToDeviceStateChange([&](DeviceState newState)
|
|
||||||
{
|
|
||||||
{
|
|
||||||
lock_guard<mutex> lock{fEventsMutex};
|
|
||||||
fEvents.push(newState);
|
|
||||||
}
|
|
||||||
fNewEvent.notify_one();
|
|
||||||
});
|
|
||||||
|
|
||||||
ChangeDeviceState(DeviceStateTransition::InitDevice);
|
ChangeDeviceState(DeviceStateTransition::InitDevice);
|
||||||
while (WaitForNextState() != DeviceState::DeviceReady) {}
|
while (WaitForNextState() != DeviceState::DeviceReady) {}
|
||||||
ChangeDeviceState(DeviceStateTransition::InitTask);
|
ChangeDeviceState(DeviceStateTransition::InitTask);
|
||||||
|
@ -381,9 +378,16 @@ auto Control::EmptyEventQueue() -> void
|
||||||
|
|
||||||
Control::~Control()
|
Control::~Control()
|
||||||
{
|
{
|
||||||
if (fControllerThread.joinable()) fControllerThread.join();
|
// Notify threads to exit
|
||||||
|
fPluginShutdownRequested = true;
|
||||||
|
|
||||||
|
{
|
||||||
|
unique_lock<mutex> lock(fControllerMutex);
|
||||||
|
if (fControllerThread.joinable()) fControllerThread.join();
|
||||||
|
}
|
||||||
if (fSignalHandlerThread.joinable()) fSignalHandlerThread.join();
|
if (fSignalHandlerThread.joinable()) fSignalHandlerThread.join();
|
||||||
if (fShutdownThread.joinable()) fShutdownThread.join();
|
|
||||||
|
UnsubscribeFromDeviceStateChange();
|
||||||
}
|
}
|
||||||
|
|
||||||
} /* namespace plugins */
|
} /* namespace plugins */
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#include <queue>
|
#include <queue>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
|
#include <stdexcept>
|
||||||
|
|
||||||
namespace fair
|
namespace fair
|
||||||
{
|
{
|
||||||
|
@ -35,24 +36,25 @@ class Control : public Plugin
|
||||||
|
|
||||||
private:
|
private:
|
||||||
auto InteractiveMode() -> void;
|
auto InteractiveMode() -> void;
|
||||||
auto PrintInteractiveHelp() -> void;
|
static auto PrintInteractiveHelp() -> void;
|
||||||
auto StaticMode() -> void;
|
auto StaticMode() -> void;
|
||||||
auto WaitForNextState() -> DeviceState;
|
auto WaitForNextState() -> DeviceState;
|
||||||
auto SignalHandler() -> void;
|
auto SignalHandler() -> void;
|
||||||
auto HandleShutdownSignal() -> void;
|
|
||||||
auto RunShutdownSequence() -> void;
|
auto RunShutdownSequence() -> void;
|
||||||
auto RunStartupSequence() -> void;
|
auto RunStartupSequence() -> void;
|
||||||
auto EmptyEventQueue() -> void;
|
auto EmptyEventQueue() -> void;
|
||||||
|
|
||||||
std::thread fControllerThread;
|
std::thread fControllerThread;
|
||||||
std::thread fSignalHandlerThread;
|
std::thread fSignalHandlerThread;
|
||||||
std::thread fShutdownThread;
|
|
||||||
std::queue<DeviceState> fEvents;
|
std::queue<DeviceState> fEvents;
|
||||||
std::mutex fEventsMutex;
|
std::mutex fEventsMutex;
|
||||||
std::mutex fShutdownMutex;
|
std::mutex fControllerMutex;
|
||||||
std::condition_variable fNewEvent;
|
std::condition_variable fNewEvent;
|
||||||
std::atomic<bool> fDeviceTerminationRequested;
|
std::atomic<bool> fDeviceShutdownRequested;
|
||||||
std::atomic<bool> fHasShutdown;
|
std::atomic<bool> fDeviceHasShutdown;
|
||||||
|
std::atomic<bool> fPluginShutdownRequested;
|
||||||
|
|
||||||
|
struct DeviceErrorState : std::runtime_error { using std::runtime_error::runtime_error; };
|
||||||
}; /* class Control */
|
}; /* class Control */
|
||||||
|
|
||||||
auto ControlPluginProgramOptions() -> Plugin::ProgOptions;
|
auto ControlPluginProgramOptions() -> Plugin::ProgOptions;
|
||||||
|
|
Loading…
Reference in New Issue
Block a user