Revert refactoring that releases lock too early.

This commit is contained in:
Alexey Rybalchenko 2017-09-22 11:31:11 +02:00 committed by Mohammad Al-Turany
parent 6d7009b331
commit 26cfe69b41
2 changed files with 267 additions and 226 deletions

View File

@ -14,7 +14,218 @@
#include "FairMQStateMachine.h" #include "FairMQStateMachine.h"
// void FairMQStateMachine::SubscribeToStateChange(const std::string& key, std::function<void(const State)> callback) FairMQStateMachine::FairMQStateMachine()
// { {
// fStateChangeSignalsMap.insert({key, fStateChangeSignal.connect(callback)}); start();
// } }
FairMQStateMachine::~FairMQStateMachine()
{
stop();
}
int FairMQStateMachine::GetInterfaceVersion() const
{
return FAIRMQ_INTERFACE_VERSION;
}
bool FairMQStateMachine::ChangeState(int event)
{
try
{
switch (event)
{
case INIT_DEVICE:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::INIT_DEVICE());
return true;
}
case internal_DEVICE_READY:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::internal_DEVICE_READY());
return true;
}
case INIT_TASK:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::INIT_TASK());
return true;
}
case internal_READY:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::internal_READY());
return true;
}
case RUN:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::RUN());
return true;
}
case PAUSE:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::PAUSE());
return true;
}
case STOP:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::STOP());
return true;
}
case RESET_DEVICE:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::RESET_DEVICE());
return true;
}
case RESET_TASK:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::RESET_TASK());
return true;
}
case internal_IDLE:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::internal_IDLE());
return true;
}
case END:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::END());
return true;
}
case ERROR_FOUND:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::ERROR_FOUND());
return true;
}
default:
{
LOG(ERROR) << "Requested state transition with an unsupported event: " << event << std::endl
<< "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_TASK, RESET_DEVICE, END, ERROR_FOUND";
return false;
}
}
}
catch (std::exception& e)
{
LOG(ERROR) << "Exception in FairMQStateMachine::ChangeState(): " << e.what();
exit(EXIT_FAILURE);
}
return false;
}
bool FairMQStateMachine::ChangeState(const std::string& event)
{
return ChangeState(GetEventNumber(event));
}
void FairMQStateMachine::WaitForEndOfState(int event)
{
try
{
switch (event)
{
case INIT_DEVICE:
case INIT_TASK:
case RUN:
case RESET_TASK:
case RESET_DEVICE:
{
std::unique_lock<std::mutex> lock(fWorkMutex);
while (fWorkActive || fWorkAvailable)
{
fWorkDoneCondition.wait_for(lock, std::chrono::seconds(1));
}
break;
}
default:
LOG(ERROR) << "Requested state is either synchronous or does not exist.";
break;
}
}
catch (std::exception& e)
{
LOG(ERROR) << "Exception in FairMQStateMachine::WaitForEndOfState(): " << e.what();
}
}
void FairMQStateMachine::WaitForEndOfState(const std::string& event)
{
return WaitForEndOfState(GetEventNumber(event));
}
bool FairMQStateMachine::WaitForEndOfStateForMs(int event, int durationInMs)
{
try
{
switch (event)
{
case INIT_DEVICE:
case INIT_TASK:
case RUN:
case RESET_TASK:
case RESET_DEVICE:
{
std::unique_lock<std::mutex> lock(fWorkMutex);
while (fWorkActive || fWorkAvailable)
{
fWorkDoneCondition.wait_for(lock, std::chrono::milliseconds(durationInMs));
if (fWorkActive)
{
return false;
}
}
return true;
}
default:
LOG(ERROR) << "Requested state is either synchronous or does not exist.";
return false;
}
}
catch (std::exception& e)
{
LOG(ERROR) << "Exception in FairMQStateMachine::WaitForEndOfStateForMs(): " << e.what();
}
return false;
}
bool FairMQStateMachine::WaitForEndOfStateForMs(const std::string& event, int durationInMs)
{
return WaitForEndOfStateForMs(GetEventNumber(event), durationInMs);
}
void FairMQStateMachine::SubscribeToStateChange(const std::string& key, std::function<void(const State)> callback)
{
fStateChangeSignalsMap.insert({key, fStateChangeSignal.connect(callback)});
}
void FairMQStateMachine::UnsubscribeFromStateChange(const std::string& key)
{
fStateChangeSignalsMap.at(key).disconnect();
fStateChangeSignalsMap.erase(key);
}
int FairMQStateMachine::GetEventNumber(const std::string& event)
{
if (event == "INIT_DEVICE") return INIT_DEVICE;
if (event == "INIT_TASK") return INIT_TASK;
if (event == "RUN") return RUN;
if (event == "PAUSE") return PAUSE;
if (event == "STOP") return STOP;
if (event == "RESET_DEVICE") return RESET_DEVICE;
if (event == "RESET_TASK") return RESET_TASK;
if (event == "END") return END;
if (event == "ERROR_FOUND") return ERROR_FOUND;
LOG(ERROR) << "Requested number for non-existent event... " << event << std::endl
<< "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_DEVICE, RESET_TASK, END, ERROR_FOUND";
return -1;
}

