FairMQ: Move --catch-signals logic to control plugin

* Add StealDeviceControl() API to plugin services
This commit is contained in:
Dennis Klein 2017-09-20 01:46:10 +02:00 committed by Mohammad Al-Turany
parent 7dcd09692c
commit 44a59f25a7
10 changed files with 256 additions and 209 deletions

View File

@ -7,7 +7,6 @@
********************************************************************************/
#include <list>
#include <csignal> // catching system signals
#include <cstdlib>
#include <stdexcept>
#include <random>
@ -36,13 +35,6 @@
using namespace std;

// Process-wide closure that forwards a caught signal to a device instance.
// It is default-constructed (empty) here and bound in FairMQDevice::CatchSignals().
function<void(int)> sigHandler;

// Plain-function trampoline with the signature required by signal().
// Forwards the signal number to sigHandler when a target has been bound.
static void CallSignalHandler(int signal)
{
    // Calling an empty std::function throws std::bad_function_call; an
    // exception escaping a signal handler would terminate the process, so an
    // unbound closure is treated as "nothing to do".
    if (sigHandler)
    {
        sigHandler(signal);
    }
}
FairMQDevice::FairMQDevice()
: fTransportFactory(nullptr)
, fTransports()
@ -58,8 +50,6 @@ FairMQDevice::FairMQDevice()
, fNetworkInterface()
, fDefaultTransport()
, fInitializationTimeoutInS(120)
, fCatchingSignals(false)
, fTerminationRequested(false)
, fDataCallbacks(false)
, fDeviceCmdSockets()
, fMsgInputs()
@ -89,8 +79,6 @@ FairMQDevice::FairMQDevice(const fair::mq::tools::Version version)
, fNetworkInterface()
, fDefaultTransport()
, fInitializationTimeoutInS(120)
, fCatchingSignals(false)
, fTerminationRequested(false)
, fDataCallbacks(false)
, fDeviceCmdSockets()
, fMsgInputs()
@ -105,46 +93,6 @@ FairMQDevice::FairMQDevice(const fair::mq::tools::Version version)
{
}
void FairMQDevice::CatchSignals()
{
if (!fCatchingSignals)
{
sigHandler = bind1st(mem_fun(&FairMQDevice::SignalHandler), this);
signal(SIGINT, CallSignalHandler);
signal(SIGTERM, CallSignalHandler);
fCatchingSignals = true;
}
}
/// Reacts to a caught signal.
///
/// The first request walks the device through STOP, RESET_TASK, RESET_DEVICE
/// and END. Any further request aborts the process.
///
/// @param signal number of the caught signal (used for logging only)
void FairMQDevice::SignalHandler(int signal)
{
    LOG(INFO) << "Caught signal " << signal;

    // exchange() tests and sets the flag in one atomic step, so two requests
    // that overlap cannot both enter the shutdown sequence below.
    if (fTerminationRequested.exchange(true))
    {
        LOG(WARN) << "Repeated termination or bad initialization? Aborting.";
        abort();
    }

    ChangeState(STOP);

    ChangeState(RESET_TASK);
    WaitForEndOfState(RESET_TASK);

    ChangeState(RESET_DEVICE);
    WaitForEndOfState(RESET_DEVICE);

    ChangeState(END);

    LOG(INFO) << "Exiting.";
}
void FairMQDevice::InitWrapper()
{
if (!fTransportFactory)

View File

@ -556,11 +556,6 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
void CreateOwnConfig();
/// Signal handler
void SignalHandler(int signal);
bool fCatchingSignals;
std::atomic<bool> fTerminationRequested;
bool fDataCallbacks;
std::unordered_map<FairMQ::Transport, FairMQSocketPtr> fDeviceCmdSockets; ///< Sockets used for the internal unblocking mechanism
std::unordered_map<std::string, InputMsgCallback> fMsgInputs;

View File

@ -76,11 +76,12 @@ class Plugin
// Textual form of a device state transition, as provided by the plugin services.
auto ToStr(DeviceStateTransition transition) const -> std::string { return fPluginServices->ToStr(transition); }
// Current device state, as provided by the plugin services.
auto GetCurrentDeviceState() const -> DeviceState { return fPluginServices->GetCurrentDeviceState(); }
// The calls below that pass fkName act on behalf of this plugin: fkName is
// handed to the plugin services as the controller/subscriber id.
auto TakeDeviceControl() -> void { fPluginServices->TakeDeviceControl(fkName); }
auto StealDeviceControl() -> void { fPluginServices->StealDeviceControl(fkName); }
auto ReleaseDeviceControl() -> void { fPluginServices->ReleaseDeviceControl(fkName); }
// Requests the given state transition with this plugin as the controller.
auto ChangeDeviceState(const DeviceStateTransition next) -> void { fPluginServices->ChangeDeviceState(fkName, next); }
// Registers / removes this plugin's device state change callback.
auto SubscribeToDeviceStateChange(std::function<void(DeviceState)> callback) -> void { fPluginServices->SubscribeToDeviceStateChange(fkName, callback); }
auto UnsubscribeFromDeviceStateChange() -> void { fPluginServices->UnsubscribeFromDeviceStateChange(fkName); }
// Forwards to PluginServices::DeviceTerminated().
auto DeviceTerminated() const -> bool { return fPluginServices->DeviceTerminated(); }
// device config API
// see <fairmq/PluginServices.h> for docs
template<typename T>

View File

@ -44,6 +44,7 @@ const std::unordered_map<std::string, PluginServices::DeviceStateTransition> Plu
{"INIT TASK", DeviceStateTransition::InitTask},
{"RUN", DeviceStateTransition::Run},
{"PAUSE", DeviceStateTransition::Pause},
{"RESUME", DeviceStateTransition::Resume},
{"STOP", DeviceStateTransition::Stop},
{"RESET TASK", DeviceStateTransition::ResetTask},
{"RESET DEVICE", DeviceStateTransition::ResetDevice},
@ -55,6 +56,7 @@ const std::unordered_map<PluginServices::DeviceStateTransition, std::string, too
{DeviceStateTransition::InitTask, "INIT TASK"},
{DeviceStateTransition::Run, "RUN"},
{DeviceStateTransition::Pause, "PAUSE"},
{DeviceStateTransition::Resume, "RESUME"},
{DeviceStateTransition::Stop, "STOP"},
{DeviceStateTransition::ResetTask, "RESET TASK"},
{DeviceStateTransition::ResetDevice, "RESET DEVICE"},
@ -80,6 +82,7 @@ const std::unordered_map<PluginServices::DeviceStateTransition, FairMQDevice::Ev
{DeviceStateTransition::InitTask, FairMQDevice::INIT_TASK},
{DeviceStateTransition::Run, FairMQDevice::RUN},
{DeviceStateTransition::Pause, FairMQDevice::PAUSE},
{DeviceStateTransition::Resume, FairMQDevice::RUN},
{DeviceStateTransition::Stop, FairMQDevice::STOP},
{DeviceStateTransition::ResetTask, FairMQDevice::RESET_TASK},
{DeviceStateTransition::ResetDevice, FairMQDevice::RESET_DEVICE},
@ -89,9 +92,9 @@ const std::unordered_map<PluginServices::DeviceStateTransition, FairMQDevice::Ev
auto PluginServices::ChangeDeviceState(const std::string& controller, const DeviceStateTransition next) -> void
{
// lock_guard<mutex> lock{fDeviceControllerMutex};
//
// if (!fDeviceController) fDeviceController = controller;
lock_guard<mutex> lock{fDeviceControllerMutex};
if (!fDeviceController) fDeviceController = controller;
if (fDeviceController == controller)
{
@ -125,7 +128,16 @@ auto PluginServices::TakeDeviceControl(const std::string& controller) -> void
"Currently, plugin '", fDeviceController, "' has taken control."
)};
}
}
/// Makes `controller` the device controller regardless of who currently
/// holds that role.
///
/// The previous implementation only assigned the role when no controller was
/// set, which made the call a no-op exactly in the situation it exists for
/// (another plugin already being in control).
///
/// @param controller id of the plugin that takes over control
auto PluginServices::StealDeviceControl(const std::string& controller) -> void
{
    lock_guard<mutex> lock{fDeviceControllerMutex};

    // Unconditional on purpose: this is the "by force" variant.
    fDeviceController = controller;
}
auto PluginServices::ReleaseDeviceControl(const std::string& controller) -> void

