diff --git a/fairmq/FairMQStateMachine.cxx b/fairmq/FairMQStateMachine.cxx index be889dc9..0e4c39ab 100644 --- a/fairmq/FairMQStateMachine.cxx +++ b/fairmq/FairMQStateMachine.cxx @@ -12,225 +12,9 @@ * @author D. Klein, A. Rybalchenko */ -#include // WaitForEndOfStateForMs() - #include "FairMQStateMachine.h" -#include "FairMQLogger.h" -FairMQStateMachine::FairMQStateMachine() -{ - start(); -} - -FairMQStateMachine::~FairMQStateMachine() -{ - stop(); -} - -int FairMQStateMachine::GetInterfaceVersion() -{ - return FAIRMQ_INTERFACE_VERSION; -} - -int FairMQStateMachine::GetEventNumber(const std::string& event) -{ - if (event == "INIT_DEVICE") return INIT_DEVICE; - if (event == "INIT_TASK") return INIT_TASK; - if (event == "RUN") return RUN; - if (event == "PAUSE") return PAUSE; - if (event == "STOP") return STOP; - if (event == "RESET_DEVICE") return RESET_DEVICE; - if (event == "RESET_TASK") return RESET_TASK; - if (event == "END") return END; - if (event == "ERROR_FOUND") return ERROR_FOUND; - LOG(ERROR) << "Requested number for non-existent event... " << event << std::endl - << "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_DEVICE, RESET_TASK, END, ERROR_FOUND"; - return -1; -} - -bool FairMQStateMachine::ChangeState(int event) -{ - try - { - switch (event) - { - case INIT_DEVICE: - { - std::lock_guard lock(fChangeStateMutex); - process_event(FairMQFSM::INIT_DEVICE()); - return true; - } - case internal_DEVICE_READY: - { - std::lock_guard lock(fChangeStateMutex); - process_event(FairMQFSM::internal_DEVICE_READY()); - return true; - } - case INIT_TASK: - { - std::lock_guard lock(fChangeStateMutex); - process_event(FairMQFSM::INIT_TASK()); - return true; - } - case internal_READY: - { - std::lock_guard lock(fChangeStateMutex); - process_event(FairMQFSM::internal_READY()); - return true; - } - case RUN: - { - std::lock_guard lock(fChangeStateMutex); - process_event(FairMQFSM::RUN()); - return true; - } - case PAUSE: - { - std::lock_guard lock(fChangeStateMutex); - process_event(FairMQFSM::PAUSE()); - return true; - } - case STOP: - { - std::lock_guard lock(fChangeStateMutex); - process_event(FairMQFSM::STOP()); - return true; - } - case RESET_DEVICE: - { - std::lock_guard lock(fChangeStateMutex); - process_event(FairMQFSM::RESET_DEVICE()); - return true; - } - case RESET_TASK: - { - std::lock_guard lock(fChangeStateMutex); - process_event(FairMQFSM::RESET_TASK()); - return true; - } - case internal_IDLE: - { - std::lock_guard lock(fChangeStateMutex); - process_event(FairMQFSM::internal_IDLE()); - return true; - } - case END: - { - std::lock_guard lock(fChangeStateMutex); - process_event(FairMQFSM::END()); - return true; - } - case ERROR_FOUND: - { - std::lock_guard lock(fChangeStateMutex); - process_event(FairMQFSM::ERROR_FOUND()); - return true; - } - default: - { - LOG(ERROR) << "Requested state transition with an unsupported event: " << event << std::endl - << "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_TASK, RESET_DEVICE, END, ERROR_FOUND"; - return false; - } - } - } - catch (std::exception& e) - { - LOG(ERROR) << "Exception in FairMQStateMachine::ChangeState(): " << e.what(); - exit(EXIT_FAILURE); - } - return false; -} - -bool FairMQStateMachine::ChangeState(const std::string& event) -{ - return ChangeState(GetEventNumber(event)); -} - -void FairMQStateMachine::WaitForEndOfState(int event) -{ - try - { - switch (event) - { - case INIT_DEVICE: - case INIT_TASK: - case RUN: - case RESET_TASK: - case RESET_DEVICE: - { - std::unique_lock lock(fWorkMutex); - while (fWorkActive || fWorkAvailable) - { - fWorkDoneCondition.wait_for(lock, std::chrono::seconds(1)); - } - - break; - } - default: - LOG(ERROR) << "Requested state is either synchronous or does not exist."; - break; - } - } - catch (std::exception& e) - { - LOG(ERROR) << "Exception in FairMQStateMachine::WaitForEndOfState(): " << e.what(); - } -} - -void FairMQStateMachine::WaitForEndOfState(const std::string& event) -{ - return WaitForEndOfState(GetEventNumber(event)); -} - -bool FairMQStateMachine::WaitForEndOfStateForMs(int event, int durationInMs) -{ - try - { - switch (event) - { - case INIT_DEVICE: - case INIT_TASK: - case RUN: - case RESET_TASK: - case RESET_DEVICE: - { - std::unique_lock lock(fWorkMutex); - while (fWorkActive || fWorkAvailable) - { - fWorkDoneCondition.wait_for(lock, std::chrono::milliseconds(durationInMs)); - if (fWorkActive) - { - return false; - } - } - return true; - break; - } - default: - LOG(ERROR) << "Requested state is either synchronous or does not exist."; - return false; - } - } - catch (std::exception& e) - { - LOG(ERROR) << "Exception in FairMQStateMachine::WaitForEndOfStateForMs(): " << e.what(); - } - return false; -} - -bool FairMQStateMachine::WaitForEndOfStateForMs(const std::string& event, int durationInMs) -{ - return WaitForEndOfStateForMs(GetEventNumber(event), durationInMs); -} - -void FairMQStateMachine::SubscribeToStateChange(const std::string& key, std::function callback) -{ - fStateChangeSignalsMap.insert({key, fStateChangeSignal.connect(callback)}); -} - -void FairMQStateMachine::UnsubscribeFromStateChange(const std::string& key) -{ - fStateChangeSignalsMap.at(key).disconnect(); - fStateChangeSignalsMap.erase(key); -} +// void FairMQStateMachine::SubscribeToStateChange(const std::string& key, std::function callback) +// { +// fStateChangeSignalsMap.insert({key, fStateChangeSignal.connect(callback)}); +// } diff --git a/fairmq/FairMQStateMachine.h b/fairmq/FairMQStateMachine.h index 247a8266..42a3d5a5 100644 --- a/fairmq/FairMQStateMachine.h +++ b/fairmq/FairMQStateMachine.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -40,12 +41,15 @@ #include "FairMQLogger.h" -namespace msm = boost::msm; -namespace mpl = boost::mpl; namespace msmf = boost::msm::front; -namespace FairMQFSM +namespace fair { +namespace mq +{ +namespace fsm +{ + // defining events for the boost MSM state machine struct INIT_DEVICE { std::string name() const { return "INIT_DEVICE"; } }; struct internal_DEVICE_READY { std::string name() const { return "internal_DEVICE_READY"; } }; @@ -71,10 +75,10 @@ _Pragma("GCC diagnostic ignored \"-Weffc++\"") #endif // defining the boost MSM state machine -struct FairMQFSM_ : public msmf::state_machine_def +struct FairMQFSM : public msmf::state_machine_def { public: - FairMQFSM_() + FairMQFSM() : fWorkerThread() , fWork() , fWorkAvailableCondition() @@ -87,247 +91,210 @@ struct FairMQFSM_ : public msmf::state_machine_def , fChangeStateMutex() , fStateChangeSignal() , fStateChangeSignalsMap() - {} + {} - // Destructor - virtual ~FairMQFSM_() {}; + virtual ~FairMQFSM() + {} - template + template void on_entry(Event const&, FSM& fsm) { LOG(STATE) << "Starting FairMQ state machine"; fState = IDLE; + fsm.CallStateChangeCallbacks(IDLE); // start a worker thread to execute user states in. - fsm.fWorkerThread = std::thread(&FairMQFSM_::Worker, &fsm); + fsm.fWorkerThread = std::thread(&FairMQFSM::Worker, &fsm); } - template + template void on_exit(Event const&, FSM& /*fsm*/) { LOG(STATE) << "Exiting FairMQ state machine"; } - // The list of FSM states - struct OK_FSM : public msmf::state<> {}; - struct ERROR_FSM : public msmf::terminate_state<> {}; + // list of FSM states + struct OK_FSM : public msmf::state<> {}; + struct ERROR_FSM : public msmf::terminate_state<> {}; - struct IDLE_FSM : public msmf::state<> {}; + struct IDLE_FSM : public msmf::state<> {}; struct INITIALIZING_DEVICE_FSM : public msmf::state<> {}; - struct DEVICE_READY_FSM : public msmf::state<> {}; - struct INITIALIZING_TASK_FSM : public msmf::state<> {}; - struct READY_FSM : public msmf::state<> {}; - struct RUNNING_FSM : public msmf::state<> {}; - struct PAUSED_FSM : public msmf::state<> {}; - struct RESETTING_TASK_FSM : public msmf::state<> {}; - struct RESETTING_DEVICE_FSM : public msmf::state<> {}; - struct EXITING_FSM : public msmf::state<> {}; + struct DEVICE_READY_FSM : public msmf::state<> {}; + struct INITIALIZING_TASK_FSM : public msmf::state<> {}; + struct READY_FSM : public msmf::state<> {}; + struct RUNNING_FSM : public msmf::state<> {}; + struct PAUSED_FSM : public msmf::state<> {}; + struct RESETTING_TASK_FSM : public msmf::state<> {}; + struct RESETTING_DEVICE_FSM : public msmf::state<> {}; + struct EXITING_FSM : public msmf::state<> {}; - // Define initial states - typedef mpl::vector initial_state; + // initial states + using initial_state = boost::mpl::vector; - // Actions + // actions struct IdleFct { - template + template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering IDLE state"; fsm.fState = IDLE; - fsm.CallStateChangeCallbacks(IDLE); } }; struct InitDeviceFct { - template + template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering INITIALIZING DEVICE state"; fsm.fState = INITIALIZING_DEVICE; - std::unique_lock lock(fsm.fWorkMutex); - while (fsm.fWorkActive) - { - fsm.fWorkDoneCondition.wait(lock); - } + fsm.WaitForWorkCompletion(); fsm.fWorkAvailable = true; - fsm.fWork = std::bind(&FairMQFSM_::InitWrapper, &fsm); + fsm.fWork = std::bind(&FairMQFSM::InitWrapper, &fsm); fsm.fWorkAvailableCondition.notify_one(); } }; struct DeviceReadyFct { - template + template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering DEVICE READY state"; fsm.fState = DEVICE_READY; - fsm.CallStateChangeCallbacks(DEVICE_READY); } }; struct InitTaskFct { - template + template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering INITIALIZING TASK state"; fsm.fState = INITIALIZING_TASK; - std::unique_lock lock(fsm.fWorkMutex); - while (fsm.fWorkActive) - { - fsm.fWorkDoneCondition.wait(lock); - } + fsm.WaitForWorkCompletion(); fsm.fWorkAvailable = true; - fsm.fWork = std::bind(&FairMQFSM_::InitTaskWrapper, &fsm); + fsm.fWork = std::bind(&FairMQFSM::InitTaskWrapper, &fsm); fsm.fWorkAvailableCondition.notify_one(); } }; struct ReadyFct { - template + template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering READY state"; fsm.fState = READY; - fsm.CallStateChangeCallbacks(READY); } }; struct RunFct { - template + template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering RUNNING state"; fsm.fState = RUNNING; - std::unique_lock lock(fsm.fWorkMutex); - while (fsm.fWorkActive) - { - fsm.fWorkDoneCondition.wait(lock); - } + fsm.WaitForWorkCompletion(); fsm.fWorkAvailable = true; - fsm.fWork = std::bind(&FairMQFSM_::RunWrapper, &fsm); + fsm.fWork = std::bind(&FairMQFSM::RunWrapper, &fsm); fsm.fWorkAvailableCondition.notify_one(); } }; struct PauseFct { - template + template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering PAUSED state"; fsm.fState = PAUSED; fsm.Unblock(); - std::unique_lock lock(fsm.fWorkMutex); - while (fsm.fWorkActive) - { - fsm.fWorkDoneCondition.wait(lock); - } + fsm.WaitForWorkCompletion(); fsm.fWorkAvailable = true; - fsm.fWork = std::bind(&FairMQFSM_::PauseWrapper, &fsm); + fsm.fWork = std::bind(&FairMQFSM::PauseWrapper, &fsm); fsm.fWorkAvailableCondition.notify_one(); } }; struct ResumeFct { - template + template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering RUNNING state"; fsm.fState = RUNNING; - std::unique_lock lock(fsm.fWorkMutex); - while (fsm.fWorkActive) - { - fsm.fWorkDoneCondition.wait(lock); - } + fsm.WaitForWorkCompletion(); fsm.fWorkAvailable = true; - fsm.fWork = std::bind(&FairMQFSM_::RunWrapper, &fsm); + fsm.fWork = std::bind(&FairMQFSM::RunWrapper, &fsm); fsm.fWorkAvailableCondition.notify_one(); } }; struct StopFct { - template + template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering READY state"; fsm.fState = READY; - fsm.CallStateChangeCallbacks(READY); fsm.Unblock(); - std::unique_lock lock(fsm.fWorkMutex); - while (fsm.fWorkActive) - { - fsm.fWorkDoneCondition.wait(lock); - } + fsm.WaitForWorkCompletion(); } }; struct InternalStopFct { - template + template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "RUNNING state finished without an external event, entering READY state"; fsm.fState = READY; - fsm.CallStateChangeCallbacks(READY); - fsm.Unblock(); } }; struct ResetTaskFct { - template + template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering RESETTING TASK state"; fsm.fState = RESETTING_TASK; - std::unique_lock lock(fsm.fWorkMutex); - while (fsm.fWorkActive) - { - fsm.fWorkDoneCondition.wait(lock); - } + fsm.WaitForWorkCompletion(); fsm.fWorkAvailable = true; - fsm.fWork = std::bind(&FairMQFSM_::ResetTaskWrapper, &fsm); + fsm.fWork = std::bind(&FairMQFSM::ResetTaskWrapper, &fsm); fsm.fWorkAvailableCondition.notify_one(); } }; struct ResetDeviceFct { - template + template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering RESETTING DEVICE state"; fsm.fState = RESETTING_DEVICE; - std::unique_lock lock(fsm.fWorkMutex); - while (fsm.fWorkActive) - { - fsm.fWorkDoneCondition.wait(lock); - } + fsm.WaitForWorkCompletion(); fsm.fWorkAvailable = true; - fsm.fWork = std::bind(&FairMQFSM_::ResetWrapper, &fsm); + fsm.fWork = std::bind(&FairMQFSM::ResetWrapper, &fsm); fsm.fWorkAvailableCondition.notify_one(); } }; struct ExitingFct { - template + template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering EXITING state"; @@ -353,7 +320,7 @@ struct FairMQFSM_ : public msmf::state_machine_def struct ErrorFoundFct { - template + template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering ERROR state"; @@ -362,55 +329,9 @@ struct FairMQFSM_ : public msmf::state_machine_def } }; - // actions to be overwritten by derived classes - virtual void InitWrapper() {} - virtual void Init() {} - virtual void InitTaskWrapper() {} - virtual void InitTask() {} - virtual void RunWrapper() {} - virtual void Run() {} - virtual void PauseWrapper() {} - virtual void Pause() {} - virtual void ResetWrapper() {} - virtual void Reset() {} - virtual void ResetTaskWrapper() {} - virtual void ResetTask() {} - virtual void Exit() {} - virtual void Unblock() {} // Method to send commands. - - void Worker() - { - while (true) - { - { - std::unique_lock lock(fWorkMutex); - // Wait for work to be done. - while (!fWorkAvailable && !fWorkerTerminated) - { - fWorkAvailableCondition.wait(lock); - } - - if (fWorkerTerminated) - { - break; - } - - fWorkActive = true; - } - - fWork(); - - std::lock_guard lock(fWorkMutex); - fWorkActive = false; - fWorkAvailable = false; - fWorkDoneCondition.notify_one(); - } - } - // Transition table for FairMQFSM - struct transition_table : mpl::vector< + struct transition_table : boost::mpl::vector< // Start Event Next Action Guard - // +------------------------+----------------------+------------------------+----------------+---------+ msmf::Row, msmf::Row, msmf::Row, @@ -426,18 +347,18 @@ struct FairMQFSM_ : public msmf::state_machine_def msmf::Row, msmf::Row, msmf::Row> - {}; + {}; - // Replaces the default no-transition response. - template + // replaces the default no-transition response. + template void no_transition(Event const& e, FSM&, int state) { - typedef typename msm::back::recursive_get_transition_table::type recursive_stt; - typedef typename msm::back::generate_state_set::type all_states; + using recursive_stt = typename boost::msm::back::recursive_get_transition_table::type; + using all_states = typename boost::msm::back::generate_state_set::type; std::string stateName; - mpl::for_each>(msm::back::get_state_name(stateName, state)); + boost::mpl::for_each>(boost::msm::back::get_state_name(stateName, state)); stateName = stateName.substr(24); std::size_t pos = stateName.find("_FSME"); @@ -506,21 +427,36 @@ struct FairMQFSM_ : public msmf::state_machine_def } } - std::string GetCurrentStateName() const { return GetStateName(fState); } - int GetCurrentState() const { return fState; } - bool CheckCurrentState(int state) const { return state == fState; } - bool CheckCurrentState(std::string state) const { return state == GetCurrentStateName(); } - - void CallStateChangeCallbacks(const State state) const + std::string GetCurrentStateName() const { - if (!fStateChangeSignal.empty()) - { - fStateChangeSignal(state); - } + return GetStateName(fState); + } + int GetCurrentState() const + { + return fState; + } + bool CheckCurrentState(int state) const + { + return state == fState; + } + bool CheckCurrentState(std::string state) const + { + return state == GetCurrentStateName(); } - // this is to run certain functions in a separate thread - std::thread fWorkerThread; + // actions to be overwritten by derived classes + virtual void InitWrapper() {} + virtual void InitTaskWrapper() {} + virtual void RunWrapper() {} + virtual void PauseWrapper() {} + virtual void ResetWrapper() {} + virtual void ResetTaskWrapper() {} + virtual void Exit() {} + virtual void Unblock() {} + + protected: + std::atomic fState; + std::mutex fChangeStateMutex; // function to execute user states in a worker thread std::function fWork; @@ -531,12 +467,61 @@ struct FairMQFSM_ : public msmf::state_machine_def bool fWorkActive; bool fWorkAvailable; - protected: - std::atomic fState; - std::mutex fChangeStateMutex; - boost::signals2::signal fStateChangeSignal; std::unordered_map fStateChangeSignalsMap; + + void CallStateChangeCallbacks(const State state) const + { + if (!fStateChangeSignal.empty()) + { + fStateChangeSignal(state); + } + } + + private: + void Worker() + { + while (true) + { + { + std::unique_lock lock(fWorkMutex); + // Wait for work to be done. + while (!fWorkAvailable && !fWorkerTerminated) + { + fWorkAvailableCondition.wait(lock); + } + + if (fWorkerTerminated) + { + break; + } + + fWorkActive = true; + } + + fWork(); + + { + std::lock_guard lock(fWorkMutex); + fWorkActive = false; + fWorkAvailable = false; + fWorkDoneCondition.notify_one(); + } + CallStateChangeCallbacks(fState); + } + } + + void WaitForWorkCompletion() + { + std::unique_lock lock(fWorkMutex); + while (fWorkActive) + { + fWorkDoneCondition.wait(lock); + } + } + + // run state handlers in a separate thread + std::thread fWorkerThread; }; // reactivate the warning for non-virtual destructor @@ -546,11 +531,11 @@ _Pragma("clang diagnostic pop") _Pragma("GCC diagnostic pop") #endif -typedef msm::back::state_machine FairMQFSM; -} // namespace FairMQFSM +} // namespace fsm +} // namespace mq +} // namespace fair - -class FairMQStateMachine : public FairMQFSM::FairMQFSM +class FairMQStateMachine : public boost::msm::back::state_machine { public: enum Event @@ -569,24 +554,218 @@ class FairMQStateMachine : public FairMQFSM::FairMQFSM ERROR_FOUND }; - FairMQStateMachine(); - virtual ~FairMQStateMachine(); + FairMQStateMachine() + { + start(); + } + virtual ~FairMQStateMachine() + { + stop(); + } - int GetInterfaceVersion(); + int GetInterfaceVersion() + { + return FAIRMQ_INTERFACE_VERSION; + } - int GetEventNumber(const std::string& event); + bool ChangeState(int event) + { + try + { + switch (event) + { + case INIT_DEVICE: + { + std::lock_guard lock(fChangeStateMutex); + process_event(fair::mq::fsm::INIT_DEVICE()); + return true; + } + case internal_DEVICE_READY: + { + std::lock_guard lock(fChangeStateMutex); + process_event(fair::mq::fsm::internal_DEVICE_READY()); + return true; + } + case INIT_TASK: + { + std::lock_guard lock(fChangeStateMutex); + process_event(fair::mq::fsm::INIT_TASK()); + return true; + } + case internal_READY: + { + std::lock_guard lock(fChangeStateMutex); + process_event(fair::mq::fsm::internal_READY()); + return true; + } + case RUN: + { + std::lock_guard lock(fChangeStateMutex); + process_event(fair::mq::fsm::RUN()); + return true; + } + case PAUSE: + { + std::lock_guard lock(fChangeStateMutex); + process_event(fair::mq::fsm::PAUSE()); + return true; + } + case STOP: + { + std::lock_guard lock(fChangeStateMutex); + process_event(fair::mq::fsm::STOP()); + return true; + } + case RESET_DEVICE: + { + std::lock_guard lock(fChangeStateMutex); + process_event(fair::mq::fsm::RESET_DEVICE()); + return true; + } + case RESET_TASK: + { + std::lock_guard lock(fChangeStateMutex); + process_event(fair::mq::fsm::RESET_TASK()); + return true; + } + case internal_IDLE: + { + std::lock_guard lock(fChangeStateMutex); + process_event(fair::mq::fsm::internal_IDLE()); + return true; + } + case END: + { + std::lock_guard lock(fChangeStateMutex); + process_event(fair::mq::fsm::END()); + return true; + } + case ERROR_FOUND: + { + std::lock_guard lock(fChangeStateMutex); + process_event(fair::mq::fsm::ERROR_FOUND()); + return true; + } + default: + { + LOG(ERROR) << "Requested state transition with an unsupported event: " << event << std::endl + << "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_TASK, RESET_DEVICE, END, ERROR_FOUND"; + return false; + } + } + } + catch (std::exception& e) + { + LOG(ERROR) << "Exception in FairMQStateMachine::ChangeState(): " << e.what(); + exit(EXIT_FAILURE); + } + return false; + } + bool ChangeState(const std::string& event) + { + return ChangeState(GetEventNumber(event)); + } - bool ChangeState(int event); - bool ChangeState(const std::string& event); + void WaitForEndOfState(int event) + { + try + { + switch (event) + { + case INIT_DEVICE: + case INIT_TASK: + case RUN: + case RESET_TASK: + case RESET_DEVICE: + { + std::unique_lock lock(fWorkMutex); + while (fWorkActive || fWorkAvailable) + { + fWorkDoneCondition.wait_for(lock, std::chrono::seconds(1)); + } - void WaitForEndOfState(int state); - void WaitForEndOfState(const std::string& state); + break; + } + default: + LOG(ERROR) << "Requested state is either synchronous or does not exist."; + break; + } + } + catch (std::exception& e) + { + LOG(ERROR) << "Exception in FairMQStateMachine::WaitForEndOfState(): " << e.what(); + } + } + void WaitForEndOfState(const std::string& event) + { + return WaitForEndOfState(GetEventNumber(event)); + } - bool WaitForEndOfStateForMs(int state, int durationInMs); - bool WaitForEndOfStateForMs(const std::string& state, int durationInMs); + bool WaitForEndOfStateForMs(int event, int durationInMs) + { + try + { + switch (event) + { + case INIT_DEVICE: + case INIT_TASK: + case RUN: + case RESET_TASK: + case RESET_DEVICE: + { + std::unique_lock lock(fWorkMutex); + while (fWorkActive || fWorkAvailable) + { + fWorkDoneCondition.wait_for(lock, std::chrono::milliseconds(durationInMs)); + if (fWorkActive) + { + return false; + } + } + return true; + } + default: + LOG(ERROR) << "Requested state is either synchronous or does not exist."; + return false; + } + } + catch (std::exception& e) + { + LOG(ERROR) << "Exception in FairMQStateMachine::WaitForEndOfStateForMs(): " << e.what(); + } + return false; + } + bool WaitForEndOfStateForMs(const std::string& event, int durationInMs) + { + return WaitForEndOfStateForMs(GetEventNumber(event), durationInMs); + } - void SubscribeToStateChange(const std::string& key, std::function callback); - void UnsubscribeFromStateChange(const std::string& key); + void SubscribeToStateChange(const std::string& key, std::function callback) + { + fStateChangeSignalsMap.insert({key, fStateChangeSignal.connect(callback)}); + } + void UnsubscribeFromStateChange(const std::string& key) + { + fStateChangeSignalsMap.at(key).disconnect(); + fStateChangeSignalsMap.erase(key); + } + + private: + int GetEventNumber(const std::string& event) + { + if (event == "INIT_DEVICE") return INIT_DEVICE; + if (event == "INIT_TASK") return INIT_TASK; + if (event == "RUN") return RUN; + if (event == "PAUSE") return PAUSE; + if (event == "STOP") return STOP; + if (event == "RESET_DEVICE") return RESET_DEVICE; + if (event == "RESET_TASK") return RESET_TASK; + if (event == "END") return END; + if (event == "ERROR_FOUND") return ERROR_FOUND; + LOG(ERROR) << "Requested number for non-existent event... " << event << std::endl + << "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_DEVICE, RESET_TASK, END, ERROR_FOUND"; + return -1; + } }; #endif /* FAIRMQSTATEMACHINE_H_ */ diff --git a/fairmq/FairMQTransportFactory.cxx b/fairmq/FairMQTransportFactory.cxx index edab9441..a36abc43 100644 --- a/fairmq/FairMQTransportFactory.cxx +++ b/fairmq/FairMQTransportFactory.cxx @@ -13,15 +13,17 @@ #include #endif /* NANOMSG_FOUND */ #include -#include + #include #include #include + +#include #include #include FairMQTransportFactory::FairMQTransportFactory(const std::string& id) -: fkId(id) + : fkId(id) { } @@ -29,26 +31,26 @@ auto FairMQTransportFactory::CreateTransportFactory(const std::string& type, con { using namespace std; - auto final_id = id; + auto finalId = id; // Generate uuid if empty - if (final_id == "") + if (finalId == "") { - final_id = boost::uuids::to_string(boost::uuids::random_generator()()); + finalId = boost::uuids::to_string(boost::uuids::random_generator()()); } if (type == "zeromq") { - return std::make_shared(final_id, config); + return std::make_shared(finalId, config); } else if (type == "shmem") { - return std::make_shared(final_id, config); + return std::make_shared(finalId, config); } #ifdef NANOMSG_FOUND else if (type == "nanomsg") { - return std::make_shared(final_id, config); + return std::make_shared(finalId, config); } #endif /* NANOMSG_FOUND */ else diff --git a/fairmq/devices/FairMQBenchmarkSampler.cxx b/fairmq/devices/FairMQBenchmarkSampler.cxx index 0b453e13..2ab7cbf9 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.cxx +++ b/fairmq/devices/FairMQBenchmarkSampler.cxx @@ -27,7 +27,8 @@ FairMQBenchmarkSampler::FairMQBenchmarkSampler() , fMsgSize(10000) , fMsgCounter(0) , fMsgRate(1) - , fNumMsgs(0) + , fNumIterations(0) + , fMaxIterations(0) , fOutChannelName() , fResetMsgCounter() { @@ -42,7 +43,7 @@ void FairMQBenchmarkSampler::InitTask() fSameMessage = fConfig->GetValue("same-msg"); fMsgSize = fConfig->GetValue("msg-size"); fMsgRate = fConfig->GetValue("msg-rate"); - fNumMsgs = fConfig->GetValue("num-msgs"); + fMaxIterations = fConfig->GetValue("max-iterations"); fOutChannelName = fConfig->GetValue("out-channel"); } @@ -53,14 +54,12 @@ void FairMQBenchmarkSampler::PreRun() void FairMQBenchmarkSampler::Run() { - uint64_t numSentMsgs = 0; - // store the channel reference to avoid traversing the map on every loop iteration FairMQChannel& dataOutChannel = fChannels.at(fOutChannelName).at(0); FairMQMessagePtr baseMsg(dataOutChannel.Transport()->CreateMessage(fMsgSize)); - LOG(INFO) << "Starting the benchmark with message size of " << fMsgSize << " and number of messages " << fNumMsgs << "."; + LOG(INFO) << "Starting the benchmark with message size of " << fMsgSize << " and " << fMaxIterations << " iterations."; auto tStart = chrono::high_resolution_clock::now(); while (CheckCurrentState(RUNNING)) @@ -72,14 +71,14 @@ void FairMQBenchmarkSampler::Run() if (dataOutChannel.Send(msg) >= 0) { - if (fNumMsgs > 0) + if (fMaxIterations > 0) { - if (numSentMsgs >= fNumMsgs) + if (fNumIterations >= fMaxIterations) { break; } } - ++numSentMsgs; + ++fNumIterations; } } else @@ -88,14 +87,14 @@ void FairMQBenchmarkSampler::Run() if (dataOutChannel.Send(msg) >= 0) { - if (fNumMsgs > 0) + if (fMaxIterations > 0) { - if (numSentMsgs >= fNumMsgs) + if (fNumIterations >= fMaxIterations) { break; } } - ++numSentMsgs; + ++fNumIterations; } } @@ -109,7 +108,7 @@ void FairMQBenchmarkSampler::Run() auto tEnd = chrono::high_resolution_clock::now(); - LOG(INFO) << "Leaving RUNNING state. Sent " << numSentMsgs << " messages in " << chrono::duration(tEnd - tStart).count() << "ms."; + LOG(INFO) << "Leaving RUNNING state. Done " << fNumIterations << " iterations in " << chrono::duration(tEnd - tStart).count() << "ms."; } diff --git a/fairmq/devices/FairMQBenchmarkSampler.h b/fairmq/devices/FairMQBenchmarkSampler.h index 3bf66fb8..af05a21a 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.h +++ b/fairmq/devices/FairMQBenchmarkSampler.h @@ -40,7 +40,8 @@ class FairMQBenchmarkSampler : public FairMQDevice int fMsgSize; int fMsgCounter; int fMsgRate; - uint64_t fNumMsgs; + uint64_t fNumIterations; + uint64_t fMaxIterations; std::string fOutChannelName; std::thread fResetMsgCounter; diff --git a/fairmq/devices/FairMQSink.h b/fairmq/devices/FairMQSink.h index a2768879..75aadeb2 100644 --- a/fairmq/devices/FairMQSink.h +++ b/fairmq/devices/FairMQSink.h @@ -42,7 +42,7 @@ class FairMQSink : public FairMQDevice//, public OutputPolicy virtual void InitTask() { - fMaxIterations = fConfig->GetValue("num-iterations"); + fMaxIterations = fConfig->GetValue("max-iterations"); fInChannelName = fConfig->GetValue("in-channel"); } diff --git a/fairmq/run/runBenchmarkSampler.cxx b/fairmq/run/runBenchmarkSampler.cxx index efae1c36..56f15773 100644 --- a/fairmq/run/runBenchmarkSampler.cxx +++ b/fairmq/run/runBenchmarkSampler.cxx @@ -17,7 +17,7 @@ void addCustomOptions(bpo::options_description& options) ("out-channel", bpo::value()->default_value("data"), "Name of the output channel") ("same-msg", bpo::value()->default_value(true), "Re-send the same message (default), or recreate for each iteration") ("msg-size", bpo::value()->default_value(1000), "Message size in bytes") - ("num-msgs", bpo::value()->default_value(0), "Number of messages to send") + ("max-iterations", bpo::value()->default_value(0), "Number of run iterations (0 - infinite)") ("msg-rate", bpo::value()->default_value(0), "Msg rate limit in maximum number of messages per second"); } diff --git a/fairmq/run/runSink.cxx b/fairmq/run/runSink.cxx index abbd60b9..3b9c5955 100644 --- a/fairmq/run/runSink.cxx +++ b/fairmq/run/runSink.cxx @@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& options) { options.add_options() ("in-channel", bpo::value()->default_value("data"), "Name of the input channel") - ("num-msgs", bpo::value()->default_value(0), "Number of messages to receive"); + ("max-iterations", bpo::value()->default_value(0), "Number of run iterations (0 - infinite)"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) diff --git a/fairmq/run/startMQBenchmark.sh.in b/fairmq/run/startMQBenchmark.sh.in index 7459486a..71baa9bf 100755 --- a/fairmq/run/startMQBenchmark.sh.in +++ b/fairmq/run/startMQBenchmark.sh.in @@ -1,6 +1,6 @@ #!/bin/bash -numMsgs="0" +maxIterations="0" msgSize="1000000" transport="zeromq" sameMsg="true" @@ -14,7 +14,7 @@ if [[ $1 =~ ^[0-9]+$ ]]; then fi if [[ $2 =~ ^[0-9]+$ ]]; then - numMsgs=$2 + maxIterations=$2 fi if [[ $3 =~ ^[a-z]+$ ]]; then @@ -35,10 +35,10 @@ echo "Starting benchmark with following settings:" echo "" echo "message size: $msgSize bytes" -if [ $numMsgs = 0 ]; then - echo "number of messages: unlimited" +if [ $maxIterations = 0 ]; then + echo "number of iterations: unlimited" else - echo "number of messages: $numMsgs" + echo "number of iterations: $maxIterations" fi echo "transport: $transport" @@ -58,7 +58,7 @@ else fi echo "" -echo "Usage: startBenchmark [message size=1000000] [number of messages=0] [transport=zeromq/nanomsg/shmem] [resend same message=true] [affinity=false]" +echo "Usage: startBenchmark [message size=1000000] [number of iterations=0] [transport=zeromq/nanomsg/shmem] [resend same message=true] [affinity=false]" SAMPLER="bsampler" SAMPLER+=" --id bsampler1" @@ -68,7 +68,7 @@ SAMPLER+=" --transport $transport" SAMPLER+=" --msg-size $msgSize" SAMPLER+=" --same-msg $sameMsg" # SAMPLER+=" --msg-rate 1000" -SAMPLER+=" --num-msgs $numMsgs" +SAMPLER+=" --max-iterations $maxIterations" SAMPLER+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/benchmark.json" xterm -geometry 90x23+0+0 -hold -e $affinitySamp @CMAKE_BINARY_DIR@/bin/$SAMPLER & echo "" @@ -80,7 +80,7 @@ SINK+=" --id sink1" #SINK+=" --io-threads 2" #SINK+=" --control static" SINK+=" --transport $transport" -SINK+=" --num-msgs $numMsgs" +SINK+=" --max-iterations $maxIterations" SINK+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/benchmark.json" xterm -geometry 90x23+550+0 -hold -e $affinitySink @CMAKE_BINARY_DIR@/bin/$SINK & echo ""