Extract state queue into own class. Use in device, plugins

This commit is contained in:
Alexey Rybalchenko 2019-07-17 15:19:08 +02:00 committed by Dennis Klein
parent 4487b81de8
commit f515eb1100
9 changed files with 159 additions and 129 deletions

View File

@ -92,6 +92,7 @@ if(BUILD_FAIRMQ OR BUILD_SDK)
set(FSM_PUBLIC_HEADER_FILES
StateMachine.h
States.h
StateQueue.h
)
set(FSM_SOURCE_FILES

View File

@ -66,6 +66,27 @@ constexpr uint64_t FairMQDevice::DefaultMaxRunTime;
constexpr float FairMQDevice::DefaultRate;
constexpr const char* FairMQDevice::DefaultSession;
struct StateSubscription
{
fair::mq::StateMachine& fStateMachine;
fair::mq::StateQueue& fStateQueue;
string fId;
explicit StateSubscription(const string& id, fair::mq::StateMachine& stateMachine, fair::mq::StateQueue& stateQueue)
: fStateMachine(stateMachine)
, fStateQueue(stateQueue)
, fId(id)
{
fStateMachine.SubscribeToStateChange(fId, [&](fair::mq::State state) {
fStateQueue.Push(state);
});
}
~StateSubscription() {
fStateMachine.UnsubscribeFromStateChange(fId);
}
};
FairMQDevice::FairMQDevice()
: FairMQDevice(nullptr, {0, 0, 0})
{}
@ -106,9 +127,6 @@ FairMQDevice::FairMQDevice(ProgOptions* config, const tools::Version version)
, fMaxRunRuntimeInS(DefaultMaxRunTime)
, fInitializationTimeoutInS(DefaultInitTimeout)
, fRawCmdLineArgs()
, fStates()
, fStatesMtx()
, fStatesCV()
, fTransitionMtx()
, fTransitioning(false)
{
@ -127,11 +145,7 @@ FairMQDevice::FairMQDevice(ProgOptions* config, const tools::Version version)
fStateMachine.HandleStates([&](fair::mq::State state) {
LOG(trace) << "device notified on new state: " << state;
{
lock_guard<mutex> lock(fStatesMtx);
fStates.push(state);
}
fStatesCV.notify_all();
fStateQueue.Push(state);
switch (state) {
case fair::mq::State::InitializingDevice:
@ -167,29 +181,6 @@ FairMQDevice::FairMQDevice(ProgOptions* config, const tools::Version version)
fStateMachine.Start();
}
fair::mq::State FairMQDevice::WaitForNextState()
{
unique_lock<mutex> lock(fStatesMtx);
while (fStates.empty()) {
fStatesCV.wait_for(lock, chrono::milliseconds(50));
}
auto state = fStates.front();
if (state == fair::mq::State::Error) {
throw DeviceStateError("Device transitioned to error state.");
}
fStates.pop();
return state;
}
void FairMQDevice::WaitForState(fair::mq::State state)
{
while (WaitForNextState() != state) {}
}
void FairMQDevice::TransitionTo(const fair::mq::State s)
{
{
@ -202,6 +193,10 @@ void FairMQDevice::TransitionTo(const fair::mq::State s)
}
using fair::mq::State;
StateQueue sq;
StateSubscription ss(tools::ToString(fId, ".TransitionTo"), fStateMachine, sq);
State currentState = GetCurrentState();
while (s != currentState) {
@ -244,7 +239,7 @@ void FairMQDevice::TransitionTo(const fair::mq::State s)
break;
}
currentState = WaitForNextState();
currentState = sq.WaitForNext();
}
{

View File

@ -12,6 +12,7 @@
#include <StateMachine.h>
#include <FairMQTransportFactory.h>
#include <fairmq/Transports.h>
#include <fairmq/StateQueue.h>
#include <FairMQSocket.h>
#include <FairMQChannel.h>
@ -498,8 +499,8 @@ class FairMQDevice
void WaitForEndOfState(const fair::mq::Transition transition) __attribute__((deprecated("Use WaitForState(fair::mq::State expectedState).")));
void WaitForEndOfState(const std::string& transition) __attribute__((deprecated("Use WaitForState(fair::mq::State expectedState)."))) { WaitForState(transition); }
fair::mq::State WaitForNextState();
void WaitForState(fair::mq::State state);
fair::mq::State WaitForNextState() { return fStateQueue.WaitForNext(); }
void WaitForState(fair::mq::State state) { fStateQueue.WaitForState(state); }
void WaitForState(const std::string& state) { WaitForState(fair::mq::GetState(state)); }
void TransitionTo(const fair::mq::State state);
@ -522,8 +523,6 @@ class FairMQDevice
static std::string GetStateName(const fair::mq::State state) { return fair::mq::GetStateName(state); }
static std::string GetTransitionName(const fair::mq::Transition transition) { return fair::mq::GetTransitionName(transition); }
struct DeviceStateError : std::runtime_error { using std::runtime_error::runtime_error; };
static constexpr const char* DefaultId = "";
static constexpr int DefaultIOThreads = 1;
static constexpr const char* DefaultTransportName = "zeromq";
@ -589,9 +588,7 @@ class FairMQDevice
int fInitializationTimeoutInS;
std::vector<std::string> fRawCmdLineArgs;
std::queue<fair::mq::State> fStates;
std::mutex fStatesMtx;
std::condition_variable fStatesCV;
fair::mq::StateQueue fStateQueue;
std::mutex fTransitionMtx;
bool fTransitioning;

94
fairmq/StateQueue.h Normal file
View File

@ -0,0 +1,94 @@
/********************************************************************************
* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQSTATEQUEUE_H_
#define FAIRMQSTATEQUEUE_H_
#include <fairmq/States.h>
#include <queue>
#include <mutex>
#include <chrono>
#include <utility> // pair
#include <condition_variable>
namespace fair
{
namespace mq
{
class StateQueue
{
public:
StateQueue() {}
~StateQueue() {}
fair::mq::State WaitForNext()
{
std::unique_lock<std::mutex> lock(fMtx);
while (fStates.empty()) {
fCV.wait_for(lock, std::chrono::milliseconds(50));
}
fair::mq::State state = fStates.front();
if (state == fair::mq::State::Error) {
throw DeviceErrorState("Controlled device transitioned to error state.");
}
fStates.pop();
return state;
}
template<typename Rep, typename Period>
std::pair<bool, fair::mq::State> WaitForNext(std::chrono::duration<Rep, Period> const& duration)
{
std::unique_lock<std::mutex> lock(fMtx);
fCV.wait_for(lock, duration);
if (fStates.empty()) {
return { false, fair::mq::State::Ok };
}
fair::mq::State state = fStates.front();
if (state == fair::mq::State::Error) {
throw DeviceErrorState("Controlled device transitioned to error state.");
}
fStates.pop();
return { true, state };
}
void WaitForState(fair::mq::State state) { while (WaitForNext() != state) {} }
void Push(fair::mq::State state)
{
{
std::lock_guard<std::mutex> lock(fMtx);
fStates.push(state);
}
fCV.notify_all();
}
void Clear()
{
std::lock_guard<std::mutex> lock(fMtx);
fStates = std::queue<fair::mq::State>();
}
private:
std::queue<fair::mq::State> fStates;
std::mutex fMtx;
std::condition_variable fCV;
};
} // namespace mq
} // namespace fair
#endif /* FAIRMQSTATEQUEUE_H_ */

View File

@ -11,6 +11,7 @@
#include <string>
#include <ostream>
#include <stdexcept>
namespace fair
{
@ -57,6 +58,8 @@ std::string GetTransitionName(const Transition);
State GetState(const std::string& state);
Transition GetTransition(const std::string& transition);
struct DeviceErrorState : std::runtime_error { using std::runtime_error::runtime_error; };
inline std::ostream& operator<<(std::ostream& os, const State& state) { return os << GetStateName(state); }
inline std::ostream& operator<<(std::ostream& os, const Transition& transition) { return os << GetTransitionName(transition); }

View File

@ -46,10 +46,7 @@ Control::Control(const string& name, const Plugin::Version version, const string
: Plugin(name, version, maintainer, homepage, pluginServices)
, fControllerThread()
, fSignalHandlerThread()
, fEvents()
, fEventsMutex()
, fControllerMutex()
, fNewEvent()
, fDeviceShutdownRequested(false)
, fDeviceHasShutdown(false)
, fPluginShutdownRequested(false)
@ -57,16 +54,11 @@ Control::Control(const string& name, const Plugin::Version version, const string
SubscribeToDeviceStateChange([&](DeviceState newState) {
LOG(trace) << "control plugin notified on new state: " << newState;
{
lock_guard<mutex> lock{fEventsMutex};
fEvents.push(newState);
}
fNewEvent.notify_one();
fStateQueue.Push(newState);
if (newState == DeviceState::Error) {
fPluginShutdownRequested = true;
fDeviceShutdownRequested = true;
// throw DeviceErrorState("Controlled device transitioned to error state.");
}
});
@ -103,36 +95,17 @@ Control::Control(const string& name, const Plugin::Version version, const string
auto Control::RunStartupSequence() -> void
{
ChangeDeviceState(DeviceStateTransition::InitDevice);
while (WaitForNextState() != DeviceState::InitializingDevice) {}
while (fStateQueue.WaitForNext() != DeviceState::InitializingDevice) {}
ChangeDeviceState(DeviceStateTransition::CompleteInit);
while (WaitForNextState() != DeviceState::Initialized) {}
while (fStateQueue.WaitForNext() != DeviceState::Initialized) {}
ChangeDeviceState(DeviceStateTransition::Bind);
while (WaitForNextState() != DeviceState::Bound) {}
while (fStateQueue.WaitForNext() != DeviceState::Bound) {}
ChangeDeviceState(DeviceStateTransition::Connect);
while (WaitForNextState() != DeviceState::DeviceReady) {}
while (fStateQueue.WaitForNext() != DeviceState::DeviceReady) {}
ChangeDeviceState(DeviceStateTransition::InitTask);
while (WaitForNextState() != DeviceState::Ready) {}
while (fStateQueue.WaitForNext() != DeviceState::Ready) {}
ChangeDeviceState(DeviceStateTransition::Run);
while (WaitForNextState() != DeviceState::Running) {}
}
auto Control::WaitForNextState() -> DeviceState
{
unique_lock<mutex> lock{fEventsMutex};
while (fEvents.empty()) {
fNewEvent.wait_for(lock, chrono::milliseconds(50));
}
auto result = fEvents.front();
if (result == DeviceState::Error) {
ReleaseDeviceControl();
throw DeviceErrorState("Controlled device transitioned to error state.");
}
fEvents.pop();
return result;
while (fStateQueue.WaitForNext() != DeviceState::Running) {}
}
auto ControlPluginProgramOptions() -> Plugin::ProgOptions
@ -204,7 +177,7 @@ try {
case 'i':
cout << "\n --> [i] init device\n\n" << flush;
if (ChangeDeviceState(DeviceStateTransition::InitDevice)) {
while (WaitForNextState() != DeviceState::InitializingDevice) {}
while (fStateQueue.WaitForNext() != DeviceState::InitializingDevice) {}
ChangeDeviceState(DeviceStateTransition::CompleteInit);
}
break;
@ -274,7 +247,6 @@ try {
}
if (GetCurrentDeviceState() == DeviceState::Error) {
ReleaseDeviceControl();
throw DeviceErrorState("Controlled device transitioned to error state.");
}
@ -288,6 +260,7 @@ try {
// If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else.
LOG(debug) << e.what();
} catch (DeviceErrorState&) {
ReleaseDeviceControl();
}
auto Control::PrintInteractiveHelpColor() -> void
@ -397,15 +370,10 @@ try {
{
// Wait for next state, which is DeviceState::Ready,
// or for device shutdown request (Ctrl-C)
unique_lock<mutex> lock{fEventsMutex};
while (fEvents.empty() && !fDeviceShutdownRequested) {
fNewEvent.wait_for(lock, chrono::milliseconds(50));
}
if (fEvents.front() == DeviceState::Error) {
ReleaseDeviceControl();
throw DeviceErrorState("Controlled device transitioned to error state.");
}
pair<bool, fair::mq::State> result;
do {
result = fStateQueue.WaitForNext(chrono::milliseconds(50));
} while (result.first == false && !fDeviceShutdownRequested);
}
RunShutdownSequence();
@ -413,6 +381,7 @@ try {
// If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else.
LOG(debug) << e.what();
} catch (DeviceErrorState&) {
ReleaseDeviceControl();
}
auto Control::SignalHandler() -> void
@ -440,6 +409,7 @@ auto Control::SignalHandler() -> void
} catch (PluginServices::DeviceControlError& e) {
LOG(info) << "Graceful device shutdown failed: " << e.what() << " If hanging, hit Ctrl-C again to abort immediately.";
} catch (...) {
ReleaseDeviceControl();
LOG(info) << "Graceful device shutdown failed. If hanging, hit Ctrl-C again to abort immediately.";
}
}
@ -450,7 +420,7 @@ auto Control::RunShutdownSequence() -> void
{
auto nextState = GetCurrentDeviceState();
if (nextState != DeviceState::Error) {
EmptyEventQueue();
fStateQueue.Clear();
}
while (nextState != DeviceState::Exiting && nextState != DeviceState::Error) {
switch (nextState) {
@ -473,19 +443,13 @@ auto Control::RunShutdownSequence() -> void
break;
}
nextState = WaitForNextState();
nextState = fStateQueue.WaitForNext();
}
fDeviceHasShutdown = true;
ReleaseDeviceControl();
}
auto Control::EmptyEventQueue() -> void
{
lock_guard<mutex> lock{fEventsMutex};
fEvents = queue<DeviceState>{};
}
Control::~Control()
{
// Notify threads to exit

View File

@ -11,6 +11,7 @@
#include <fairmq/Plugin.h>
#include <fairmq/Version.h>
#include <fairmq/StateQueue.h>
#include <condition_variable>
#include <mutex>
@ -41,23 +42,17 @@ class Control : public Plugin
static auto PrintStateMachineColor() -> void;
static auto PrintStateMachine() -> void;
auto StaticMode() -> void;
auto WaitForNextState() -> DeviceState;
auto SignalHandler() -> void;
auto RunShutdownSequence() -> void;
auto RunStartupSequence() -> void;
auto EmptyEventQueue() -> void;
std::thread fControllerThread;
std::thread fSignalHandlerThread;
std::queue<DeviceState> fEvents;
std::mutex fEventsMutex;
std::mutex fControllerMutex;
std::condition_variable fNewEvent;
std::atomic<bool> fDeviceShutdownRequested;
std::atomic<bool> fDeviceHasShutdown;
std::atomic<bool> fPluginShutdownRequested;
struct DeviceErrorState : std::runtime_error { using std::runtime_error::runtime_error; };
fair::mq::StateQueue fStateQueue;
}; /* class Control */
auto ControlPluginProgramOptions() -> Plugin::ProgOptions;

View File

@ -42,9 +42,6 @@ DDS::DDS(const string& name, const Plugin::Version version, const string& mainta
, fStopCondition()
, fTransitions({ "BIND", "CONNECT", "INIT TASK", "RUN", "STOP", "RESET TASK", "RESET DEVICE" })
, fControllerThread()
, fEvents()
, fEventsMutex()
, fNewEvent()
, fCurrentState(DeviceState::Idle)
, fLastState(DeviceState::Idle)
, fDeviceTerminationRequested(false)
@ -86,11 +83,7 @@ auto DDS::HandleControl() -> void
// subscribe to device state changes, pushing new state changes into the event queue
SubscribeToDeviceStateChange([&](DeviceState newState) {
{
lock_guard<mutex> lock{fEventsMutex};
fEvents.push(newState);
}
fNewEvent.notify_one();
fStateQueue.Push(newState);
if (newState == DeviceState::Exiting) {
fDeviceTerminationRequested = true;
}
@ -108,11 +101,11 @@ auto DDS::HandleControl() -> void
});
ChangeDeviceState(DeviceStateTransition::InitDevice);
while (WaitForNextState() != DeviceState::InitializingDevice) {}
while (fStateQueue.WaitForNext() != DeviceState::InitializingDevice) {}
ChangeDeviceState(DeviceStateTransition::CompleteInit);
while (WaitForNextState() != DeviceState::Initialized) {}
while (fStateQueue.WaitForNext() != DeviceState::Initialized) {}
ChangeDeviceState(DeviceStateTransition::Bind);
while (WaitForNextState() != DeviceState::Bound) {}
while (fStateQueue.WaitForNext() != DeviceState::Bound) {}
// in the Initializing state subscribe to receive addresses of connecting channels from DDS
// and propagate addresses of bound channels to DDS.
@ -126,10 +119,10 @@ auto DDS::HandleControl() -> void
PublishBoundChannels();
ChangeDeviceState(DeviceStateTransition::Connect);
while (WaitForNextState() != DeviceState::DeviceReady) {}
while (fStateQueue.WaitForNext() != DeviceState::DeviceReady) {}
ChangeDeviceState(DeviceStateTransition::InitTask);
while (WaitForNextState() != DeviceState::Ready) {}
while (fStateQueue.WaitForNext() != DeviceState::Ready) {}
ChangeDeviceState(DeviceStateTransition::Run);
// wait until stop signal
@ -138,6 +131,8 @@ auto DDS::HandleControl() -> void
fStopCondition.wait_for(lock, chrono::seconds(1));
}
LOG(debug) << "Stopping DDS control plugin";
} catch (DeviceErrorState&) {
ReleaseDeviceControl();
} catch (exception& e) {
LOG(error) << "Error: " << e.what() << endl;
return;
@ -321,7 +316,7 @@ auto DDS::SubscribeForCustomCommands() -> void
} else if (cmd == "INIT DEVICE") {
if (ChangeDeviceState(ToDeviceStateTransition(cmd))) {
fDDSCustomCmd.send(id + ": queued " + cmd + " transition", to_string(senderId));
while (WaitForNextState() != DeviceState::InitializingDevice) {}
while (fStateQueue.WaitForNext() != DeviceState::InitializingDevice) {}
ChangeDeviceState(DeviceStateTransition::CompleteInit);
} else {
fDDSCustomCmd.send(id + ": could not queue " + cmd + " transition", to_string(senderId));
@ -391,18 +386,6 @@ auto DDS::SubscribeForCustomCommands() -> void
});
}
auto DDS::WaitForNextState() -> DeviceState
{
unique_lock<mutex> lock{fEventsMutex};
while (fEvents.empty()) {
fNewEvent.wait_for(lock, chrono::milliseconds(50));
}
auto result = fEvents.front();
fEvents.pop();
return result;
}
DDS::~DDS()
{
if (fControllerThread.joinable()) {

View File

@ -11,6 +11,7 @@
#include <fairmq/Plugin.h>
#include <fairmq/Version.h>
#include <fairmq/StateQueue.h>
#include <DDS/dds_intercom.h>
@ -67,7 +68,6 @@ class DDS : public Plugin
private:
auto HandleControl() -> void;
auto WaitForNextState() -> DeviceState;
auto FillChannelContainers() -> void;
auto SubscribeForConnectingChannels() -> void;
@ -92,10 +92,8 @@ class DDS : public Plugin
const std::set<std::string> fTransitions;
std::thread fControllerThread;
std::queue<DeviceState> fEvents;
std::mutex fEventsMutex;
std::condition_variable fNewEvent;
DeviceState fCurrentState, fLastState;
fair::mq::StateQueue fStateQueue;
std::atomic<bool> fDeviceTerminationRequested;
std::atomic<bool> fServiceStarted;