From 26cfe69b4153240ac0d492a3d3bb8ca5afa28ea6 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 22 Sep 2017 11:31:11 +0200 Subject: [PATCH] Revert refactoring that releases lock too early. --- fairmq/FairMQStateMachine.cxx | 219 ++++++++++++++++++++++++++- fairmq/FairMQStateMachine.h | 274 +++++++--------------------------- 2 files changed, 267 insertions(+), 226 deletions(-) diff --git a/fairmq/FairMQStateMachine.cxx b/fairmq/FairMQStateMachine.cxx index 0e4c39ab..748e90e4 100644 --- a/fairmq/FairMQStateMachine.cxx +++ b/fairmq/FairMQStateMachine.cxx @@ -14,7 +14,218 @@ #include "FairMQStateMachine.h" -// void FairMQStateMachine::SubscribeToStateChange(const std::string& key, std::function callback) -// { -// fStateChangeSignalsMap.insert({key, fStateChangeSignal.connect(callback)}); -// } +FairMQStateMachine::FairMQStateMachine() +{ + start(); +} + +FairMQStateMachine::~FairMQStateMachine() +{ + stop(); +} + +int FairMQStateMachine::GetInterfaceVersion() const +{ + return FAIRMQ_INTERFACE_VERSION; +} + +bool FairMQStateMachine::ChangeState(int event) +{ + try + { + switch (event) + { + case INIT_DEVICE: + { + std::lock_guard lock(fChangeStateMutex); + process_event(fair::mq::fsm::INIT_DEVICE()); + return true; + } + case internal_DEVICE_READY: + { + std::lock_guard lock(fChangeStateMutex); + process_event(fair::mq::fsm::internal_DEVICE_READY()); + return true; + } + case INIT_TASK: + { + std::lock_guard lock(fChangeStateMutex); + process_event(fair::mq::fsm::INIT_TASK()); + return true; + } + case internal_READY: + { + std::lock_guard lock(fChangeStateMutex); + process_event(fair::mq::fsm::internal_READY()); + return true; + } + case RUN: + { + std::lock_guard lock(fChangeStateMutex); + process_event(fair::mq::fsm::RUN()); + return true; + } + case PAUSE: + { + std::lock_guard lock(fChangeStateMutex); + process_event(fair::mq::fsm::PAUSE()); + return true; + } + case STOP: + { + std::lock_guard lock(fChangeStateMutex); + process_event(fair::mq::fsm::STOP()); + return true; + } + case RESET_DEVICE: + { + std::lock_guard lock(fChangeStateMutex); + process_event(fair::mq::fsm::RESET_DEVICE()); + return true; + } + case RESET_TASK: + { + std::lock_guard lock(fChangeStateMutex); + process_event(fair::mq::fsm::RESET_TASK()); + return true; + } + case internal_IDLE: + { + std::lock_guard lock(fChangeStateMutex); + process_event(fair::mq::fsm::internal_IDLE()); + return true; + } + case END: + { + std::lock_guard lock(fChangeStateMutex); + process_event(fair::mq::fsm::END()); + return true; + } + case ERROR_FOUND: + { + std::lock_guard lock(fChangeStateMutex); + process_event(fair::mq::fsm::ERROR_FOUND()); + return true; + } + default: + { + LOG(ERROR) << "Requested state transition with an unsupported event: " << event << std::endl + << "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_TASK, RESET_DEVICE, END, ERROR_FOUND"; + return false; + } + } + } + catch (std::exception& e) + { + LOG(ERROR) << "Exception in FairMQStateMachine::ChangeState(): " << e.what(); + exit(EXIT_FAILURE); + } + return false; +} + +bool FairMQStateMachine::ChangeState(const std::string& event) +{ + return ChangeState(GetEventNumber(event)); +} + +void FairMQStateMachine::WaitForEndOfState(int event) +{ + try + { + switch (event) + { + case INIT_DEVICE: + case INIT_TASK: + case RUN: + case RESET_TASK: + case RESET_DEVICE: + { + std::unique_lock lock(fWorkMutex); + while (fWorkActive || fWorkAvailable) + { + fWorkDoneCondition.wait_for(lock, std::chrono::seconds(1)); + } + + break; + } + default: + LOG(ERROR) << "Requested state is either synchronous or does not exist."; + break; + } + } + catch (std::exception& e) + { + LOG(ERROR) << "Exception in FairMQStateMachine::WaitForEndOfState(): " << e.what(); + } +} + +void FairMQStateMachine::WaitForEndOfState(const std::string& event) +{ + return WaitForEndOfState(GetEventNumber(event)); +} + +bool FairMQStateMachine::WaitForEndOfStateForMs(int event, int durationInMs) +{ + try + { + switch (event) + { + case INIT_DEVICE: + case INIT_TASK: + case RUN: + case RESET_TASK: + case RESET_DEVICE: + { + std::unique_lock lock(fWorkMutex); + while (fWorkActive || fWorkAvailable) + { + fWorkDoneCondition.wait_for(lock, std::chrono::milliseconds(durationInMs)); + if (fWorkActive) + { + return false; + } + } + return true; + } + default: + LOG(ERROR) << "Requested state is either synchronous or does not exist."; + return false; + } + } + catch (std::exception& e) + { + LOG(ERROR) << "Exception in FairMQStateMachine::WaitForEndOfStateForMs(): " << e.what(); + } + return false; +} + +bool FairMQStateMachine::WaitForEndOfStateForMs(const std::string& event, int durationInMs) +{ + return WaitForEndOfStateForMs(GetEventNumber(event), durationInMs); +} + +void FairMQStateMachine::SubscribeToStateChange(const std::string& key, std::function callback) +{ + fStateChangeSignalsMap.insert({key, fStateChangeSignal.connect(callback)}); +} +void FairMQStateMachine::UnsubscribeFromStateChange(const std::string& key) +{ + fStateChangeSignalsMap.at(key).disconnect(); + fStateChangeSignalsMap.erase(key); +} + +int FairMQStateMachine::GetEventNumber(const std::string& event) +{ + if (event == "INIT_DEVICE") return INIT_DEVICE; + if (event == "INIT_TASK") return INIT_TASK; + if (event == "RUN") return RUN; + if (event == "PAUSE") return PAUSE; + if (event == "STOP") return STOP; + if (event == "RESET_DEVICE") return RESET_DEVICE; + if (event == "RESET_TASK") return RESET_TASK; + if (event == "END") return END; + if (event == "ERROR_FOUND") return ERROR_FOUND; + LOG(ERROR) << "Requested number for non-existent event... " << event << std::endl + << "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_DEVICE, RESET_TASK, END, ERROR_FOUND"; + return -1; +} \ No newline at end of file diff --git a/fairmq/FairMQStateMachine.h b/fairmq/FairMQStateMachine.h index 56c4778f..5fd19c21 100644 --- a/fairmq/FairMQStateMachine.h +++ b/fairmq/FairMQStateMachine.h @@ -150,7 +150,11 @@ struct FairMQFSM : public msmf::state_machine_def { fsm.fState = INITIALIZING_DEVICE; - fsm.WaitForWorkCompletion(); + std::unique_lock lock(fsm.fWorkMutex); + while (fsm.fWorkActive) + { + fsm.fWorkDoneCondition.wait(lock); + } fsm.fWorkAvailable = true; LOG(STATE) << "Entering INITIALIZING DEVICE state"; fsm.fWork = std::bind(&FairMQFSM::InitWrapper, &fsm); @@ -175,7 +179,11 @@ struct FairMQFSM : public msmf::state_machine_def { fsm.fState = INITIALIZING_TASK; - fsm.WaitForWorkCompletion(); + std::unique_lock lock(fsm.fWorkMutex); + while (fsm.fWorkActive) + { + fsm.fWorkDoneCondition.wait(lock); + } fsm.fWorkAvailable = true; LOG(STATE) << "Entering INITIALIZING TASK state"; fsm.fWork = std::bind(&FairMQFSM::InitTaskWrapper, &fsm); @@ -200,7 +208,11 @@ struct FairMQFSM : public msmf::state_machine_def { fsm.fState = RUNNING; - fsm.WaitForWorkCompletion(); + std::unique_lock lock(fsm.fWorkMutex); + while (fsm.fWorkActive) + { + fsm.fWorkDoneCondition.wait(lock); + } fsm.fWorkAvailable = true; LOG(STATE) << "Entering RUNNING state"; fsm.fWork = std::bind(&FairMQFSM::RunWrapper, &fsm); @@ -216,7 +228,11 @@ struct FairMQFSM : public msmf::state_machine_def fsm.fState = PAUSED; fsm.Unblock(); - fsm.WaitForWorkCompletion(); + std::unique_lock lock(fsm.fWorkMutex); + while (fsm.fWorkActive) + { + fsm.fWorkDoneCondition.wait(lock); + } fsm.fWorkAvailable = true; LOG(STATE) << "Entering PAUSED state"; fsm.fWork = std::bind(&FairMQFSM::PauseWrapper, &fsm); @@ -231,7 +247,11 @@ struct FairMQFSM : public msmf::state_machine_def { fsm.fState = RUNNING; - fsm.WaitForWorkCompletion(); + std::unique_lock lock(fsm.fWorkMutex); + while (fsm.fWorkActive) + { + fsm.fWorkDoneCondition.wait(lock); + } fsm.fWorkAvailable = true; LOG(STATE) << "Entering RUNNING state"; fsm.fWork = std::bind(&FairMQFSM::RunWrapper, &fsm); @@ -247,7 +267,11 @@ struct FairMQFSM : public msmf::state_machine_def fsm.fState = READY; fsm.Unblock(); - fsm.WaitForWorkCompletion(); + std::unique_lock lock(fsm.fWorkMutex); + while (fsm.fWorkActive) + { + fsm.fWorkDoneCondition.wait(lock); + } LOG(STATE) << "Entering READY state"; } }; @@ -270,7 +294,11 @@ struct FairMQFSM : public msmf::state_machine_def { fsm.fState = RESETTING_TASK; - fsm.WaitForWorkCompletion(); + std::unique_lock lock(fsm.fWorkMutex); + while (fsm.fWorkActive) + { + fsm.fWorkDoneCondition.wait(lock); + } fsm.fWorkAvailable = true; LOG(STATE) << "Entering RESETTING TASK state"; fsm.fWork = std::bind(&FairMQFSM::ResetTaskWrapper, &fsm); @@ -285,7 +313,11 @@ struct FairMQFSM : public msmf::state_machine_def { fsm.fState = RESETTING_DEVICE; - fsm.WaitForWorkCompletion(); + std::unique_lock lock(fsm.fWorkMutex); + while (fsm.fWorkActive) + { + fsm.fWorkDoneCondition.wait(lock); + } fsm.fWorkAvailable = true; LOG(STATE) << "Entering RESETTING DEVICE state"; fsm.fWork = std::bind(&FairMQFSM::ResetWrapper, &fsm); @@ -519,15 +551,6 @@ struct FairMQFSM : public msmf::state_machine_def } } - void WaitForWorkCompletion() - { - std::unique_lock lock(fWorkMutex); - while (fWorkActive) - { - fWorkDoneCondition.wait(lock); - } - } - // run state handlers in a separate thread std::thread fWorkerThread; }; @@ -562,218 +585,25 @@ class FairMQStateMachine : public boost::msm::back::state_machine lock(fChangeStateMutex); - process_event(fair::mq::fsm::INIT_DEVICE()); - return true; - } - case internal_DEVICE_READY: - { - std::lock_guard lock(fChangeStateMutex); - process_event(fair::mq::fsm::internal_DEVICE_READY()); - return true; - } - case INIT_TASK: - { - std::lock_guard lock(fChangeStateMutex); - process_event(fair::mq::fsm::INIT_TASK()); - return true; - } - case internal_READY: - { - std::lock_guard lock(fChangeStateMutex); - process_event(fair::mq::fsm::internal_READY()); - return true; - } - case RUN: - { - std::lock_guard lock(fChangeStateMutex); - process_event(fair::mq::fsm::RUN()); - return true; - } - case PAUSE: - { - std::lock_guard lock(fChangeStateMutex); - process_event(fair::mq::fsm::PAUSE()); - return true; - } - case STOP: - { - std::lock_guard lock(fChangeStateMutex); - process_event(fair::mq::fsm::STOP()); - return true; - } - case RESET_DEVICE: - { - std::lock_guard lock(fChangeStateMutex); - process_event(fair::mq::fsm::RESET_DEVICE()); - return true; - } - case RESET_TASK: - { - std::lock_guard lock(fChangeStateMutex); - process_event(fair::mq::fsm::RESET_TASK()); - return true; - } - case internal_IDLE: - { - std::lock_guard lock(fChangeStateMutex); - process_event(fair::mq::fsm::internal_IDLE()); - return true; - } - case END: - { - std::lock_guard lock(fChangeStateMutex); - process_event(fair::mq::fsm::END()); - return true; - } - case ERROR_FOUND: - { - std::lock_guard lock(fChangeStateMutex); - process_event(fair::mq::fsm::ERROR_FOUND()); - return true; - } - default: - { - LOG(ERROR) << "Requested state transition with an unsupported event: " << event << std::endl - << "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_TASK, RESET_DEVICE, END, ERROR_FOUND"; - return false; - } - } - } - catch (std::exception& e) - { - LOG(ERROR) << "Exception in FairMQStateMachine::ChangeState(): " << e.what(); - exit(EXIT_FAILURE); - } - return false; - } - bool ChangeState(const std::string& event) - { - return ChangeState(GetEventNumber(event)); - } + bool ChangeState(int event); + bool ChangeState(const std::string& event); - void WaitForEndOfState(int event) - { - try - { - switch (event) - { - case INIT_DEVICE: - case INIT_TASK: - case RUN: - case RESET_TASK: - case RESET_DEVICE: - { - std::unique_lock lock(fWorkMutex); - while (fWorkActive || fWorkAvailable) - { - fWorkDoneCondition.wait_for(lock, std::chrono::seconds(1)); - } + void WaitForEndOfState(int event); + void WaitForEndOfState(const std::string& event); - break; - } - default: - LOG(ERROR) << "Requested state is either synchronous or does not exist."; - break; - } - } - catch (std::exception& e) - { - LOG(ERROR) << "Exception in FairMQStateMachine::WaitForEndOfState(): " << e.what(); - } - } - void WaitForEndOfState(const std::string& event) - { - return WaitForEndOfState(GetEventNumber(event)); - } + bool WaitForEndOfStateForMs(int event, int durationInMs); + bool WaitForEndOfStateForMs(const std::string& event, int durationInMs); - bool WaitForEndOfStateForMs(int event, int durationInMs) - { - try - { - switch (event) - { - case INIT_DEVICE: - case INIT_TASK: - case RUN: - case RESET_TASK: - case RESET_DEVICE: - { - std::unique_lock lock(fWorkMutex); - while (fWorkActive || fWorkAvailable) - { - fWorkDoneCondition.wait_for(lock, std::chrono::milliseconds(durationInMs)); - if (fWorkActive) - { - return false; - } - } - return true; - } - default: - LOG(ERROR) << "Requested state is either synchronous or does not exist."; - return false; - } - } - catch (std::exception& e) - { - LOG(ERROR) << "Exception in FairMQStateMachine::WaitForEndOfStateForMs(): " << e.what(); - } - return false; - } - bool WaitForEndOfStateForMs(const std::string& event, int durationInMs) - { - return WaitForEndOfStateForMs(GetEventNumber(event), durationInMs); - } - - void SubscribeToStateChange(const std::string& key, std::function callback) - { - fStateChangeSignalsMap.insert({key, fStateChangeSignal.connect(callback)}); - } - void UnsubscribeFromStateChange(const std::string& key) - { - fStateChangeSignalsMap.at(key).disconnect(); - fStateChangeSignalsMap.erase(key); - } + void SubscribeToStateChange(const std::string& key, std::function callback); + void UnsubscribeFromStateChange(const std::string& key); private: - int GetEventNumber(const std::string& event) - { - if (event == "INIT_DEVICE") return INIT_DEVICE; - if (event == "INIT_TASK") return INIT_TASK; - if (event == "RUN") return RUN; - if (event == "PAUSE") return PAUSE; - if (event == "STOP") return STOP; - if (event == "RESET_DEVICE") return RESET_DEVICE; - if (event == "RESET_TASK") return RESET_TASK; - if (event == "END") return END; - if (event == "ERROR_FOUND") return ERROR_FOUND; - LOG(ERROR) << "Requested number for non-existent event... " << event << std::endl - << "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_DEVICE, RESET_TASK, END, ERROR_FOUND"; - return -1; - } + int GetEventNumber(const std::string& event); }; #endif /* FAIRMQSTATEMACHINE_H_ */