View File

@ -73,6 +73,7 @@ class PluginServices
InitTask,
Run,
Pause,
Resume,
Stop,
ResetTask,
ResetDevice,
@ -118,6 +119,13 @@ class PluginServices
auto TakeDeviceControl(const std::string& controller) -> void;
struct DeviceControlError : std::runtime_error { using std::runtime_error::runtime_error; };
/// @brief Become device controller by force
/// @param controller id
///
/// Take over device controller privileges by force. Does not trigger the ReleaseDeviceControl condition!
/// This function is intended to implement override/emergency control functionality (e.g. device shutdown on SIGINT).
auto StealDeviceControl(const std::string& controller) -> void;
/// @brief Release device controller role
/// @param controller id
/// @throws fair::mq::PluginServices::DeviceControlError if passed controller id is not the current device controller.
@ -156,8 +164,6 @@ class PluginServices
/// @param subscriber id
auto UnsubscribeFromDeviceStateChange(const std::string& subscriber) -> void { fDevice->UnsubscribeFromStateChange(subscriber); }
auto DeviceTerminated() const -> bool { return fDevice->Terminated(); }
// Config API
/// @brief Set config property

View File

@ -83,12 +83,6 @@ void FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool a
return;
}
if (fVarMap.count("id") == 0)
{
LOG(ERROR) << "Device id not provided, provide with --id";
exit(EXIT_FAILURE);
}
string verbosity = GetValue<string>("verbosity");
string logFile = GetValue<string>("log-to-file");
bool color = GetValue<bool>("log-color");

