State machine update

- Fix regression from last commit (preventing static run from proper shutdown).
 - Guard state changes (for the internal transitions) (msm::process_event is not thread safe).
 - Remove unused transition from RUNNING to EXITING.
This commit is contained in:
Alexey Rybalchenko 2016-11-24 13:59:26 +01:00
parent b166cedb63
commit 0fb49a0986
2 changed files with 51 additions and 29 deletions

View File

@ -55,47 +55,85 @@ bool FairMQStateMachine::ChangeState(int event)
switch (event) switch (event)
{ {
case INIT_DEVICE: case INIT_DEVICE:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(FairMQFSM::INIT_DEVICE()); process_event(FairMQFSM::INIT_DEVICE());
return true; return true;
}
case internal_DEVICE_READY: case internal_DEVICE_READY:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(FairMQFSM::internal_DEVICE_READY()); process_event(FairMQFSM::internal_DEVICE_READY());
return true; return true;
}
case INIT_TASK: case INIT_TASK:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(FairMQFSM::INIT_TASK()); process_event(FairMQFSM::INIT_TASK());
return true; return true;
}
case internal_READY: case internal_READY:
{
// std::lock_guard<std::mutex> lock(fChangeStateMutex); // InitTask is synchronous, until ROOT workaround is no longer needed.
process_event(FairMQFSM::internal_READY()); process_event(FairMQFSM::internal_READY());
return true; return true;
}
case RUN: case RUN:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(FairMQFSM::RUN()); process_event(FairMQFSM::RUN());
return true; return true;
}
case PAUSE: case PAUSE:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(FairMQFSM::PAUSE()); process_event(FairMQFSM::PAUSE());
return true; return true;
}
case STOP: case STOP:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(FairMQFSM::STOP()); process_event(FairMQFSM::STOP());
return true; return true;
}
case RESET_DEVICE: case RESET_DEVICE:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(FairMQFSM::RESET_DEVICE()); process_event(FairMQFSM::RESET_DEVICE());
return true; return true;
}
case RESET_TASK: case RESET_TASK:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(FairMQFSM::RESET_TASK()); process_event(FairMQFSM::RESET_TASK());
return true; return true;
}
case internal_IDLE: case internal_IDLE:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(FairMQFSM::internal_IDLE()); process_event(FairMQFSM::internal_IDLE());
return true; return true;
}
case END: case END:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(FairMQFSM::END()); process_event(FairMQFSM::END());
return true; return true;
}
case ERROR_FOUND: case ERROR_FOUND:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(FairMQFSM::ERROR_FOUND()); process_event(FairMQFSM::ERROR_FOUND());
return true; return true;
}
default: default:
{
LOG(ERROR) << "Requested state transition with an unsupported event: " << event << std::endl LOG(ERROR) << "Requested state transition with an unsupported event: " << event << std::endl
<< "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_TASK, RESET_DEVICE, END, ERROR_FOUND"; << "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_TASK, RESET_DEVICE, END, ERROR_FOUND";
return false; return false;
} }
} }
}
catch (std::exception& e) catch (std::exception& e)
{ {
LOG(ERROR) << "Exception in FairMQStateMachine::ChangeState(): " << e.what(); LOG(ERROR) << "Exception in FairMQStateMachine::ChangeState(): " << e.what();
@ -123,7 +161,7 @@ void FairMQStateMachine::WaitForEndOfState(int event)
std::unique_lock<std::mutex> lock(fWorkMutex); std::unique_lock<std::mutex> lock(fWorkMutex);
while (fWorkActive || fWorkAvailable) while (fWorkActive || fWorkAvailable)
{ {
fWorkDoneCondition.wait(lock); fWorkDoneCondition.wait_for(lock, std::chrono::seconds(1));
} }
break; break;

View File

@ -82,6 +82,7 @@ struct FairMQFSM_ : public msmf::state_machine_def<FairMQFSM_>
, fWorkActive(false) , fWorkActive(false)
, fWorkAvailable(false) , fWorkAvailable(false)
, fState() , fState()
, fChangeStateMutex()
{} {}
// Destructor // Destructor
@ -100,9 +101,6 @@ struct FairMQFSM_ : public msmf::state_machine_def<FairMQFSM_>
template <class Event, class FSM> template <class Event, class FSM>
void on_exit(Event const&, FSM& fsm) void on_exit(Event const&, FSM& fsm)
{ {
// join the worker thread (executing user states)
fsm.fWorkerThread.join();
LOG(STATE) << "Exiting FairMQ state machine"; LOG(STATE) << "Exiting FairMQ state machine";
} }
@ -320,31 +318,17 @@ struct FairMQFSM_ : public msmf::state_machine_def<FairMQFSM_>
fsm.fState = EXITING; fsm.fState = EXITING;
// terminate worker thread // terminate worker thread
{
std::lock_guard<std::mutex> lock(fsm.fWorkMutex); std::lock_guard<std::mutex> lock(fsm.fWorkMutex);
fsm.fWorkerTerminated = true; fsm.fWorkerTerminated = true;
fsm.fWorkAvailableCondition.notify_one(); fsm.fWorkAvailableCondition.notify_one();
fsm.fTerminateStateThread = std::thread(&FairMQFSM_::Terminate, &fsm);
fsm.Shutdown();
fsm.fTerminateStateThread.join();
} }
};
struct ExitingRunFct // join the worker thread (executing user states)
if (fsm.fWorkerThread.joinable())
{ {
template <class EVT, class FSM, class SourceState, class TargetState> fsm.fWorkerThread.join();
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) }
{
LOG(STATE) << "Entering EXITING state";
fsm.fState = EXITING;
fsm.Unblock(); // Unblock potential blocking transfer calls
// terminate worker thread
std::lock_guard<std::mutex> lock(fsm.fWorkMutex);
fsm.fWorkerTerminated = true;
fsm.fWorkAvailableCondition.notify_one();
fsm.fTerminateStateThread = std::thread(&FairMQFSM_::Terminate, &fsm); fsm.fTerminateStateThread = std::thread(&FairMQFSM_::Terminate, &fsm);
fsm.Shutdown(); fsm.Shutdown();
@ -423,7 +407,6 @@ struct FairMQFSM_ : public msmf::state_machine_def<FairMQFSM_>
msmf::Row<RUNNING_FSM, PAUSE, PAUSED_FSM, PauseFct, msmf::none>, msmf::Row<RUNNING_FSM, PAUSE, PAUSED_FSM, PauseFct, msmf::none>,
msmf::Row<RUNNING_FSM, STOP, READY_FSM, StopFct, msmf::none>, msmf::Row<RUNNING_FSM, STOP, READY_FSM, StopFct, msmf::none>,
msmf::Row<RUNNING_FSM, internal_READY, READY_FSM, InternalStopFct, msmf::none>, msmf::Row<RUNNING_FSM, internal_READY, READY_FSM, InternalStopFct, msmf::none>,
msmf::Row<RUNNING_FSM, END, EXITING_FSM, ExitingRunFct, msmf::none>,
msmf::Row<PAUSED_FSM, RUN, RUNNING_FSM, ResumeFct, msmf::none>, msmf::Row<PAUSED_FSM, RUN, RUNNING_FSM, ResumeFct, msmf::none>,
msmf::Row<RESETTING_TASK_FSM, internal_DEVICE_READY, DEVICE_READY_FSM, DeviceReadyFct, msmf::none>, msmf::Row<RESETTING_TASK_FSM, internal_DEVICE_READY, DEVICE_READY_FSM, DeviceReadyFct, msmf::none>,
msmf::Row<RESETTING_DEVICE_FSM, internal_IDLE, IDLE_FSM, IdleFct, msmf::none>, msmf::Row<RESETTING_DEVICE_FSM, internal_IDLE, IDLE_FSM, IdleFct, msmf::none>,
@ -556,6 +539,7 @@ struct FairMQFSM_ : public msmf::state_machine_def<FairMQFSM_>
bool fWorkAvailable; bool fWorkAvailable;
protected: protected:
std::mutex fChangeStateMutex;
std::atomic<State> fState; std::atomic<State> fState;
}; };