fix(control): Honor SIGINT and SIGTERM in more places

* Queue next transition for long-running states (fix #421)
* Add *OrCustom/Push/Locked family of functions to StateQueue to enable
  composition with custom signals
This commit is contained in:
Dennis Klein 2022-03-03 21:20:49 +01:00 committed by Dennis Klein
parent 27277b11b4
commit 6780b7452c
5 changed files with 191 additions and 96 deletions

View File

@ -290,7 +290,9 @@ void Device::BindWrapper()
Bind(); Bind();
ChangeState(Transition::Auto); if (!NewStatePending()) {
ChangeState(Transition::Auto);
}
} }
void Device::ConnectWrapper() void Device::ConnectWrapper()
@ -327,7 +329,9 @@ void Device::ConnectWrapper()
Connect(); Connect();
ChangeState(Transition::Auto); if (!NewStatePending()) {
ChangeState(Transition::Auto);
}
} }
void Device::AttachChannels(vector<Channel*>& chans) void Device::AttachChannels(vector<Channel*>& chans)
@ -427,7 +431,9 @@ void Device::InitTaskWrapper()
{ {
InitTask(); InitTask();
ChangeState(Transition::Auto); if (!NewStatePending()) {
ChangeState(Transition::Auto);
}
} }
void Device::RunWrapper() void Device::RunWrapper()
@ -443,6 +449,10 @@ void Device::RunWrapper()
if (rateLogging) { if (rateLogging) {
rateLogger = make_unique<thread>(&Device::LogSocketRates, this); rateLogger = make_unique<thread>(&Device::LogSocketRates, this);
} }
tools::CallOnDestruction joinRateLogger([&](){
if (rateLogging && rateLogger->joinable()) { rateLogger->join(); }
});
// notify transports to resume transfers // notify transports to resume transfers
for (auto& t : fTransports) { for (auto& t : fTransports) {
@ -485,10 +495,6 @@ void Device::RunWrapper()
PostRun(); PostRun();
cod.disable(); cod.disable();
if (rateLogging && rateLogger->joinable()) {
rateLogger->join();
}
} }
void Device::HandleSingleChannelInput() void Device::HandleSingleChannelInput()
@ -772,7 +778,9 @@ void Device::ResetTaskWrapper()
{ {
ResetTask(); ResetTask();
ChangeState(Transition::Auto); if (!NewStatePending()) {
ChangeState(Transition::Auto);
}
} }
void Device::ResetWrapper() void Device::ResetWrapper()
@ -786,7 +794,9 @@ void Device::ResetWrapper()
fChannels.clear(); fChannels.clear();
fTransports.clear(); fTransports.clear();
fTransportFactory.reset(); fTransportFactory.reset();
ChangeState(Transition::Auto); if (!NewStatePending()) {
ChangeState(Transition::Auto);
}
} }
Device::~Device() Device::~Device()

View File