View File

@ -10,9 +10,21 @@
#include <termios.h> // for the interactive mode
#include <poll.h> // for the interactive mode
#include <csignal> // catching system signals
#include <functional>
using namespace std;
namespace
{

// File-local closure that carries a caught signal to its receiver.
// It is empty until something binds it; the Control constructor does so when
// the "catch-signals" property is greater than zero.
std::function<void(int)> gSignalHandlerClosure;

// C-linkage trampoline suitable for signal(); forwards the signal number to
// the bound closure.
extern "C" auto signal_handler(int signal) -> void
{
    // Invoking an empty std::function throws std::bad_function_call, and an
    // exception must not escape a C-linkage signal handler, so an unbound
    // closure is simply ignored.
    if (gSignalHandlerClosure)
    {
        gSignalHandlerClosure(signal);
    }
}

} // namespace
namespace fair
{
namespace mq
@ -23,6 +35,8 @@ namespace plugins
Control::Control(const string name, const Plugin::Version version, const string maintainer, const string homepage, PluginServices* pluginServices)
: Plugin(name, version, maintainer, homepage, pluginServices)
, fControllerThread()
, fSignalHandlerThread()
, fDeviceTerminationRequested{false}
, fEvents()
, fEventsMutex()
, fNewEvent()
@ -45,9 +59,21 @@ Control::Control(const string name, const Plugin::Version version, const string
}
else
{
LOG(ERROR) << "Unrecognized control mode '" << control << "' requested via command line. " << "Ignoring and falling back to static control mode.";
LOG(ERROR) << "Unrecognized control mode '" << control << "' requested. " << "Ignoring and falling back to static control mode.";
fControllerThread = thread(&Control::StaticMode, this);
}
LOG(DEBUG) << "catch-signals: " << GetProperty<int>("catch-signals");
if (GetProperty<int>("catch-signals") > 0)
{
gSignalHandlerClosure = bind(&Control::SignalHandler, this, placeholders::_1);
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
}
else
{
LOG(WARN) << "Signal handling (e.g. Ctrl-C) has been deactivated.";
}
}
catch (PluginServices::DeviceControlError& e)
{
@ -58,13 +84,17 @@ Control::Control(const string name, const Plugin::Version version, const string
/// Builds the command line options contributed by the control plugin:
/// "control" (string, default "interactive") and "catch-signals" (int, default 1).
auto ControlPluginProgramOptions() -> Plugin::ProgOptions
{
    namespace po = boost::program_options;

    // Exactly one description object and one add_options() chain; both options
    // have to be registered on the same chain.
    auto pluginOptions = po::options_description{"Control (builtin) Plugin"};
    pluginOptions.add_options()
        ("control", po::value<string>()->default_value("interactive"), "Control mode, 'static' or 'interactive'")
        ("catch-signals", po::value<int>()->default_value(1), "Enable signal handling (1/0).");
    return pluginOptions;
}
auto Control::InteractiveMode() -> void
{
try
{
SubscribeToDeviceStateChange([&](DeviceState newState)
{
@ -75,11 +105,7 @@ auto Control::InteractiveMode() -> void
fNewEvent.notify_one();
});
ChangeDeviceState(DeviceStateTransition::InitDevice);
while (WaitForNextState() != DeviceState::DeviceReady) {}
ChangeDeviceState(DeviceStateTransition::InitTask);
while (WaitForNextState() != DeviceState::Ready) {}
ChangeDeviceState(DeviceStateTransition::Run);
RunStartupSequence();
char input; // hold the user console input
pollfd cinfd[1];
@ -99,9 +125,8 @@ auto Control::InteractiveMode() -> void
{
if (poll(cinfd, 1, 500))
{
if (DeviceTerminated())
if (fDeviceTerminationRequested)
{
keepRunning = false;
break;
}
@ -147,23 +172,7 @@ auto Control::InteractiveMode() -> void
// break;
case 'q':
LOG(INFO) << "[q] end";
ChangeDeviceState(DeviceStateTransition::Stop);
ChangeDeviceState(DeviceStateTransition::ResetTask);
while (WaitForNextState() != DeviceState::DeviceReady) {}
ChangeDeviceState(DeviceStateTransition::ResetDevice);
while (WaitForNextState() != DeviceState::Idle) {}
ChangeDeviceState(DeviceStateTransition::End);
if (GetCurrentDeviceState() == DeviceState::Exiting)
{
keepRunning = false;
}
LOG(INFO) << "Exiting.";
break;
default:
LOG(INFO) << "Invalid input: [" << input << "]";
@ -172,7 +181,7 @@ auto Control::InteractiveMode() -> void
}
}
if (DeviceTerminated())
if (fDeviceTerminationRequested)
{
keepRunning = false;
}
@ -182,8 +191,16 @@ auto Control::InteractiveMode() -> void
t.c_lflag |= ICANON; // re-enable canonical input
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
UnsubscribeFromDeviceStateChange();
ReleaseDeviceControl();
if (!fDeviceTerminationRequested)
{
RunShutdownSequence();
}
}
catch (PluginServices::DeviceControlError& e)
{
// If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else.
LOG(DEBUG) << e.what();
}
}
auto Control::PrintInteractiveHelp() -> void
@ -206,6 +223,8 @@ auto Control::WaitForNextState() -> DeviceState
}
auto Control::StaticMode() -> void
{
try
{
SubscribeToDeviceStateChange([&](DeviceState newState)
{
@ -216,33 +235,106 @@ auto Control::StaticMode() -> void
fNewEvent.notify_one();
});
ChangeDeviceState(DeviceStateTransition::InitDevice);
while (WaitForNextState() != DeviceState::DeviceReady) {}
RunStartupSequence();
ChangeDeviceState(DeviceStateTransition::InitTask);
while (WaitForNextState() != DeviceState::Ready) {}
ChangeDeviceState(DeviceStateTransition::Run);
while (WaitForNextState() != DeviceState::Ready) {}
if (!DeviceTerminated())
{
ChangeDeviceState(DeviceStateTransition::ResetTask);
while (WaitForNextState() != DeviceState::DeviceReady) {}
ChangeDeviceState(DeviceStateTransition::ResetDevice);
while (WaitForNextState() != DeviceState::Idle) {}
// Wait for next state, which is DeviceState::Ready,
// or for device termination request
unique_lock<mutex> lock{fEventsMutex};
while (fEvents.empty() && !fDeviceTerminationRequested)
{
fNewEvent.wait(lock);
}
}
if (!fDeviceTerminationRequested)
{
RunShutdownSequence();
}
}
catch (PluginServices::DeviceControlError& e)
{
// If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else.
LOG(DEBUG) << e.what();
}
}
/// Handles a device shutdown request delivered as a signal.
///
/// The first request takes over device control and starts
/// RunShutdownSequence() on fSignalHandlerThread. Any further request aborts
/// the process immediately.
///
/// @param signal number of the received signal (used for logging only)
auto Control::SignalHandler(int signal) -> void
{
    // NOTE(review): logging and starting a thread are not async-signal-safe
    // operations; this is reached from the installed signal handler. Consider
    // only setting the flag here and doing the work on a waiting thread.

    // exchange() tests and sets the flag in one atomic step, so overlapping
    // requests cannot both start a shutdown thread.
    if (fDeviceTerminationRequested.exchange(true))
    {
        LOG(WARN) << "Received 2nd device shutdown request (signal " << signal << ").";
        LOG(WARN) << "Aborting immediately !";
        abort();
    }

    StealDeviceControl();

    LOG(INFO) << "Received device shutdown request (signal " << signal << ").";
    LOG(INFO) << "Waiting for graceful device shutdown. Hit Ctrl-C again to abort immediately.";

    fSignalHandlerThread = thread(&Control::RunShutdownSequence, this);
}
/// Drives the device from its current state to DeviceState::Exiting.
///
/// In every iteration the transition that leads one step closer to Exiting is
/// requested for the state at hand, then the next reported state is awaited.
/// Once Exiting is reported, the state subscription and device control are
/// given up.
auto Control::RunShutdownSequence() -> void
{
    auto nextState = GetCurrentDeviceState();
    // Discard state changes queued before the shutdown started.
    EmptyEventQueue();

    while (nextState != DeviceState::Exiting)
    {
        switch (nextState)
        {
            case DeviceState::Idle:
                // Only request the transition here. The Exiting state is picked
                // up by the WaitForNextState() call below; waiting for it inside
                // this case as well would leave that call waiting for a further
                // state report after Exiting was already observed.
                ChangeDeviceState(DeviceStateTransition::End);
                break;
            case DeviceState::DeviceReady:
                ChangeDeviceState(DeviceStateTransition::ResetDevice);
                break;
            case DeviceState::Ready:
                ChangeDeviceState(DeviceStateTransition::ResetTask);
                break;
            case DeviceState::Running:
                ChangeDeviceState(DeviceStateTransition::Stop);
                break;
            case DeviceState::Paused:
                ChangeDeviceState(DeviceStateTransition::Resume);
                break;
            default:
                // No transition is requested for any other state; just wait for
                // the next state report.
                break;
        }

        nextState = WaitForNextState();
    }

    UnsubscribeFromDeviceStateChange();
    ReleaseDeviceControl();
}
auto Control::RunStartupSequence() -> void
{
ChangeDeviceState(DeviceStateTransition::InitDevice);
while (WaitForNextState() != DeviceState::DeviceReady) {}
ChangeDeviceState(DeviceStateTransition::InitTask);
while (WaitForNextState() != DeviceState::Ready) {}
ChangeDeviceState(DeviceStateTransition::Run);
while (WaitForNextState() != DeviceState::Running) {}
}
auto Control::EmptyEventQueue() -> void
{
lock_guard<mutex> lock{fEventsMutex};
fEvents = queue<DeviceState>{};
}
/// Waits for the controller thread and the signal handler thread to finish,
/// if they were started.
Control::~Control()
{
    // Joins the given thread when it is joinable; otherwise does nothing.
    auto joinIfJoinable = [](std::thread& worker)
    {
        if (worker.joinable())
        {
            worker.join();
        }
    };

    joinIfJoinable(fControllerThread);
    joinIfJoinable(fSignalHandlerThread);
}
} /* namespace plugins */

