diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 787980da..1b359aa2 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -141,7 +141,7 @@ install(FILES ${FairMQHDRFiles} DESTINATION include) set(DEPENDENCIES ${DEPENDENCIES} - boost_thread boost_timer boost_system boost_program_options boost_random + boost_thread boost_timer boost_system boost_program_options boost_random boost_chrono ) set(LIBRARY_NAME FairMQ) diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index accf78e7..885cef6c 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -103,12 +103,12 @@ void FairMQDevice::InitWrapper() Init(); - // notify parent thread about end of processing. - boost::lock_guard lock(fInitializingMutex); - fInitializingFinished = true; - fInitializingCondition.notify_one(); - ChangeState(internal_DEVICE_READY); + + // notify parent thread about end of processing. + boost::lock_guard lock(fStateMutex); + fStateFinished = true; + fStateCondition.notify_one(); } void FairMQDevice::Init() @@ -173,12 +173,12 @@ void FairMQDevice::InitTaskWrapper() { InitTask(); - // notify parent thread about end of processing. - boost::lock_guard lock(fInitializingTaskMutex); - fInitializingTaskFinished = true; - fInitializingTaskCondition.notify_one(); - ChangeState(internal_READY); + + // notify parent thread about end of processing. + boost::lock_guard lock(fStateMutex); + fStateFinished = true; + fStateCondition.notify_one(); } void FairMQDevice::InitTask() @@ -248,9 +248,9 @@ void FairMQDevice::RunWrapper() } // notify parent thread about end of processing. - boost::lock_guard lock(fRunningMutex); - fRunningFinished = true; - fRunningCondition.notify_one(); + boost::lock_guard lock(fStateMutex); + fStateFinished = true; + fStateCondition.notify_one(); } void FairMQDevice::Run() @@ -278,12 +278,12 @@ void FairMQDevice::ResetTaskWrapper() { ResetTask(); - // notify parent thread about end of processing. - boost::lock_guard lock(fResetTaskMutex); - fResetTaskFinished = true; - fResetTaskCondition.notify_one(); - ChangeState(internal_DEVICE_READY); + + // notify parent thread about end of processing. + boost::lock_guard lock(fStateMutex); + fStateFinished = true; + fStateCondition.notify_one(); } void FairMQDevice::ResetTask() @@ -294,12 +294,12 @@ void FairMQDevice::ResetWrapper() { Reset(); - // notify parent thread about end of processing. - boost::lock_guard lock(fResetMutex); - fResetFinished = true; - fResetCondition.notify_one(); - ChangeState(internal_IDLE); + + // notify parent thread about end of processing. + boost::lock_guard lock(fStateMutex); + fStateFinished = true; + fStateCondition.notify_one(); } void FairMQDevice::Reset() diff --git a/fairmq/FairMQStateMachine.cxx b/fairmq/FairMQStateMachine.cxx index e216520c..852c50c3 100644 --- a/fairmq/FairMQStateMachine.cxx +++ b/fairmq/FairMQStateMachine.cxx @@ -12,19 +12,12 @@ * @author D. Klein, A. Rybalchenko */ +#include // for WaitForEndOfStateForMs() + #include "FairMQStateMachine.h" #include "FairMQLogger.h" FairMQStateMachine::FairMQStateMachine() - : fInitializingFinished(false) - , fInitializingCondition() - , fInitializingMutex() - , fInitializingTaskFinished(false) - , fInitializingTaskCondition() - , fInitializingTaskMutex() - , fRunningFinished(false) - , fRunningCondition() - , fRunningMutex() { start(); } @@ -39,6 +32,22 @@ int FairMQStateMachine::GetInterfaceVersion() return FAIRMQ_INTERFACE_VERSION; } +int FairMQStateMachine::GetEventNumber(std::string event) +{ + if (event == "INIT_DEVICE") return INIT_DEVICE; + if (event == "INIT_TASK") return INIT_TASK; + if (event == "RUN") return RUN; + if (event == "PAUSE") return PAUSE; + if (event == "RESUME") return RESUME; + if (event == "STOP") return STOP; + if (event == "RESET_DEVICE") return RESET_DEVICE; + if (event == "RESET_TASK") return RESET_TASK; + if (event == "END") return END; + LOG(ERROR) << "Requested number for non-existent event... " << event << std::endl + << "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, RESUME, STOP, RESET_DEVICE, RESET_TASK, END"; + return -1; +} + bool FairMQStateMachine::ChangeState(int event) { try @@ -87,7 +96,7 @@ bool FairMQStateMachine::ChangeState(int event) return false; } } - catch (boost::bad_function_call& e) + catch (std::exception& e) { LOG(ERROR) << e.what(); } @@ -95,48 +104,7 @@ bool FairMQStateMachine::ChangeState(int event) bool FairMQStateMachine::ChangeState(std::string event) { - if (event == "INIT_DEVICE") - { - return ChangeState(INIT_DEVICE); - } - if (event == "INIT_TASK") - { - return ChangeState(INIT_TASK); - } - else if (event == "RUN") - { - return ChangeState(RUN); - } - else if (event == "PAUSE") - { - return ChangeState(PAUSE); - } - else if (event == "RESUME") - { - return ChangeState(RESUME); - } - else if (event == "STOP") - { - return ChangeState(STOP); - } - if (event == "RESET_DEVICE") - { - return ChangeState(RESET_DEVICE); - } - if (event == "RESET_TASK") - { - return ChangeState(RESET_TASK); - } - else if (event == "END") - { - return ChangeState(END); - } - else - { - LOG(ERROR) << "Requested unsupported state: " << event << std::endl - << "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, RESUME, STOP, RESET_TASK, RESET_DEVICE, END"; - return false; - } + return ChangeState(GetEventNumber(event)); } void FairMQStateMachine::WaitForEndOfState(int event) @@ -144,47 +112,15 @@ void FairMQStateMachine::WaitForEndOfState(int event) switch (event) { case INIT_DEVICE: - { - boost::unique_lock lock(fInitializingMutex); - while (!fInitializingFinished) - { - fInitializingCondition.wait(lock); - } - break; - } case INIT_TASK: - { - boost::unique_lock initTaskLock(fInitializingTaskMutex); - while (!fInitializingTaskFinished) - { - fInitializingTaskCondition.wait(initTaskLock); - } - break; - } case RUN: - { - boost::unique_lock runLock(fRunningMutex); - while (!fRunningFinished) - { - fRunningCondition.wait(runLock); - } - break; - } case RESET_TASK: - { - boost::unique_lock runLock(fResetTaskMutex); - while (!fResetTaskFinished) - { - fResetTaskCondition.wait(runLock); - } - break; - } case RESET_DEVICE: { - boost::unique_lock runLock(fResetMutex); - while (!fResetFinished) + boost::unique_lock lock(fStateMutex); + while (!fStateFinished) { - fResetCondition.wait(runLock); + fStateCondition.wait(lock); } break; } @@ -196,45 +132,38 @@ void FairMQStateMachine::WaitForEndOfState(int event) void FairMQStateMachine::WaitForEndOfState(std::string event) { - if (event == "INIT_DEVICE") + return WaitForEndOfState(GetEventNumber(event)); +} + +bool FairMQStateMachine::WaitForEndOfStateForMs(int event, int durationInMs) +{ + switch (event) { - return WaitForEndOfState(INIT_DEVICE); + case INIT_DEVICE: + case INIT_TASK: + case RUN: + case RESET_TASK: + case RESET_DEVICE: + { + boost::unique_lock lock(fStateMutex); + while (!fStateFinished) + { + fStateCondition.wait_until(lock, boost::chrono::system_clock::now() + boost::chrono::milliseconds(durationInMs)); + if (!fStateFinished) + { + return false; + } + } + return true; + break; + } + default: + LOG(ERROR) << "Requested state is either synchronous or does not exist."; + break; } - if (event == "INIT_TASK") - { - return WaitForEndOfState(INIT_TASK); - } - else if (event == "RUN") - { - return WaitForEndOfState(RUN); - } - else if (event == "PAUSE") - { - return WaitForEndOfState(PAUSE); - } - else if (event == "RESUME") - { - return WaitForEndOfState(RESUME); - } - else if (event == "STOP") - { - return WaitForEndOfState(STOP); - } - if (event == "RESET_DEVICE") - { - return WaitForEndOfState(RESET_DEVICE); - } - if (event == "RESET_TASK") - { - return WaitForEndOfState(RESET_TASK); - } - else if (event == "END") - { - return WaitForEndOfState(END); - } - else - { - LOG(ERROR) << "Requested unsupported state: " << event << std::endl - << "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, RESUME, STOP, RESET_TASK, RESET_DEVICE, END"; - } -} \ No newline at end of file +} + +bool FairMQStateMachine::WaitForEndOfStateForMs(std::string event, int durationInMs) +{ + return WaitForEndOfStateForMs(GetEventNumber(event), durationInMs); +} diff --git a/fairmq/FairMQStateMachine.h b/fairmq/FairMQStateMachine.h index 035f892d..5f1626cd 100644 --- a/fairmq/FairMQStateMachine.h +++ b/fairmq/FairMQStateMachine.h @@ -59,6 +59,9 @@ struct FairMQFSM_ : public msm::front::state_machine_def : fState() , fStateThread() , fTerminateStateThread() + , fStateFinished(false) + , fStateCondition() + , fStateMutex() {} // Destructor @@ -107,6 +110,7 @@ struct FairMQFSM_ : public msm::front::state_machine_def template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { + fsm.fStateFinished = false; LOG(STATE) << "Entering INITIALIZING DEVICE state"; fsm.fState = INITIALIZING_DEVICE; fsm.fStateThread = boost::thread(boost::bind(&FairMQFSM_::InitWrapper, &fsm)); @@ -127,6 +131,7 @@ struct FairMQFSM_ : public msm::front::state_machine_def template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { + fsm.fStateFinished = false; LOG(STATE) << "Entering INITIALIZING TASK state"; fsm.fState = INITIALIZING_TASK; fsm.InitTaskWrapper(); @@ -148,6 +153,7 @@ struct FairMQFSM_ : public msm::front::state_machine_def template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { + fsm.fStateFinished = false; LOG(STATE) << "Entering RUNNING state"; fsm.fState = RUNNING; fsm.fStateThread = boost::thread(boost::bind(&FairMQFSM_::RunWrapper, &fsm)); @@ -159,6 +165,7 @@ struct FairMQFSM_ : public msm::front::state_machine_def template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { + fsm.fStateFinished = false; LOG(STATE) << "Entering PAUSED state"; fsm.fState = PAUSED; fsm.SendCommand("pause"); @@ -175,6 +182,7 @@ struct FairMQFSM_ : public msm::front::state_machine_def fsm.fState = RUNNING; fsm.fStateThread.interrupt(); fsm.fStateThread.join(); + fsm.fStateFinished = false; LOG(STATE) << "Entering RUNNING state"; fsm.fStateThread = boost::thread(boost::bind(&FairMQFSM_::RunWrapper, &fsm)); } @@ -185,6 +193,7 @@ struct FairMQFSM_ : public msm::front::state_machine_def template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { + LOG(STATE) << "Received STOP event"; fsm.fState = IDLE; // fsm.SendCommand("stop"); fsm.fStateThread.join(); @@ -196,6 +205,7 @@ struct FairMQFSM_ : public msm::front::state_machine_def template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { + fsm.fStateFinished = false; LOG(STATE) << "Entering RESETTING TASK state"; fsm.fState = RESETTING_TASK; fsm.fStateThread = boost::thread(boost::bind(&FairMQFSM_::ResetTaskWrapper, &fsm)); @@ -207,6 +217,7 @@ struct FairMQFSM_ : public msm::front::state_machine_def template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { + fsm.fStateFinished = false; LOG(STATE) << "Entering RESETTING DEVICE state"; fsm.fState = RESETTING_DEVICE; fsm.fStateThread = boost::thread(boost::bind(&FairMQFSM_::ResetWrapper, &fsm)); @@ -218,6 +229,7 @@ struct FairMQFSM_ : public msm::front::state_machine_def template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { + LOG(STATE) << "Received END event"; fsm.fState = EXITING; fsm.fStateThread = boost::thread(boost::bind(&FairMQFSM_::Terminate, &fsm)); @@ -327,7 +339,7 @@ struct FairMQFSM_ : public msm::front::state_machine_def case EXITING: return "EXITING"; default: - return "something went wrong..."; + return "requested name for non-existent state..."; } } @@ -336,6 +348,16 @@ struct FairMQFSM_ : public msm::front::state_machine_def return fState; } + std::string GetCurrentStateName() + { + return GetStateName(fState); + } + + // condition variable to notify parent thread about end of state. + bool fStateFinished; + boost::condition_variable fStateCondition; + boost::mutex fStateMutex; + private: State fState; }; @@ -366,32 +388,16 @@ class FairMQStateMachine : public FairMQFSM::FairMQFSM int GetInterfaceVersion(); + int GetEventNumber(std::string event); + bool ChangeState(int event); bool ChangeState(std::string event); void WaitForEndOfState(int state); void WaitForEndOfState(std::string state); - // condition variables to notify parent thread about end of state. - bool fInitializingFinished; - boost::condition_variable fInitializingCondition; - boost::mutex fInitializingMutex; - - bool fInitializingTaskFinished; - boost::condition_variable fInitializingTaskCondition; - boost::mutex fInitializingTaskMutex; - - bool fRunningFinished; - boost::condition_variable fRunningCondition; - boost::mutex fRunningMutex; - - bool fResetFinished; - boost::condition_variable fResetCondition; - boost::mutex fResetMutex; - - bool fResetTaskFinished; - boost::condition_variable fResetTaskCondition; - boost::mutex fResetTaskMutex; + bool WaitForEndOfStateForMs(int state, int durationInMs); + bool WaitForEndOfStateForMs(std::string state, int durationInMs); }; #endif /* FAIRMQSTATEMACHINE_H_ */