@ -187,9 +187,7 @@ struct Machine_ : public state_machine_def<Machine_>
{ {
unique_lock<mutex> lock(fStateMtx); unique_lock<mutex> lock(fStateMtx);
while (!fNewStatePending) { fNewStatePendingCV.wait(lock, [this]{ return fNewStatePending.load(); });
fNewStatePendingCV.wait_for(lock, chrono::milliseconds(100));
}
LOG(state) << fState << " ---> " << fNewState; LOG(state) << fState << " ---> " << fNewState;
fState = static_cast<State>(fNewState); fState = static_cast<State>(fNewState);

View File

@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2019-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@ -9,16 +9,14 @@
#ifndef FAIRMQSTATEQUEUE_H_ #ifndef FAIRMQSTATEQUEUE_H_
#define FAIRMQSTATEQUEUE_H_ #define FAIRMQSTATEQUEUE_H_
#include <fairmq/States.h>
#include <queue>
#include <mutex>
#include <chrono> #include <chrono>
#include <utility> // pair
#include <condition_variable> #include <condition_variable>
#include <fairmq/States.h>
#include <mutex>
#include <queue>
#include <utility> // pair
namespace fair::mq namespace fair::mq {
{
class StateQueue class StateQueue
{ {
@ -33,41 +31,47 @@ class StateQueue
fair::mq::State WaitForNext() fair::mq::State WaitForNext()
{ {
std::unique_lock<std::mutex> lock(fMtx); std::unique_lock<std::mutex> lock(fMtx);
while (fStates.empty()) { fCV.wait(lock, [this] { return Predicate(); });
fCV.wait_for(lock, std::chrono::milliseconds(50)); return PopFrontUnsafe();
}
fair::mq::State state = fStates.front();
if (state == fair::mq::State::Error) {
throw DeviceErrorState("Controlled device transitioned to error state.");
}
fStates.pop();
return state;
} }
template<typename Rep, typename Period> template<typename Timeout>
std::pair<bool, fair::mq::State> WaitForNext(std::chrono::duration<Rep, Period> const& duration) std::pair<bool, fair::mq::State> WaitForNext(Timeout&& duration)
{ {
std::unique_lock<std::mutex> lock(fMtx); std::unique_lock<std::mutex> lock(fMtx);
fCV.wait_for(lock, duration); fCV.wait_for(lock, std::forward<Timeout>(duration), [this] { return Predicate(); });
return ReturnPairUnsafe();
if (fStates.empty()) {
return { false, fair::mq::State::Ok };
}
fair::mq::State state = fStates.front();
if (state == fair::mq::State::Error) {
throw DeviceErrorState("Controlled device transitioned to error state.");
}
fStates.pop();
return { true, state };
} }
void WaitForState(fair::mq::State state) { while (WaitForNext() != state) {} } template<typename CustomPredicate>
std::pair<bool, fair::mq::State> WaitForNextOrCustom(CustomPredicate&& customPredicate)
{
std::unique_lock<std::mutex> lock(fMtx);
fCV.wait(lock, [this, cp = std::move(customPredicate)] { return Predicate() || cp(); });
return ReturnPairUnsafe();
}
template<typename CustomPredicate>
std::pair<bool, fair::mq::State> WaitForCustom(CustomPredicate&& customPredicate)
{
std::unique_lock<std::mutex> lock(fMtx);
fCV.wait(lock, [cp = std::move(customPredicate)] { return cp(); });
return ReturnPairUnsafe();
}
void WaitForState(fair::mq::State state)
{
while (WaitForNext() != state) {}
}
template<typename CustomPredicate>
void WaitForStateOrCustom(fair::mq::State state, CustomPredicate customPredicate)
{
auto next = WaitForNextOrCustom(customPredicate);
while (!customPredicate() && (next.first && next.second != state)) {
next = WaitForNextOrCustom(customPredicate);
}
}
void Push(fair::mq::State state) void Push(fair::mq::State state)
{ {
@ -75,7 +79,35 @@ class StateQueue
std::lock_guard<std::mutex> lock(fMtx); std::lock_guard<std::mutex> lock(fMtx);
fStates.push(state); fStates.push(state);
} }
fCV.notify_all(); fCV.notify_one();
}
template<typename CustomSignal>
void Push(fair::mq::State state, CustomSignal&& signal)
{
{
std::lock_guard<std::mutex> lock(fMtx);
fStates.push(state);
signal();
}
fCV.notify_one();
}
template<typename CustomSignal>
void Notify(CustomSignal&& signal)
{
{
std::lock_guard<std::mutex> lock(fMtx);
signal();
}
fCV.notify_one();
}
template<typename CustomSignal>
void Locked(CustomSignal&& signal)
{
std::lock_guard<std::mutex> lock(fMtx);
signal();
} }
void Clear() void Clear()
@ -88,8 +120,29 @@ class StateQueue
std::queue<fair::mq::State> fStates; std::queue<fair::mq::State> fStates;
std::mutex fMtx; std::mutex fMtx;
std::condition_variable fCV; std::condition_variable fCV;
// must be called under locked fMtx
fair::mq::State PopFrontUnsafe()
{
fair::mq::State state = fStates.front();
if (state == fair::mq::State::Error) {
throw DeviceErrorState("Controlled device transitioned to error state.");
}
fStates.pop();
return state;
}
// must be called under locked fMtx
std::pair<bool, fair::mq::State> ReturnPairUnsafe()
{
auto const pred = Predicate();
return {pred, pred ? PopFrontUnsafe() : fair::mq::State::Ok};
}
// must be called under locked fMtx
bool Predicate() { return !fStates.empty(); }
}; };
} // namespace fair::mq } // namespace fair::mq
#endif /* FAIRMQSTATEQUEUE_H_ */ #endif /* FAIRMQSTATEQUEUE_H_ */

View File

@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2017-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2017-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@ -56,11 +56,11 @@ Control::Control(const string& name, Plugin::Version version, const string& main
SubscribeToDeviceStateChange([&](DeviceState newState) { SubscribeToDeviceStateChange([&](DeviceState newState) {
LOG(trace) << "control plugin notified on new state: " << newState; LOG(trace) << "control plugin notified on new state: " << newState;
fStateQueue.Push(newState);
if (newState == DeviceState::Error) { if (newState == DeviceState::Error) {
fPluginShutdownRequested = true; fPluginShutdownRequested = true;
fDeviceShutdownRequested = true; fStateQueue.Push(newState, [this]{ fDeviceShutdownRequested = true; });
} else {
fStateQueue.Push(newState);
} }
}); });
@ -99,18 +99,42 @@ Control::Control(const string& name, Plugin::Version version, const string& main
auto Control::RunStartupSequence() -> void auto Control::RunStartupSequence() -> void
{ {
ChangeDeviceState(DeviceStateTransition::InitDevice); using Transition = DeviceStateTransition;
while (fStateQueue.WaitForNext() != DeviceState::InitializingDevice) {} using State = DeviceState;
ChangeDeviceState(DeviceStateTransition::CompleteInit); auto shutdownRequested = [this]{ return fDeviceShutdownRequested.load(); };
while (fStateQueue.WaitForNext() != DeviceState::Initialized) {}
ChangeDeviceState(DeviceStateTransition::Bind); ChangeDeviceState(Transition::InitDevice);
while (fStateQueue.WaitForNext() != DeviceState::Bound) {} fStateQueue.WaitForStateOrCustom(State::InitializingDevice, shutdownRequested);
ChangeDeviceState(DeviceStateTransition::Connect); if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
while (fStateQueue.WaitForNext() != DeviceState::DeviceReady) {}
ChangeDeviceState(DeviceStateTransition::InitTask); ChangeDeviceState(Transition::CompleteInit);
while (fStateQueue.WaitForNext() != DeviceState::Ready) {} fStateQueue.WaitForStateOrCustom(State::Initialized, shutdownRequested);
ChangeDeviceState(DeviceStateTransition::Run); if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
while (fStateQueue.WaitForNext() != DeviceState::Running) {}
ChangeDeviceState(Transition::Bind);
fStateQueue.WaitForStateOrCustom(State::Binding, shutdownRequested);
if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
fStateQueue.WaitForStateOrCustom(State::Bound, shutdownRequested);
if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
ChangeDeviceState(Transition::Connect);
fStateQueue.WaitForStateOrCustom(State::Connecting, shutdownRequested);
if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
fStateQueue.WaitForStateOrCustom(State::DeviceReady, shutdownRequested);
if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
ChangeDeviceState(Transition::InitTask);
fStateQueue.WaitForStateOrCustom(State::InitializingTask, shutdownRequested);
if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
fStateQueue.WaitForStateOrCustom(State::Ready, shutdownRequested);
if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
ChangeDeviceState(Transition::Run);
fStateQueue.WaitForStateOrCustom(State::Running, shutdownRequested);
if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
} }
auto ControlPluginProgramOptions() -> Plugin::ProgOptions auto ControlPluginProgramOptions() -> Plugin::ProgOptions
@ -123,10 +147,8 @@ auto ControlPluginProgramOptions() -> Plugin::ProgOptions
return pluginOptions; return pluginOptions;
} }
auto Control::InteractiveMode() -> void auto Control::RunREPL() -> void
try { {
RunStartupSequence();
char input = 0; // hold the user console input char input = 0; // hold the user console input
pollfd cinfd[1]; pollfd cinfd[1];
cinfd[0].fd = fileno(stdin); cinfd[0].fd = fileno(stdin);
@ -161,7 +183,7 @@ try {
case 'i': case 'i':
cout << "\n --> [i] init device\n\n" << flush; cout << "\n --> [i] init device\n\n" << flush;
if (ChangeDeviceState(DeviceStateTransition::InitDevice)) { if (ChangeDeviceState(DeviceStateTransition::InitDevice)) {
while (fStateQueue.WaitForNext() != DeviceState::InitializingDevice) {} fStateQueue.WaitForState(DeviceState::InitializingDevice);
ChangeDeviceState(DeviceStateTransition::CompleteInit); ChangeDeviceState(DeviceStateTransition::CompleteInit);
} }
break; break;
@ -243,7 +265,19 @@ try {
} }
} }
RunShutdownSequence(); }
auto Control::InteractiveMode() -> void
try {
RunStartupSequence();
if(!fDeviceShutdownRequested) {
RunREPL();
}
if(!fDeviceShutdownRequested) {
RunShutdownSequence();
}
} catch (PluginServices::DeviceControlError& e) { } catch (PluginServices::DeviceControlError& e) {
// If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else. // If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else.
LOG(debug) << e.what(); LOG(debug) << e.what();
@ -366,16 +400,13 @@ auto Control::StaticMode() -> void
try { try {
RunStartupSequence(); RunStartupSequence();
{ // Wait for next state, which is DeviceState::Ready,
// Wait for next state, which is DeviceState::Ready, // or for device shutdown request (Ctrl-C)
// or for device shutdown request (Ctrl-C) fStateQueue.WaitForNextOrCustom([this]{ return fDeviceShutdownRequested.load(); });
pair<bool, fair::mq::State> result;
do {
result = fStateQueue.WaitForNext(chrono::milliseconds(50));
} while (result.first == false && !fDeviceShutdownRequested);
}
RunShutdownSequence(); if(!fDeviceShutdownRequested) {
RunShutdownSequence();
}
} catch (PluginServices::DeviceControlError& e) { } catch (PluginServices::DeviceControlError& e) {
// If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else. // If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else.
LOG(debug) << e.what(); LOG(debug) << e.what();
@ -387,16 +418,12 @@ auto Control::GUIMode() -> void
try { try {
RunStartupSequence(); RunStartupSequence();
{ // Wait for device shutdown request (Ctrl-C)
// Wait for next state, which is DeviceState::Ready, fStateQueue.WaitForCustom([this]{ return fDeviceShutdownRequested.load(); });
// or for device shutdown request (Ctrl-C)
pair<bool, fair::mq::State> result;
do {
result = fStateQueue.WaitForNext(chrono::milliseconds(50));
} while (!fDeviceShutdownRequested);
}
RunShutdownSequence(); if(!fDeviceShutdownRequested) {
RunShutdownSequence();
}
} catch (PluginServices::DeviceControlError& e) { } catch (PluginServices::DeviceControlError& e) {
// If we are here, it means another plugin has taken control. That's fine, just print the // If we are here, it means another plugin has taken control. That's fine, just print the
// exception message and do nothing else. // exception message and do nothing else.
@ -416,10 +443,10 @@ auto Control::SignalHandler() -> void
LOG(info) << "Waiting for graceful device shutdown. Hit Ctrl-C again to abort immediately."; LOG(info) << "Waiting for graceful device shutdown. Hit Ctrl-C again to abort immediately.";
// Signal and wait for controller thread, if we are controller // Signal and wait for controller thread, if we are controller
fDeviceShutdownRequested = true; fStateQueue.Notify([this] { fDeviceShutdownRequested = true; });
{ {
unique_lock<mutex> lock(fControllerMutex); unique_lock<mutex> lock(fControllerMutex);
if (fControllerThread.joinable()) fControllerThread.join(); if (fControllerThread.joinable()) { fControllerThread.join(); }
} }
if (!fDeviceHasShutdown) { if (!fDeviceHasShutdown) {
@ -462,6 +489,12 @@ auto Control::RunShutdownSequence() -> void
case DeviceState::Running: case DeviceState::Running:
ChangeDeviceState(DeviceStateTransition::Stop); ChangeDeviceState(DeviceStateTransition::Stop);
break; break;
case DeviceState::Binding:
case DeviceState::Connecting:
case DeviceState::InitializingTask:
case DeviceState::ResettingTask:
case DeviceState::ResettingDevice:
ChangeDeviceState(DeviceStateTransition::Auto);
default: default:
// LOG(debug) << "Controller ignoring event: " << nextState; // LOG(debug) << "Controller ignoring event: " << nextState;
break; break;
@ -481,9 +514,9 @@ Control::~Control()
{ {
unique_lock<mutex> lock(fControllerMutex); unique_lock<mutex> lock(fControllerMutex);
if (fControllerThread.joinable()) fControllerThread.join(); if (fControllerThread.joinable()) { fControllerThread.join(); }
} }
if (fSignalHandlerThread.joinable()) fSignalHandlerThread.join(); if (fSignalHandlerThread.joinable()) { fSignalHandlerThread.join(); }
UnsubscribeFromDeviceStateChange(); UnsubscribeFromDeviceStateChange();
} }

View File

@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2017-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@ -46,6 +46,7 @@ class Control : public Plugin
auto GUIMode() -> void; auto GUIMode() -> void;
auto SignalHandler() -> void; auto SignalHandler() -> void;
auto RunShutdownSequence() -> void; auto RunShutdownSequence() -> void;
auto RunREPL() -> void;
auto RunStartupSequence() -> void; auto RunStartupSequence() -> void;
std::thread fControllerThread; std::thread fControllerThread;