View File

@ -16,6 +16,7 @@
#include <string>
#include <queue>
#include <thread>
#include <atomic>
namespace fair
{
@ -36,11 +37,17 @@ class Control : public Plugin
auto PrintInteractiveHelp() -> void;
auto StaticMode() -> void;
auto WaitForNextState() -> DeviceState;
auto SignalHandler(int signal) -> void;
auto RunShutdownSequence() -> void;
auto RunStartupSequence() -> void;
auto EmptyEventQueue() -> void;
std::thread fControllerThread;
std::thread fSignalHandlerThread;
std::queue<DeviceState> fEvents;
std::mutex fEventsMutex;
std::condition_variable fNewEvent;
std::atomic<bool> fDeviceTerminationRequested;
}; /* class Control */
auto ControlPluginProgramOptions() -> Plugin::ProgOptions;

View File

@ -95,16 +95,6 @@ int main(int argc, const char** argv)
return 0;
}
// Handle --catch-signals
if (config.GetValue<int>("catch-signals") > 0)
{
device->CatchSignals();
}
else
{
LOG(WARN) << "Signal handling (e.g. ctrl+C) has been deactivated via command line argument";
}
LOG(DEBUG) << "PID: " << getpid();
// Configure device

View File

@ -67,15 +67,17 @@ TEST(PluginManager, LoadPluginDynamic)
TEST(PluginManager, LoadPluginStatic)
{
FairMQProgOptions config{};
auto mgr = PluginManager{};
auto device = make_shared<FairMQDevice>();
mgr.EmplacePluginServices(&config, device);
device->SetTransport("zeromq");
ASSERT_NO_THROW(mgr.LoadPlugin("s:control"));
FairMQProgOptions config{};
config.SetValue<string>("control", "static");
config.SetValue("catch-signals", 0);
mgr.EmplacePluginServices(&config, device);
ASSERT_NO_THROW(mgr.InstantiatePlugins());
// check order