/******************************************************************************** * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ #include #include #include // Increase maximum number of boost::msm states (default is 10) // This #define has to be before any msm header includes #define FUSION_MAX_VECTOR_SIZE 20 #include #include #include #include #include #include #include #include // signal/slot for onStateChange callbacks #include #include #include #include #include using namespace std; using namespace boost::msm; using namespace boost::msm::front; using namespace boost::msm::back; namespace bmpl = boost::mpl; namespace fair::mq { namespace fsm { // list of FSM states struct OK_S : public state<> { static string Name() { return "OK"; } static State Type() { return State::Ok; } }; struct IDLE_S : public state<> { static string Name() { return "IDLE"; } static State Type() { return State::Idle; } }; struct INITIALIZING_DEVICE_S : public state<> { static string Name() { return "INITIALIZING_DEVICE"; } static State Type() { return State::InitializingDevice; } }; struct INITIALIZED_S : public state<> { static string Name() { return "INITIALIZED"; } static State Type() { return State::Initialized; } }; struct BINDING_S : public state<> { static string Name() { return "BINDING"; } static State Type() { return State::Binding; } }; struct BOUND_S : public state<> { static string Name() { return "BOUND"; } static State Type() { return State::Bound; } }; struct CONNECTING_S : public state<> { static string Name() { return "CONNECTING"; } static State Type() { return State::Connecting; } }; struct DEVICE_READY_S : public state<> { static string Name() { return "DEVICE_READY"; } static State Type() { return State::DeviceReady; } }; struct INITIALIZING_TASK_S : public state<> { static string Name() { return "INITIALIZING_TASK"; } static State Type() { return State::InitializingTask; } }; struct READY_S : public state<> { static string Name() { return "READY"; } static State Type() { return State::Ready; } }; struct RUNNING_S : public state<> { static string Name() { return "RUNNING"; } static State Type() { return State::Running; } }; struct RESETTING_TASK_S : public state<> { static string Name() { return "RESETTING_TASK"; } static State Type() { return State::ResettingTask; } }; struct RESETTING_DEVICE_S : public state<> { static string Name() { return "RESETTING_DEVICE"; } static State Type() { return State::ResettingDevice; } }; struct EXITING_S : public state<> { static string Name() { return "EXITING"; } static State Type() { return State::Exiting; } }; struct ERROR_S : public terminate_state<> { static string Name() { return "ERROR"; } static State Type() { return State::Error; } }; // list of FSM transitions (events) struct AUTO_E { static string Name() { return "AUTO"; } static Transition Type() { return Transition::Auto; } }; struct INIT_DEVICE_E { static string Name() { return "INIT_DEVICE"; } static Transition Type() { return Transition::InitDevice; } }; struct COMPLETE_INIT_E { static string Name() { return "COMPLETE_INIT"; } static Transition Type() { return Transition::CompleteInit; } }; struct BIND_E { static string Name() { return "BIND"; } static Transition Type() { return Transition::Bind; } }; struct CONNECT_E { static string Name() { return "CONNECT"; } static Transition Type() { return Transition::Connect; } }; struct INIT_TASK_E { static string Name() { return "INIT_TASK"; } static Transition Type() { return Transition::InitTask; } }; struct RUN_E { static string Name() { return "RUN"; } static Transition Type() { return Transition::Run; } }; struct STOP_E { static string Name() { return "STOP"; } static Transition Type() { return Transition::Stop; } }; struct RESET_TASK_E { static string Name() { return "RESET_TASK"; } static Transition Type() { return Transition::ResetTask; } }; struct RESET_DEVICE_E { static string Name() { return "RESET_DEVICE"; } static Transition Type() { return Transition::ResetDevice; } }; struct END_E { static string Name() { return "END"; } static Transition Type() { return Transition::End; } }; struct ERROR_FOUND_E { static string Name() { return "ERROR_FOUND"; } static Transition Type() { return Transition::ErrorFound; } }; // defining the boost MSM state machine struct Machine_ : public state_machine_def { public: Machine_() : fState(State::Ok) , fNewState(State::Ok) , fLastTransitionResult(true) , fNewStatePending(false) {} // initial states using initial_state = bmpl::vector; template void on_entry(Transition const&, FSM& /* fsm */) { LOG(state) << "Starting FairMQ state machine --> IDLE"; fState = State::Idle; } template void on_exit(Transition const&, FSM& /*fsm*/) { LOG(state) << "Exiting FairMQ state machine"; } struct DefaultFct { template void operator()(EVT const& e, FSM& fsm, SourceState& /* ss */, TargetState& ts) { fsm.fNewState = ts.Type(); fsm.fLastTransitionResult = true; fsm.CallNewTransitionCallbacks(e.Type()); fsm.fNewStatePending = true; fsm.fNewStatePendingCV.notify_all(); } }; struct transition_table : bmpl::vector< // Start Transition Next Action Guard Row, Row, Row, Row, Row, Row, Row, Row, Row, Row, Row, Row, Row, Row, Row, Row, Row, Row> {}; void CallStateChangeCallbacks(const State state) const { if (!fStateChangeSignal.empty()) { fStateChangeSignal(state); } } void CallStateHandler(const State state) const { if (!fStateHandleSignal.empty()) { fStateHandleSignal(state); } } void CallStatePrep(const State state) const { if (!fStatePrepSignal.empty()) { fStatePrepSignal(state); } } void CallNewTransitionCallbacks(const Transition transition) const { if (!fNewTransitionSignal.empty()) { fNewTransitionSignal(transition); } } atomic fState; atomic fNewState; atomic fLastTransitionResult; mutex fStateMtx; atomic fNewStatePending; condition_variable fNewStatePendingCV; boost::signals2::signal fStateChangeSignal; boost::signals2::signal fStateHandleSignal; boost::signals2::signal fStatePrepSignal; boost::signals2::signal fNewTransitionSignal; unordered_map fStateChangeSignalsMap; unordered_map fNewTransitionSignalsMap; void ProcessWork() { bool stop = false; while (!stop) { { unique_lock lock(fStateMtx); fNewStatePendingCV.wait(lock, [this]{ return fNewStatePending.load(); }); LOG(state) << fState << " ---> " << fNewState; fState = static_cast(fNewState); fNewStatePending = false; if (fState == State::Exiting || fState == State::Error) { stop = true; } } CallStatePrep(fState); CallStateChangeCallbacks(fState); CallStateHandler(fState); } if (fState == State::Error) { LOG(trace) << "Device transitioned to error state"; throw StateMachine::ErrorStateException("Device transitioned to error state"); } } // replaces the default no-transition response. template void no_transition(Transition const& t, FSM& fsm, int state) { using RecursiveStt = typename recursive_get_transition_table::type; using AllStates = typename generate_state_set::type; string stateName; bmpl::for_each>(get_state_name(stateName, state)); stateName = boost::core::demangle(stateName.c_str()); size_t pos = stateName.rfind(':'); stateName = stateName.substr(pos + 1); size_t pos2 = stateName.rfind('_'); stateName = stateName.substr(0, pos2); if (stateName != "OK") { LOG(state) << "No transition from state " << stateName << " on transition " << t.Name(); } fsm.fLastTransitionResult = false; } }; // Machine_ using FairMQFSM = state_machine; } // namespace fsm } // namespace fair::mq using namespace fair::mq::fsm; using namespace fair::mq; StateMachine::StateMachine() : fFsm(new FairMQFSM) {} void StateMachine::Start() { static_pointer_cast(fFsm)->start(); } StateMachine::~StateMachine() { static_pointer_cast(fFsm)->stop(); } bool StateMachine::ChangeState(Transition transition) try { auto fsm = static_pointer_cast(fFsm); lock_guard lock(fsm->fStateMtx); if (!static_cast(fsm->fNewStatePending) || transition == Transition::ErrorFound) { switch (transition) { case Transition::Auto: fsm->process_event(AUTO_E()); return fsm->fLastTransitionResult; case Transition::InitDevice: fsm->process_event(INIT_DEVICE_E()); return fsm->fLastTransitionResult; case Transition::CompleteInit: fsm->process_event(COMPLETE_INIT_E()); return fsm->fLastTransitionResult; case Transition::Bind: fsm->process_event(BIND_E()); return fsm->fLastTransitionResult; case Transition::Connect: fsm->process_event(CONNECT_E()); return fsm->fLastTransitionResult; case Transition::InitTask: fsm->process_event(INIT_TASK_E()); return fsm->fLastTransitionResult; case Transition::Run: fsm->process_event(RUN_E()); return fsm->fLastTransitionResult; case Transition::Stop: fsm->process_event(STOP_E()); return fsm->fLastTransitionResult; case Transition::ResetDevice: fsm->process_event(RESET_DEVICE_E()); return fsm->fLastTransitionResult; case Transition::ResetTask: fsm->process_event(RESET_TASK_E()); return fsm->fLastTransitionResult; case Transition::End: fsm->process_event(END_E()); return fsm->fLastTransitionResult; case Transition::ErrorFound: fsm->process_event(ERROR_FOUND_E()); return fsm->fLastTransitionResult; default: LOG(error) << "Requested unsupported state transition: " << transition << endl; return false; } } else { LOG(state) << "Transition " << GetTransitionName(transition) << " incoming, but another state transition is already ongoing."; return false; } } catch (exception& e) { LOG(error) << "Exception in StateMachine::ChangeState(): " << e.what(); return false; } void StateMachine::SubscribeToStateChange(const string& key, function callback) { static_pointer_cast(fFsm)->fStateChangeSignalsMap.insert({key, static_pointer_cast(fFsm)->fStateChangeSignal.connect(callback)}); } void StateMachine::UnsubscribeFromStateChange(const string& key) { auto fsm = static_pointer_cast(fFsm); if (fsm->fStateChangeSignalsMap.count(key)) { fsm->fStateChangeSignalsMap.at(key).disconnect(); fsm->fStateChangeSignalsMap.erase(key); } } void StateMachine::PrepareState(std::function callback) { auto fsm = static_pointer_cast(fFsm); if (fsm->fStatePrepSignal.empty()) { fsm->fStatePrepSignal.connect(callback); } else { LOG(error) << "state preparation handler is already set"; } } void StateMachine::HandleStates(function callback) { auto fsm = static_pointer_cast(fFsm); if (fsm->fStateHandleSignal.empty()) { fsm->fStateHandleSignal.connect(callback); } else { LOG(error) << "state handler is already set"; } } void StateMachine::StopHandlingStates() { auto fsm = static_pointer_cast(fFsm); if (!fsm->fStatePrepSignal.empty()) { fsm->fStatePrepSignal.disconnect_all_slots(); } if (!fsm->fStateHandleSignal.empty()) { fsm->fStateHandleSignal.disconnect_all_slots(); } } void StateMachine::SubscribeToNewTransition(const string& key, function callback) { static_pointer_cast(fFsm)->fNewTransitionSignalsMap.insert({key, static_pointer_cast(fFsm)->fNewTransitionSignal.connect(callback)}); } void StateMachine::UnsubscribeFromNewTransition(const string& key) { auto fsm = static_pointer_cast(fFsm); if (fsm->fNewTransitionSignalsMap.count(key)) { fsm->fNewTransitionSignalsMap.at(key).disconnect(); fsm->fNewTransitionSignalsMap.erase(key); } } State StateMachine::GetCurrentState() const { return static_pointer_cast(fFsm)->fState; } string StateMachine::GetCurrentStateName() const { return GetStateName(static_pointer_cast(fFsm)->fState); } bool StateMachine::NewStatePending() const { return static_cast(static_pointer_cast(fFsm)->fNewStatePending); } void StateMachine::WaitForPendingState() const { auto fsm = static_pointer_cast(fFsm); unique_lock lock(fsm->fStateMtx); fsm->fNewStatePendingCV.wait(lock, [&]{ return static_cast(fsm->fNewStatePending); }); } bool StateMachine::WaitForPendingStateFor(int durationInMs) const { auto fsm = static_pointer_cast(fFsm); unique_lock lock(fsm->fStateMtx); return fsm->fNewStatePendingCV.wait_for(lock, std::chrono::milliseconds(durationInMs), [&]{ return static_cast(fsm->fNewStatePending); }); } void StateMachine::ProcessWork() { auto fsm = static_pointer_cast(fFsm); fair::mq::tools::CallOnDestruction cod([&](){ LOG(debug) << "Exception caught in ProcessWork(), going to Error state"; { lock_guard lock(fsm->fStateMtx); fsm->fState = State::Error; fsm->CallStateChangeCallbacks(State::Error); } ChangeState(Transition::ErrorFound); }); fsm->CallStateChangeCallbacks(State::Idle); fsm->ProcessWork(); cod.disable(); }