View File

@ -150,7 +150,11 @@ struct FairMQFSM : public msmf::state_machine_def<FairMQFSM>
{ {
fsm.fState = INITIALIZING_DEVICE; fsm.fState = INITIALIZING_DEVICE;
fsm.WaitForWorkCompletion(); std::unique_lock<std::mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true; fsm.fWorkAvailable = true;
LOG(STATE) << "Entering INITIALIZING DEVICE state"; LOG(STATE) << "Entering INITIALIZING DEVICE state";
fsm.fWork = std::bind(&FairMQFSM::InitWrapper, &fsm); fsm.fWork = std::bind(&FairMQFSM::InitWrapper, &fsm);
@ -175,7 +179,11 @@ struct FairMQFSM : public msmf::state_machine_def<FairMQFSM>
{ {
fsm.fState = INITIALIZING_TASK; fsm.fState = INITIALIZING_TASK;
fsm.WaitForWorkCompletion(); std::unique_lock<std::mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true; fsm.fWorkAvailable = true;
LOG(STATE) << "Entering INITIALIZING TASK state"; LOG(STATE) << "Entering INITIALIZING TASK state";
fsm.fWork = std::bind(&FairMQFSM::InitTaskWrapper, &fsm); fsm.fWork = std::bind(&FairMQFSM::InitTaskWrapper, &fsm);
@ -200,7 +208,11 @@ struct FairMQFSM : public msmf::state_machine_def<FairMQFSM>
{ {
fsm.fState = RUNNING; fsm.fState = RUNNING;
fsm.WaitForWorkCompletion(); std::unique_lock<std::mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true; fsm.fWorkAvailable = true;
LOG(STATE) << "Entering RUNNING state"; LOG(STATE) << "Entering RUNNING state";
fsm.fWork = std::bind(&FairMQFSM::RunWrapper, &fsm); fsm.fWork = std::bind(&FairMQFSM::RunWrapper, &fsm);
@ -216,7 +228,11 @@ struct FairMQFSM : public msmf::state_machine_def<FairMQFSM>
fsm.fState = PAUSED; fsm.fState = PAUSED;
fsm.Unblock(); fsm.Unblock();
fsm.WaitForWorkCompletion(); std::unique_lock<std::mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true; fsm.fWorkAvailable = true;
LOG(STATE) << "Entering PAUSED state"; LOG(STATE) << "Entering PAUSED state";
fsm.fWork = std::bind(&FairMQFSM::PauseWrapper, &fsm); fsm.fWork = std::bind(&FairMQFSM::PauseWrapper, &fsm);
@ -231,7 +247,11 @@ struct FairMQFSM : public msmf::state_machine_def<FairMQFSM>
{ {
fsm.fState = RUNNING; fsm.fState = RUNNING;
fsm.WaitForWorkCompletion(); std::unique_lock<std::mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true; fsm.fWorkAvailable = true;
LOG(STATE) << "Entering RUNNING state"; LOG(STATE) << "Entering RUNNING state";
fsm.fWork = std::bind(&FairMQFSM::RunWrapper, &fsm); fsm.fWork = std::bind(&FairMQFSM::RunWrapper, &fsm);
@ -247,7 +267,11 @@ struct FairMQFSM : public msmf::state_machine_def<FairMQFSM>
fsm.fState = READY; fsm.fState = READY;
fsm.Unblock(); fsm.Unblock();
fsm.WaitForWorkCompletion(); std::unique_lock<std::mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
LOG(STATE) << "Entering READY state"; LOG(STATE) << "Entering READY state";
} }
}; };
@ -270,7 +294,11 @@ struct FairMQFSM : public msmf::state_machine_def<FairMQFSM>
{ {
fsm.fState = RESETTING_TASK; fsm.fState = RESETTING_TASK;
fsm.WaitForWorkCompletion(); std::unique_lock<std::mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true; fsm.fWorkAvailable = true;
LOG(STATE) << "Entering RESETTING TASK state"; LOG(STATE) << "Entering RESETTING TASK state";
fsm.fWork = std::bind(&FairMQFSM::ResetTaskWrapper, &fsm); fsm.fWork = std::bind(&FairMQFSM::ResetTaskWrapper, &fsm);
@ -285,7 +313,11 @@ struct FairMQFSM : public msmf::state_machine_def<FairMQFSM>
{ {
fsm.fState = RESETTING_DEVICE; fsm.fState = RESETTING_DEVICE;
fsm.WaitForWorkCompletion(); std::unique_lock<std::mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true; fsm.fWorkAvailable = true;
LOG(STATE) << "Entering RESETTING DEVICE state"; LOG(STATE) << "Entering RESETTING DEVICE state";
fsm.fWork = std::bind(&FairMQFSM::ResetWrapper, &fsm); fsm.fWork = std::bind(&FairMQFSM::ResetWrapper, &fsm);
@ -519,15 +551,6 @@ struct FairMQFSM : public msmf::state_machine_def<FairMQFSM>
} }
} }
void WaitForWorkCompletion()
{
std::unique_lock<std::mutex> lock(fWorkMutex);
while (fWorkActive)
{
fWorkDoneCondition.wait(lock);
}
}
// run state handlers in a separate thread // run state handlers in a separate thread
std::thread fWorkerThread; std::thread fWorkerThread;
}; };
@ -562,218 +585,25 @@ class FairMQStateMachine : public boost::msm::back::state_machine<fair::mq::fsm:
ERROR_FOUND ERROR_FOUND
}; };
FairMQStateMachine() FairMQStateMachine();
{ virtual ~FairMQStateMachine();
start();
}
virtual ~FairMQStateMachine()
{
stop();
}
int GetInterfaceVersion() int GetInterfaceVersion() const;
{
return FAIRMQ_INTERFACE_VERSION;
}
bool ChangeState(int event) bool ChangeState(int event);
{ bool ChangeState(const std::string& event);
try
{
switch (event)
{
case INIT_DEVICE:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::INIT_DEVICE());
return true;
}
case internal_DEVICE_READY:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::internal_DEVICE_READY());
return true;
}
case INIT_TASK:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::INIT_TASK());
return true;
}
case internal_READY:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::internal_READY());
return true;
}
case RUN:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::RUN());
return true;
}
case PAUSE:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::PAUSE());
return true;
}
case STOP:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::STOP());
return true;
}
case RESET_DEVICE:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::RESET_DEVICE());
return true;
}
case RESET_TASK:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::RESET_TASK());
return true;
}
case internal_IDLE:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::internal_IDLE());
return true;
}
case END:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::END());
return true;
}
case ERROR_FOUND:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::ERROR_FOUND());
return true;
}
default:
{
LOG(ERROR) << "Requested state transition with an unsupported event: " << event << std::endl
<< "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_TASK, RESET_DEVICE, END, ERROR_FOUND";
return false;
}
}
}
catch (std::exception& e)
{
LOG(ERROR) << "Exception in FairMQStateMachine::ChangeState(): " << e.what();
exit(EXIT_FAILURE);
}
return false;
}
bool ChangeState(const std::string& event)
{
return ChangeState(GetEventNumber(event));
}
void WaitForEndOfState(int event) void WaitForEndOfState(int event);
{ void WaitForEndOfState(const std::string& event);
try
{
switch (event)
{
case INIT_DEVICE:
case INIT_TASK:
case RUN:
case RESET_TASK:
case RESET_DEVICE:
{
std::unique_lock<std::mutex> lock(fWorkMutex);
while (fWorkActive || fWorkAvailable)
{
fWorkDoneCondition.wait_for(lock, std::chrono::seconds(1));
}
break; bool WaitForEndOfStateForMs(int event, int durationInMs);
} bool WaitForEndOfStateForMs(const std::string& event, int durationInMs);
default:
LOG(ERROR) << "Requested state is either synchronous or does not exist.";
break;
}
}
catch (std::exception& e)
{
LOG(ERROR) << "Exception in FairMQStateMachine::WaitForEndOfState(): " << e.what();
}
}
void WaitForEndOfState(const std::string& event)
{
return WaitForEndOfState(GetEventNumber(event));
}
bool WaitForEndOfStateForMs(int event, int durationInMs) void SubscribeToStateChange(const std::string& key, std::function<void(const State)> callback);
{ void UnsubscribeFromStateChange(const std::string& key);
try
{
switch (event)
{
case INIT_DEVICE:
case INIT_TASK:
case RUN:
case RESET_TASK:
case RESET_DEVICE:
{
std::unique_lock<std::mutex> lock(fWorkMutex);
while (fWorkActive || fWorkAvailable)
{
fWorkDoneCondition.wait_for(lock, std::chrono::milliseconds(durationInMs));
if (fWorkActive)
{
return false;
}
}
return true;
}
default:
LOG(ERROR) << "Requested state is either synchronous or does not exist.";
return false;
}
}
catch (std::exception& e)
{
LOG(ERROR) << "Exception in FairMQStateMachine::WaitForEndOfStateForMs(): " << e.what();
}
return false;
}
bool WaitForEndOfStateForMs(const std::string& event, int durationInMs)
{
return WaitForEndOfStateForMs(GetEventNumber(event), durationInMs);
}
void SubscribeToStateChange(const std::string& key, std::function<void(const State)> callback)
{
fStateChangeSignalsMap.insert({key, fStateChangeSignal.connect(callback)});
}
void UnsubscribeFromStateChange(const std::string& key)
{
fStateChangeSignalsMap.at(key).disconnect();
fStateChangeSignalsMap.erase(key);
}
private: private:
int GetEventNumber(const std::string& event) int GetEventNumber(const std::string& event);
{
if (event == "INIT_DEVICE") return INIT_DEVICE;
if (event == "INIT_TASK") return INIT_TASK;
if (event == "RUN") return RUN;
if (event == "PAUSE") return PAUSE;
if (event == "STOP") return STOP;
if (event == "RESET_DEVICE") return RESET_DEVICE;
if (event == "RESET_TASK") return RESET_TASK;
if (event == "END") return END;
if (event == "ERROR_FOUND") return ERROR_FOUND;
LOG(ERROR) << "Requested number for non-existent event... " << event << std::endl
<< "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_DEVICE, RESET_TASK, END, ERROR_FOUND";
return -1;
}
}; };
#endif /* FAIRMQSTATEMACHINE_H_ */ #endif /* FAIRMQSTATEMACHINE_H_ */