diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 9475da7b..e3edf08d 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -50,7 +50,7 @@ set(FAIRMQ_PUBLIC_HEADER_FILES FairMQPoller.h FairMQUnmanagedRegion.h FairMQSocket.h - FairMQStateMachine.h + StateMachine.h FairMQTransportFactory.h MemoryResources.h MemoryResourceTools.h @@ -83,7 +83,6 @@ set(FAIRMQ_PRIVATE_HEADER_FILES options/FairProgOptionsHelper.h plugins/Builtin.h plugins/Control.h - StateMachine.h shmem/FairMQMessageSHM.h shmem/FairMQPollerSHM.h shmem/FairMQUnmanagedRegionSHM.h @@ -130,7 +129,7 @@ set(FAIRMQ_SOURCE_FILES FairMQMessage.cxx FairMQPoller.cxx FairMQSocket.cxx - FairMQStateMachine.cxx + StateMachine.cxx FairMQTransportFactory.cxx devices/FairMQBenchmarkSampler.cxx devices/FairMQMerger.cxx @@ -144,7 +143,6 @@ set(FAIRMQ_SOURCE_FILES PluginManager.cxx PluginServices.cxx plugins/Control.cxx - StateMachine.cxx shmem/FairMQMessageSHM.cxx shmem/FairMQPollerSHM.cxx shmem/FairMQUnmanagedRegionSHM.cxx diff --git a/fairmq/DeviceRunner.cxx b/fairmq/DeviceRunner.cxx index 95ad9712..d76e5d02 100644 --- a/fairmq/DeviceRunner.cxx +++ b/fairmq/DeviceRunner.cxx @@ -74,15 +74,14 @@ auto DeviceRunner::Run() -> int fDevice->RegisterChannelEndpoints(); if (fConfig.Count("print-channels")) { fDevice->PrintRegisteredChannels(); - fDevice->ChangeState(FairMQDevice::END); + fDevice->ChangeState(fair::mq::Transition::End); return 0; } // Handle --version if (fConfig.Count("version")) { std::cout << "User device version: " << fDevice->GetVersion() << std::endl; - std::cout << "FAIRMQ_INTERFACE_VERSION: " << FAIRMQ_INTERFACE_VERSION << std::endl; - fDevice->ChangeState(FairMQDevice::END); + fDevice->ChangeState(fair::mq::Transition::End); return 0; } diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index fbf8a6e2..918ae15a 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -27,6 +27,19 @@ using namespace std; +static map backwardsCompatibilityWaitForEndOfStateHelper = +{ + { fair::mq::Transition::InitDevice, fair::mq::State::InitializingDevice }, + { fair::mq::Transition::CompleteInit, fair::mq::State::Initialized }, + { fair::mq::Transition::Bind, fair::mq::State::Bound }, + { fair::mq::Transition::Connect, fair::mq::State::DeviceReady }, + { fair::mq::Transition::InitTask, fair::mq::State::Ready }, + { fair::mq::Transition::Run, fair::mq::State::Ready }, + { fair::mq::Transition::Stop, fair::mq::State::Ready }, + { fair::mq::Transition::ResetTask, fair::mq::State::DeviceReady }, + { fair::mq::Transition::ResetDevice, fair::mq::State::Idle } +}; + FairMQDevice::FairMQDevice() : FairMQDevice(nullptr, {0, 0, 0}) { @@ -54,7 +67,10 @@ FairMQDevice::FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Ver , fInternalConfig(config ? nullptr : fair::mq::tools::make_unique()) , fConfig(config ? config : fInternalConfig.get()) , fId() - , fDefaultTransportType(fair::mq::Transport::DEFAULT) + , fDefaultTransportType(fair::mq::Transport::ZMQ) + , fStateMachine() + , fUninitializedBindingChannels() + , fUninitializedConnectingChannels() , fDataCallbacks(false) , fMsgInputs() , fMultipartInputs() @@ -66,16 +82,98 @@ FairMQDevice::FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Ver , fVersion(version) , fRate(0.) , fRawCmdLineArgs() - , fInterrupted(false) - , fInterruptedCV() - , fInterruptedMtx() - , fRateLogging(true) { + SubscribeToNewTransition("device", [&](fair::mq::Transition transition) { + LOG(trace) << "device notified on new transition: " << transition; + + switch (transition) { + case fair::mq::Transition::Stop: + UnblockTransports(); + break; + default: + break; + } + }); + + fStateMachine.HandleStates([&](fair::mq::State state) { + LOG(trace) << "device notified on new state: " << state; + + { + lock_guard lock(fStatesMtx); + fStates.push(state); + } + fStatesCV.notify_one(); + + switch (state) { + case fair::mq::State::InitializingDevice: + InitWrapper(); + break; + case fair::mq::State::Binding: + BindWrapper(); + break; + case fair::mq::State::Connecting: + ConnectWrapper(); + break; + case fair::mq::State::InitializingTask: + InitTaskWrapper(); + break; + case fair::mq::State::Running: + RunWrapper(); + break; + case fair::mq::State::ResettingTask: + ResetTaskWrapper(); + break; + case fair::mq::State::ResettingDevice: + ResetWrapper(); + break; + case fair::mq::State::Exiting: + Exit(); + break; + default: + LOG(trace) << "device notified on new state without a matching handler: " << state; + break; + } + }); + + fStateMachine.Start(); +} + +fair::mq::State FairMQDevice::WaitForNextState() +{ + unique_lock lock(fStatesMtx); + while (fStates.empty()) { + fStatesCV.wait_for(lock, chrono::milliseconds(50)); + } + + auto result = fStates.front(); + + if (result == fair::mq::State::Error) { + throw DeviceStateError("Device transitioned to error state."); + } + + fStates.pop(); + + return result; +} + +void FairMQDevice::WaitForState(fair::mq::State state) +{ + while (WaitForNextState() != state) {} +} + +void FairMQDevice::WaitForEndOfState(fair::mq::Transition transition) +{ + WaitForState(backwardsCompatibilityWaitForEndOfStateHelper.at(transition)); } void FairMQDevice::InitWrapper() { + fStateMachine.WaitForPendingState(); + fId = fConfig->GetValue("id"); + + Init(); + fRate = fConfig->GetValue("rate"); try { @@ -96,13 +194,9 @@ void FairMQDevice::InitWrapper() } } - LOG(debug) << "Requesting '" << fair::mq::TransportNames.at(fDefaultTransportType) << "' as default transport for the device"; + LOG(debug) << "Setting '" << fair::mq::TransportNames.at(fDefaultTransportType) << "' as default transport for the device"; fTransportFactory = AddTransport(fDefaultTransportType); - // Containers to store the uninitialized channels. - vector uninitializedBindingChannels; - vector uninitializedConnectingChannels; - string networkInterface = fConfig->GetValue("network-interface"); // Fill the uninitialized channel containers @@ -113,13 +207,8 @@ void FairMQDevice::InitWrapper() vi.fName = fair::mq::tools::ToString(mi.first, "[", subChannelIndex, "]"); // set channel transport - if (vi.fTransportType == fair::mq::Transport::DEFAULT || vi.fTransportType == fTransportFactory->GetType()) { - LOG(debug) << vi.fName << ": using default transport"; - vi.InitTransport(fTransportFactory); - } else { - LOG(debug) << vi.fName << ": channel transport (" << fair::mq::TransportNames.at(fDefaultTransportType) << ") overriden to " << fair::mq::TransportNames.at(vi.fTransportType); - vi.InitTransport(AddTransport(vi.fTransportType)); - } + LOG(debug) << "Initializing transport for channel " << vi.fName << ": " << fair::mq::TransportNames.at(vi.fTransportType); + vi.InitTransport(AddTransport(vi.fTransportType)); if (vi.fMethod == "bind") { // if binding address is not specified, try getting it from the configured network interface @@ -131,13 +220,13 @@ void FairMQDevice::InitWrapper() vi.fAddress = "tcp://" + fair::mq::tools::getInterfaceIP(networkInterface) + ":1"; } // fill the uninitialized list - uninitializedBindingChannels.push_back(&vi); + fUninitializedBindingChannels.push_back(&vi); } else if (vi.fMethod == "connect") { // fill the uninitialized list - uninitializedConnectingChannels.push_back(&vi); + fUninitializedConnectingChannels.push_back(&vi); } else if (vi.fAddress.find_first_of("@+>") != string::npos) { // fill the uninitialized list - uninitializedConnectingChannels.push_back(&vi); + fUninitializedConnectingChannels.push_back(&vi); } else { LOG(error) << "Cannot update configuration. Socket method (bind/connect) for channel '" << vi.fName << "' not specified."; throw runtime_error(fair::mq::tools::ToString("Cannot update configuration. Socket method (bind/connect) for channel ", vi.fName, " not specified.")); @@ -147,17 +236,27 @@ void FairMQDevice::InitWrapper() } } + // ChangeState(fair::mq::Transition::Auto); +} + +void FairMQDevice::BindWrapper() +{ // Bind channels. Here one run is enough, because bind settings should be available locally // If necessary this could be handled in the same way as the connecting channels - AttachChannels(uninitializedBindingChannels); + AttachChannels(fUninitializedBindingChannels); - if (!uninitializedBindingChannels.empty()) { - LOG(error) << uninitializedBindingChannels.size() << " of the binding channels could not initialize. Initial configuration incomplete."; - throw runtime_error(fair::mq::tools::ToString(uninitializedBindingChannels.size(), " of the binding channels could not initialize. Initial configuration incomplete.")); + if (!fUninitializedBindingChannels.empty()) { + LOG(error) << fUninitializedBindingChannels.size() << " of the binding channels could not initialize. Initial configuration incomplete."; + throw runtime_error(fair::mq::tools::ToString(fUninitializedBindingChannels.size(), " of the binding channels could not initialize. Initial configuration incomplete.")); } - CallStateChangeCallbacks(INITIALIZING_DEVICE); + Bind(); + ChangeState(fair::mq::Transition::Auto); +} + +void FairMQDevice::ConnectWrapper() +{ int initializationTimeoutInS = fConfig->GetValue("initialization-timeout"); // go over the list of channels until all are initialized (and removed from the uninitialized list) @@ -165,12 +264,12 @@ void FairMQDevice::InitWrapper() auto sleepTimeInMS = 50; auto maxAttempts = initializationTimeoutInS * 1000 / sleepTimeInMS; // first attempt - AttachChannels(uninitializedConnectingChannels); + AttachChannels(fUninitializedConnectingChannels); // if not all channels could be connected, update their address values from config and retry - while (!uninitializedConnectingChannels.empty()) { + while (!fUninitializedConnectingChannels.empty()) { this_thread::sleep_for(chrono::milliseconds(sleepTimeInMS)); - for (auto& chan : uninitializedConnectingChannels) { + for (auto& chan : fUninitializedConnectingChannels) { string key{"chans." + chan->GetChannelPrefix() + "." + chan->GetChannelIndex() + ".address"}; string newAddress = fConfig->GetValue(key); if (newAddress != chan->GetAddress()) { @@ -183,20 +282,16 @@ void FairMQDevice::InitWrapper() throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", initializationTimeoutInS, " attempts")); } - AttachChannels(uninitializedConnectingChannels); + AttachChannels(fUninitializedConnectingChannels); } - Init(); - if (fChannels.empty()) { LOG(warn) << "No channels created after finishing initialization"; } - ChangeState(internal_DEVICE_READY); -} + Connect(); -void FairMQDevice::Init() -{ + ChangeState(fair::mq::Transition::Auto); } void FairMQDevice::AttachChannels(vector& chans) @@ -295,15 +390,9 @@ bool FairMQDevice::AttachChannel(FairMQChannel& chan) void FairMQDevice::InitTaskWrapper() { - CallStateChangeCallbacks(INITIALIZING_TASK); - InitTask(); - ChangeState(internal_READY); -} - -void FairMQDevice::InitTask() -{ + ChangeState(fair::mq::Transition::Auto); } bool FairMQDevice::SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs) @@ -334,21 +423,13 @@ void FairMQDevice::SortChannel(const string& name, const bool reindex) void FairMQDevice::RunWrapper() { - CallStateChangeCallbacks(RUNNING); - LOG(info) << "DEVICE: Running..."; // start the rate logger thread - fRateLogging = true; future rateLogger = async(launch::async, &FairMQDevice::LogSocketRates, this); // notify transports to resume transfers - { - lock_guard guard(fInterruptedMtx); - fInterrupted = false; - } - for (auto& t : fTransports) - { + for (auto& t : fTransports) { t.second->Resume(); } @@ -356,50 +437,43 @@ void FairMQDevice::RunWrapper() PreRun(); // process either data callbacks or ConditionalRun/Run - if (fDataCallbacks) - { + if (fDataCallbacks) { // if only one input channel, do lightweight handling without additional polling. - if (fInputChannelKeys.size() == 1 && fChannels.at(fInputChannelKeys.at(0)).size() == 1) - { + if (fInputChannelKeys.size() == 1 && fChannels.at(fInputChannelKeys.at(0)).size() == 1) { HandleSingleChannelInput(); - } - else // otherwise do full handling with polling - { + } else {// otherwise do full handling with polling HandleMultipleChannelInput(); } - } - else - { + } else { fair::mq::tools::RateLimiter rateLimiter(fRate); - while (CheckCurrentState(RUNNING) && ConditionalRun()) - { - if (fRate > 0.001) - { + while (!NewStatePending() && ConditionalRun()) { + if (fRate > 0.001) { rateLimiter.maybe_sleep(); } } Run(); } + + // if Run() exited and the state is still RUNNING, transition to READY. + if (!NewStatePending()) { + UnblockTransports(); + ChangeState(fair::mq::Transition::Stop); + } + + PostRun(); + } catch (const out_of_range& oor) { LOG(error) << "out of range: " << oor.what(); LOG(error) << "incorrect/incomplete channel configuration?"; - fRateLogging = false; + ChangeState(fair::mq::Transition::ErrorFound); throw; } catch (...) { - fRateLogging = false; + ChangeState(fair::mq::Transition::ErrorFound); throw; } - // if Run() exited and the state is still RUNNING, transition to READY. - if (CheckCurrentState(RUNNING)) - { - ChangeState(internal_READY); - } - - PostRun(); - rateLogger.get(); } @@ -409,14 +483,14 @@ void FairMQDevice::HandleSingleChannelInput() if (fMsgInputs.size() > 0) { - while (CheckCurrentState(RUNNING) && proceed) + while (!NewStatePending() && proceed) { proceed = HandleMsgInput(fInputChannelKeys.at(0), fMsgInputs.begin()->second, 0); } } else if (fMultipartInputs.size() > 0) { - while (CheckCurrentState(RUNNING) && proceed) + while (!NewStatePending() && proceed) { proceed = HandleMultipartInput(fInputChannelKeys.at(0), fMultipartInputs.begin()->second, 0); } @@ -468,7 +542,7 @@ void FairMQDevice::HandleMultipleChannelInput() FairMQPollerPtr poller(fChannels.at(fInputChannelKeys.at(0)).at(0).fTransportFactory->CreatePoller(fChannels, fInputChannelKeys)); - while (CheckCurrentState(RUNNING) && proceed) + while (!NewStatePending() && proceed) { poller->Poll(200); @@ -526,7 +600,7 @@ void FairMQDevice::PollForTransport(const FairMQTransportFactory* factory, const { FairMQPollerPtr poller(factory->CreatePoller(fChannels, channelKeys)); - while (CheckCurrentState(RUNNING) && fMultitransportProceed) + while (!NewStatePending() && fMultitransportProceed) { poller->Poll(500); @@ -600,58 +674,21 @@ bool FairMQDevice::HandleMultipartInput(const string& chName, const InputMultipa } } -void FairMQDevice::Run() +shared_ptr FairMQDevice::AddTransport(fair::mq::Transport transport) { -} - -void FairMQDevice::PreRun() -{ -} - -bool FairMQDevice::ConditionalRun() -{ - return false; -} - -void FairMQDevice::PostRun() -{ -} - -void FairMQDevice::PauseWrapper() -{ - CallStateChangeCallbacks(PAUSED); - - Pause(); -} - -void FairMQDevice::Pause() -{ - while (CheckCurrentState(PAUSED)) - { - this_thread::sleep_for(chrono::milliseconds(500)); - LOG(debug) << "paused..."; + if (transport == fair::mq::Transport::DEFAULT) { + transport = fDefaultTransportType; } - LOG(debug) << "Unpausing"; -} -shared_ptr FairMQDevice::AddTransport(const fair::mq::Transport transport) -{ auto i = fTransports.find(transport); - if (i == fTransports.end()) - { + if (i == fTransports.end()) { + LOG(debug) << "Adding '" << fair::mq::TransportNames.at(transport) << "' transport"; auto tr = FairMQTransportFactory::CreateTransportFactory(fair::mq::TransportNames.at(transport), fId, fConfig); - - LOG(debug) << "Adding '" << fair::mq::TransportNames.at(transport) << "' transport to the device."; - - pair> trPair(transport, tr); - fTransports.insert(trPair); - + fTransports.insert({transport, tr}); return tr; - } - else - { - LOG(debug) << "Reusing existing '" << fair::mq::TransportNames.at(transport) << "' transport."; + } else { + LOG(debug) << "Reusing existing '" << fair::mq::TransportNames.at(transport) << "' transport"; return i->second; } } @@ -724,7 +761,7 @@ void FairMQDevice::LogSocketRates() LOG(debug) << ": in: <#msgs> () out: <#msgs> ()"; - while (fRateLogging) + while (!NewStatePending()) { t1 = chrono::high_resolution_clock::now(); @@ -769,48 +806,30 @@ void FairMQDevice::LogSocketRates() } } -void FairMQDevice::Unblock() +void FairMQDevice::UnblockTransports() { - for (auto& t : fTransports) - { + for (auto& t : fTransports) { t.second->Interrupt(); } - { - lock_guard guard(fInterruptedMtx); - fInterrupted = true; - fRateLogging = false; - } - fInterruptedCV.notify_all(); } void FairMQDevice::ResetTaskWrapper() { - CallStateChangeCallbacks(RESETTING_TASK); - ResetTask(); - ChangeState(internal_DEVICE_READY); -} - -void FairMQDevice::ResetTask() -{ + ChangeState(fair::mq::Transition::Auto); } void FairMQDevice::ResetWrapper() { - CallStateChangeCallbacks(RESETTING_DEVICE); - - for (auto& t : fTransports) - { + for (auto& t : fTransports) { t.second->Reset(); } // iterate over the channels map - for (auto& mi : fChannels) - { + for (auto& mi : fChannels) { // iterate over the channels vector - for (auto& vi : mi.second) - { + for (auto& vi : mi.second) { // vi.fReset = true; vi.fSocket.reset(); // destroy FairMQSocket } @@ -818,18 +837,12 @@ void FairMQDevice::ResetWrapper() Reset(); - ChangeState(internal_IDLE); -} - -void FairMQDevice::Reset() -{ -} - -void FairMQDevice::Exit() -{ + ChangeState(fair::mq::Transition::Auto); } FairMQDevice::~FairMQDevice() { - LOG(debug) << "Destructing device " << fId; + UnsubscribeFromNewTransition("device"); + fStateMachine.StopHandlingStates(); + LOG(debug) << "Shutting down device " << fId; } diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 2e87b7bc..b57d8950 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -9,7 +9,7 @@ #ifndef FAIRMQDEVICE_H_ #define FAIRMQDEVICE_H_ -#include +#include #include #include @@ -32,6 +32,7 @@ #include // static_assert #include // is_trivially_copyable #include +#include #include #include @@ -43,11 +44,43 @@ using FairMQChannelMap = std::unordered_map; using InputMultipartCallback = std::function; -class FairMQDevice : public FairMQStateMachine +class FairMQDevice { friend class FairMQChannel; public: + // backwards-compatibility enum for old state machine interface, todo: delete this + enum Event + { + INIT_DEVICE, + internal_DEVICE_READY, + INIT_TASK, + internal_READY, + RUN, + STOP, + RESET_TASK, + RESET_DEVICE, + internal_IDLE, + END, + ERROR_FOUND + }; + + // backwards-compatibility enum for old state machine interface, todo: delete this + enum State + { + OK, + Error, + IDLE, + INITIALIZING_DEVICE, + DEVICE_READY, + INITIALIZING_TASK, + READY, + RUNNING, + RESETTING_TASK, + RESETTING_DEVICE, + EXITING + }; + /// Default constructor FairMQDevice(); /// Constructor with external FairMQProgOptions @@ -330,7 +363,6 @@ class FairMQDevice : public FairMQStateMachine } catch (const std::out_of_range& oor) { LOG(error) << "out of range: " << oor.what(); LOG(error) << "requested channel has not been configured? check channel names/configuration."; - fRateLogging = false; throw; } @@ -339,8 +371,7 @@ class FairMQDevice : public FairMQStateMachine bool RegisterChannelEndpoint(const std::string& channelName, uint16_t minNumSubChannels = 1, uint16_t maxNumSubChannels = 1) { bool ok = fChannelRegistry.insert(std::make_pair(channelName, std::make_pair(minNumSubChannels, maxNumSubChannels))).second; - if (!ok) - { + if (!ok) { LOG(warn) << "Registering channel: name already registered: \"" << channelName << "\""; } return ok; @@ -348,14 +379,10 @@ class FairMQDevice : public FairMQStateMachine void PrintRegisteredChannels() { - if (fChannelRegistry.size() < 1) - { + if (fChannelRegistry.size() < 1) { std::cout << "no channels registered." << std::endl; - } - else - { - for (const auto& c : fChannelRegistry) - { + } else { + for (const auto& c : fChannelRegistry) { std::cout << c.first << ":" << c.second.first << ":" << c.second.second << std::endl; } } @@ -389,8 +416,7 @@ class FairMQDevice : public FairMQStateMachine void RunStateMachine() { - CallStateChangeCallbacks(FairMQStateMachine::IDLE); - ProcessWork(); + fStateMachine.ProcessWork(); }; /// Wait for the supplied amount of time or for interruption. @@ -399,8 +425,7 @@ class FairMQDevice : public FairMQStateMachine template bool WaitFor(std::chrono::duration const& duration) { - std::unique_lock lock(fInterruptedMtx); - return !fInterruptedCV.wait_for(lock, duration, [&] { return fInterrupted.load(); }); // return true if no interruption happened + return fStateMachine.WaitForPendingStateFor(std::chrono::duration_cast(duration).count()); } protected: @@ -421,53 +446,90 @@ class FairMQDevice : public FairMQStateMachine std::string fId; ///< Device ID /// Additional user initialization (can be overloaded in child classes). Prefer to use InitTask(). - virtual void Init(); + virtual void Init() {} + + virtual void Bind() {} + + virtual void Connect() {} /// Task initialization (can be overloaded in child classes) - virtual void InitTask(); + virtual void InitTask() {} /// Runs the device (to be overloaded in child classes) - virtual void Run(); + virtual void Run() {} /// Called in the RUNNING state once before executing the Run()/ConditionalRun() method - virtual void PreRun(); + virtual void PreRun() {} /// Called during RUNNING state repeatedly until it returns false or device state changes - virtual bool ConditionalRun(); + virtual bool ConditionalRun() { return false; } /// Called in the RUNNING state once after executing the Run()/ConditionalRun() method - virtual void PostRun(); + virtual void PostRun() {} - /// Handles the PAUSE state - virtual void Pause(); + virtual void Pause() __attribute__((deprecated("PAUSE state is removed. This method is never called. To pause Run, go to READY with STOP transition and back to RUNNING with RUN to resume."))) {} /// Resets the user task (to be overloaded in child classes) - virtual void ResetTask(); + virtual void ResetTask() {} /// Resets the device (can be overloaded in child classes) - virtual void Reset(); + virtual void Reset() {} + + public: + bool ChangeState(const fair::mq::Transition transition) { return fStateMachine.ChangeState(transition); } + bool ChangeState(const std::string& transition) { return fStateMachine.ChangeState(fair::mq::StateMachine::GetTransition(transition)); } + + void WaitForEndOfState(const fair::mq::Transition transition) __attribute__((deprecated("Use WaitForState(fair::mq::State expectedState)."))); + void WaitForEndOfState(const std::string& transition) __attribute__((deprecated("Use WaitForState(fair::mq::State expectedState)."))) { WaitForState(transition); } + + fair::mq::State WaitForNextState(); + void WaitForState(fair::mq::State state); + void WaitForState(const std::string& state) { fair::mq::StateMachine::GetState(state); } + + void SubscribeToStateChange(const std::string& key, std::function callback) { fStateMachine.SubscribeToStateChange(key, callback); } + void UnsubscribeFromStateChange(const std::string& key) { fStateMachine.UnsubscribeFromStateChange(key); } + + void SubscribeToNewTransition(const std::string& key, std::function callback) { fStateMachine.SubscribeToNewTransition(key, callback); } + void UnsubscribeFromNewTransition(const std::string& key) { fStateMachine.UnsubscribeFromNewTransition(key); } + + bool CheckCurrentState(const int /* state */) const __attribute__((deprecated("Use NewStatePending()."))) { return !fStateMachine.NewStatePending(); } + bool CheckCurrentState(const std::string& /* state */) const __attribute__((deprecated("Use NewStatePending()."))) { return !fStateMachine.NewStatePending(); } + + /// Returns true is a new state has been requested, signaling the current handler to stop. + bool NewStatePending() const { return fStateMachine.NewStatePending(); } + + fair::mq::State GetCurrentState() const { return fStateMachine.GetCurrentState(); } + std::string GetCurrentStateName() const { return fStateMachine.GetCurrentStateName(); } + + static std::string GetStateName(const fair::mq::State state) { return fair::mq::StateMachine::GetStateName(state); } + static std::string GetTransitionName(const fair::mq::Transition transition) { return fair::mq::StateMachine::GetTransitionName(transition); } + + struct DeviceStateError : std::runtime_error { using std::runtime_error::runtime_error; }; private: fair::mq::Transport fDefaultTransportType; ///< Default transport for the device + fair::mq::StateMachine fStateMachine; - /// Handles the initialization and the Init() method + /// Handles the initialization void InitWrapper(); + /// Initializes binding channels + void BindWrapper(); + /// Initializes connecting channels + void ConnectWrapper(); /// Handles the InitTask() method void InitTaskWrapper(); /// Handles the Run() method void RunWrapper(); - /// Handles the Pause() method - void PauseWrapper(); /// Handles the ResetTask() method void ResetTaskWrapper(); /// Handles the Reset() method void ResetWrapper(); - /// Unblocks blocking channel send/receive calls - void Unblock(); + /// Notifies transports to cease any blocking activity + void UnblockTransports(); /// Shuts down the transports and the device - void Exit(); + void Exit() {} /// Attach (bind/connect) channels in the list void AttachChannels(std::vector& chans); @@ -483,6 +545,9 @@ class FairMQDevice : public FairMQStateMachine void CreateOwnConfig(); + std::vector fUninitializedBindingChannels; + std::vector fUninitializedConnectingChannels; + bool fDataCallbacks; std::unordered_map fMsgInputs; std::unordered_map fMultipartInputs; @@ -496,10 +561,9 @@ class FairMQDevice : public FairMQStateMachine float fRate; ///< Rate limiting for ConditionalRun std::vector fRawCmdLineArgs; - std::atomic fInterrupted; - std::condition_variable fInterruptedCV; - std::mutex fInterruptedMtx; - mutable std::atomic fRateLogging; + std::queue fStates; + std::mutex fStatesMtx; + std::condition_variable fStatesCV; }; #endif /* FAIRMQDEVICE_H_ */ diff --git a/fairmq/FairMQStateMachine.cxx b/fairmq/FairMQStateMachine.cxx deleted file mode 100644 index 5b745794..00000000 --- a/fairmq/FairMQStateMachine.cxx +++ /dev/null @@ -1,672 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQStateMachine.cxx - * - * @since 2012-10-25 - * @author D. Klein, A. Rybalchenko - */ - -#include "FairMQStateMachine.h" -#include - -// Increase maximum number of boost::msm states (default is 10) -// This #define has to be before any msm header includes -#define FUSION_MAX_VECTOR_SIZE 20 - -#include -#include -#include -#include -#include -#include -#include -#include // signal/slot for onStateChange callbacks - -#include -#include -#include -#include -#include - -using namespace std; -using namespace boost::msm::front; - -namespace std -{ - -template<> -struct hash : fair::mq::tools::HashEnum {}; - -} /* namespace std */ - -namespace fair -{ -namespace mq -{ -namespace fsm -{ - -// list of FSM states -struct OK_FSM_STATE : public state<> { static string Name() { return "OK"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::OK; } }; -struct ERROR_FSM_STATE : public terminate_state<> { static string Name() { return "ERROR"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::Error; } }; - -struct IDLE_FSM_STATE : public state<> { static string Name() { return "IDLE"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::IDLE; } }; -struct INITIALIZING_DEVICE_FSM_STATE : public state<> { static string Name() { return "INITIALIZING_DEVICE"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::INITIALIZING_DEVICE; } }; -struct DEVICE_READY_FSM_STATE : public state<> { static string Name() { return "DEVICE_READY"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::DEVICE_READY; } }; -struct INITIALIZING_TASK_FSM_STATE : public state<> { static string Name() { return "INITIALIZING_TASK"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::INITIALIZING_TASK; } }; -struct READY_FSM_STATE : public state<> { static string Name() { return "READY"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::READY; } }; -struct RUNNING_FSM_STATE : public state<> { static string Name() { return "RUNNING"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::RUNNING; } }; -struct PAUSED_FSM_STATE : public state<> { static string Name() { return "PAUSED"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::PAUSED; } }; -struct RESETTING_TASK_FSM_STATE : public state<> { static string Name() { return "RESETTING_TASK"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::RESETTING_TASK; } }; -struct RESETTING_DEVICE_FSM_STATE : public state<> { static string Name() { return "RESETTING_DEVICE"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::RESETTING_DEVICE; } }; -struct EXITING_FSM_STATE : public state<> { static string Name() { return "EXITING"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::EXITING; } }; - -// list of FSM events -struct INIT_DEVICE_FSM_EVENT { static string Name() { return "INIT_DEVICE"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::INIT_DEVICE; } }; -struct internal_DEVICE_READY_FSM_EVENT { static string Name() { return "internal_DEVICE_READY"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::internal_DEVICE_READY; } }; -struct INIT_TASK_FSM_EVENT { static string Name() { return "INIT_TASK"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::INIT_TASK; } }; -struct internal_READY_FSM_EVENT { static string Name() { return "internal_READY"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::internal_READY; } }; -struct RUN_FSM_EVENT { static string Name() { return "RUN"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::RUN; } }; -struct PAUSE_FSM_EVENT { static string Name() { return "PAUSE"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::PAUSE; } }; -struct STOP_FSM_EVENT { static string Name() { return "STOP"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::STOP; } }; -struct RESET_TASK_FSM_EVENT { static string Name() { return "RESET_TASK"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::RESET_TASK; } }; -struct RESET_DEVICE_FSM_EVENT { static string Name() { return "RESET_DEVICE"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::RESET_DEVICE; } }; -struct internal_IDLE_FSM_EVENT { static string Name() { return "internal_IDLE"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::internal_IDLE; } }; -struct END_FSM_EVENT { static string Name() { return "END"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::END; } }; -struct ERROR_FOUND_FSM_EVENT { static string Name() { return "ERROR_FOUND"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::ERROR_FOUND; } }; - -static array stateNames = -{ - { - "OK", - "Error", - "IDLE", - "INITIALIZING_DEVICE", - "DEVICE_READY", - "INITIALIZING_TASK", - "READY", - "RUNNING", - "PAUSED", - "RESETTING_TASK", - "RESETTING_DEVICE", - "EXITING" - } -}; - -static array eventNames = -{ - { - "INIT_DEVICE", - "internal_DEVICE_READY", - "INIT_TASK", - "internal_READY", - "RUN", - "PAUSE", - "STOP", - "RESET_TASK", - "RESET_DEVICE", - "internal_IDLE", - "END", - "ERROR_FOUND" - } -}; - -static map stateNumbers = -{ - { "OK", FairMQStateMachine::State::OK }, - { "Error", FairMQStateMachine::State::Error }, - { "IDLE", FairMQStateMachine::State::IDLE }, - { "INITIALIZING_DEVICE", FairMQStateMachine::State::INITIALIZING_DEVICE }, - { "DEVICE_READY", FairMQStateMachine::State::DEVICE_READY }, - { "INITIALIZING_TASK", FairMQStateMachine::State::INITIALIZING_TASK }, - { "READY", FairMQStateMachine::State::READY }, - { "RUNNING", FairMQStateMachine::State::RUNNING }, - { "PAUSED", FairMQStateMachine::State::PAUSED }, - { "RESETTING_TASK", FairMQStateMachine::State::RESETTING_TASK }, - { "RESETTING_DEVICE", FairMQStateMachine::State::RESETTING_DEVICE }, - { "EXITING", FairMQStateMachine::State::EXITING } -}; - -static map eventNumbers = -{ - { "INIT_DEVICE", FairMQStateMachine::Event::INIT_DEVICE }, - { "internal_DEVICE_READY", FairMQStateMachine::Event::internal_DEVICE_READY }, - { "INIT_TASK", FairMQStateMachine::Event::INIT_TASK }, - { "internal_READY", FairMQStateMachine::Event::internal_READY }, - { "RUN", FairMQStateMachine::Event::RUN }, - { "PAUSE", FairMQStateMachine::Event::PAUSE }, - { "STOP", FairMQStateMachine::Event::STOP }, - { "RESET_TASK", FairMQStateMachine::Event::RESET_TASK }, - { "RESET_DEVICE", FairMQStateMachine::Event::RESET_DEVICE }, - { "internal_IDLE", FairMQStateMachine::Event::internal_IDLE }, - { "END", FairMQStateMachine::Event::END }, - { "ERROR_FOUND", FairMQStateMachine::Event::ERROR_FOUND } -}; - -// defining the boost MSM state machine -struct Machine_ : public state_machine_def -{ - public: - Machine_() - : fUnblockHandler() - , fStateHandlers() - , fWork() - , fWorkAvailableCondition() - , fWorkDoneCondition() - , fWorkMutex() - , fWorkerTerminated(false) - , fWorkActive(false) - , fWorkAvailable(false) - , fStateChangeSignal() - , fStateChangeSignalsMap() - , fState() - {} - - virtual ~Machine_() - {} - - // initial states - using initial_state = boost::mpl::vector; - - template - void on_entry(Event const&, FSM& /*fsm*/) - { - LOG(state) << "Starting FairMQ state machine"; - fState = FairMQStateMachine::IDLE; - LOG(state) << "Entering IDLE state"; - // fsm.CallStateChangeCallbacks(FairMQStateMachine::IDLE); - // we call this for now in FairMQDevice::RunStateMachine() - } - - template - void on_exit(Event const&, FSM& /*fsm*/) - { - LOG(state) << "Exiting FairMQ state machine"; - } - - // actions - struct AutomaticFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts) - { - fsm.fState = ts.Type(); - LOG(state) << "Entering " << ts.Name() << " state"; - } - }; - - struct DefaultFct - { - template - void operator()(EVT const& e, FSM& fsm, SourceState& /* ss */, TargetState& ts) - { - fsm.fState = ts.Type(); - - unique_lock lock(fsm.fWorkMutex); - while (fsm.fWorkActive) - { - fsm.fWorkDoneCondition.wait(lock); - } - fsm.fWorkAvailable = true; - LOG(state) << "Entering " << ts.Name() << " state"; - fsm.fWork = fsm.fStateHandlers.at(e.Type()); - fsm.fWorkAvailableCondition.notify_one(); - } - }; - - struct PauseFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts) - { - fsm.fState = ts.Type(); - - fsm.fUnblockHandler(); - unique_lock lock(fsm.fWorkMutex); - while (fsm.fWorkActive) - { - fsm.fWorkDoneCondition.wait(lock); - } - fsm.fWorkAvailable = true; - LOG(state) << "Entering " << ts.Name() << " state"; - fsm.fWork = fsm.fPauseWrapperHandler; - fsm.fWorkAvailableCondition.notify_one(); - } - }; - - struct StopFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts) - { - fsm.fState = ts.Type(); - - fsm.fUnblockHandler(); - unique_lock lock(fsm.fWorkMutex); - while (fsm.fWorkActive) - { - fsm.fWorkDoneCondition.wait(lock); - } - LOG(state) << "Entering " << ts.Name() << " state"; - } - }; - - struct InternalStopFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts) - { - fsm.fState = ts.Type(); - fsm.fUnblockHandler(); - LOG(state) << "RUNNING state finished without an external event, entering " << ts.Name() << " state"; - } - }; - - struct ExitingFct - { - template - void operator()(EVT const& e, FSM& fsm, SourceState& /* ss */, TargetState& ts) - { - LOG(state) << "Entering " << ts.Name() << " state"; - fsm.fState = ts.Type(); - fsm.CallStateChangeCallbacks(FairMQStateMachine::EXITING); - - // Stop ProcessWork() - { - lock_guard lock(fsm.fWorkMutex); - fsm.fWorkerTerminated = true; - fsm.fWorkAvailableCondition.notify_one(); - } - - fsm.fStateHandlers.at(e.Type())(); - } - }; - - struct ErrorFoundFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts) - { - fsm.fState = ts.Type(); - LOG(state) << "Entering " << ts.Name() << " state"; - fsm.CallStateChangeCallbacks(FairMQStateMachine::Error); - } - }; - - // Transition table for Machine_ - struct transition_table : boost::mpl::vector< - // Start Event Next Action Guard - Row, - Row, - Row, - Row, - Row, - Row, - Row, - Row, - Row, - Row, - Row, - Row, - Row, - Row, - Row> - {}; - - // replaces the default no-transition response. - template - void no_transition(Event const& e, FSM&, int state) - { - using recursive_stt = typename boost::msm::back::recursive_get_transition_table::type; - using all_states = typename boost::msm::back::generate_state_set::type; - - string stateName; - - boost::mpl::for_each>(boost::msm::back::get_state_name(stateName, state)); - - stateName = boost::core::demangle(stateName.c_str()); - size_t pos = stateName.rfind(":"); - if (pos != string::npos) - { - stateName = stateName.substr(pos + 1); - stateName = stateName.substr(0, stateName.size() - 10); - } - - if (stateName != "OK") - { - LOG(state) << "No transition from state " << stateName << " on event " << e.Name(); - } - } - - void CallStateChangeCallbacks(const FairMQStateMachine::State state) const - { - if (!fStateChangeSignal.empty()) - { - fStateChangeSignal(state); - } - } - - function fUnblockHandler; - unordered_map> fStateHandlers; - - // function to execute user states in a worker thread - function fWork; - condition_variable fWorkAvailableCondition; - condition_variable fWorkDoneCondition; - mutex fWorkMutex; - bool fWorkerTerminated; - bool fWorkActive; - bool fWorkAvailable; - - boost::signals2::signal fStateChangeSignal; - unordered_map fStateChangeSignalsMap; - - atomic fState; - - void ProcessWork() - { - while (true) - { - { - unique_lock lock(fWorkMutex); - // Wait for work to be done. - while (!fWorkAvailable && !fWorkerTerminated) - { - fWorkAvailableCondition.wait_for(lock, chrono::milliseconds(100)); - } - - if (fWorkerTerminated) - { - break; - } - - fWorkActive = true; - } - - fWork(); - - { - lock_guard lock(fWorkMutex); - fWorkActive = false; - fWorkAvailable = false; - fWorkDoneCondition.notify_one(); - } - CallStateChangeCallbacks(fState); - } - } -}; // Machine_ - -using FairMQFSM = boost::msm::back::state_machine; - -} // namespace fsm -} // namespace mq -} // namespace fair - -using namespace fair::mq::fsm; - -FairMQStateMachine::FairMQStateMachine() - : fChangeStateMutex() - , fFsm(new FairMQFSM) -{ - static_pointer_cast(fFsm)->fStateHandlers.emplace(INIT_DEVICE, bind(&FairMQStateMachine::InitWrapper, this)); - static_pointer_cast(fFsm)->fStateHandlers.emplace(INIT_TASK, bind(&FairMQStateMachine::InitTaskWrapper, this)); - static_pointer_cast(fFsm)->fStateHandlers.emplace(RUN, bind(&FairMQStateMachine::RunWrapper, this)); - static_pointer_cast(fFsm)->fStateHandlers.emplace(PAUSE, bind(&FairMQStateMachine::PauseWrapper, this)); - static_pointer_cast(fFsm)->fStateHandlers.emplace(RESET_TASK, bind(&FairMQStateMachine::ResetTaskWrapper, this)); - static_pointer_cast(fFsm)->fStateHandlers.emplace(RESET_DEVICE, bind(&FairMQStateMachine::ResetWrapper, this)); - static_pointer_cast(fFsm)->fStateHandlers.emplace(END, bind(&FairMQStateMachine::Exit, this)); - static_pointer_cast(fFsm)->fUnblockHandler = bind(&FairMQStateMachine::Unblock, this); - - static_pointer_cast(fFsm)->start(); -} - -FairMQStateMachine::~FairMQStateMachine() -{ - static_pointer_cast(fFsm)->stop(); -} - -int FairMQStateMachine::GetInterfaceVersion() const -{ - return FAIRMQ_INTERFACE_VERSION; -} - -bool FairMQStateMachine::ChangeState(int event) -{ - try - { - switch (event) - { - case INIT_DEVICE: - { - lock_guard lock(fChangeStateMutex); - static_pointer_cast(fFsm)->process_event(INIT_DEVICE_FSM_EVENT()); - return true; - } - case internal_DEVICE_READY: - { - lock_guard lock(fChangeStateMutex); - static_pointer_cast(fFsm)->process_event(internal_DEVICE_READY_FSM_EVENT()); - return true; - } - case INIT_TASK: - { - lock_guard lock(fChangeStateMutex); - static_pointer_cast(fFsm)->process_event(INIT_TASK_FSM_EVENT()); - return true; - } - case internal_READY: - { - lock_guard lock(fChangeStateMutex); - static_pointer_cast(fFsm)->process_event(internal_READY_FSM_EVENT()); - return true; - } - case RUN: - { - lock_guard lock(fChangeStateMutex); - static_pointer_cast(fFsm)->process_event(RUN_FSM_EVENT()); - return true; - } - case PAUSE: - { - lock_guard lock(fChangeStateMutex); - static_pointer_cast(fFsm)->process_event(PAUSE_FSM_EVENT()); - return true; - } - case STOP: - { - lock_guard lock(fChangeStateMutex); - static_pointer_cast(fFsm)->process_event(STOP_FSM_EVENT()); - return true; - } - case RESET_DEVICE: - { - lock_guard lock(fChangeStateMutex); - static_pointer_cast(fFsm)->process_event(RESET_DEVICE_FSM_EVENT()); - return true; - } - case RESET_TASK: - { - lock_guard lock(fChangeStateMutex); - static_pointer_cast(fFsm)->process_event(RESET_TASK_FSM_EVENT()); - return true; - } - case internal_IDLE: - { - lock_guard lock(fChangeStateMutex); - static_pointer_cast(fFsm)->process_event(internal_IDLE_FSM_EVENT()); - return true; - } - case END: - { - lock_guard lock(fChangeStateMutex); - static_pointer_cast(fFsm)->process_event(END_FSM_EVENT()); - return true; - } - case ERROR_FOUND: - { - lock_guard lock(fChangeStateMutex); - static_pointer_cast(fFsm)->process_event(ERROR_FOUND_FSM_EVENT()); - return true; - } - default: - { - LOG(error) << "Requested state transition with an unsupported event: " << event << endl - << "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_TASK, RESET_DEVICE, END, ERROR_FOUND"; - return false; - } - } - } - catch (exception& e) - { - LOG(error) << "Exception in FairMQStateMachine::ChangeState(): " << e.what(); - exit(EXIT_FAILURE); - } - return false; -} - -bool FairMQStateMachine::ChangeState(const string& event) -{ - return ChangeState(GetEventNumber(event)); -} - -void FairMQStateMachine::WaitForEndOfState(int event) -{ - try - { - switch (event) - { - case INIT_DEVICE: - case INIT_TASK: - case RUN: - case RESET_TASK: - case RESET_DEVICE: - { - unique_lock lock(static_pointer_cast(fFsm)->fWorkMutex); - while (static_pointer_cast(fFsm)->fWorkActive || static_pointer_cast(fFsm)->fWorkAvailable) - { - static_pointer_cast(fFsm)->fWorkDoneCondition.wait_for(lock, chrono::seconds(1)); - } - - break; - } - default: - LOG(error) << "Requested state is either synchronous or does not exist."; - break; - } - } - catch (exception& e) - { - LOG(error) << "Exception in FairMQStateMachine::WaitForEndOfState(): " << e.what(); - } -} - -void FairMQStateMachine::WaitForEndOfState(const string& event) -{ - return WaitForEndOfState(GetEventNumber(event)); -} - -bool FairMQStateMachine::WaitForEndOfStateForMs(int event, int durationInMs) -{ - try - { - switch (event) - { - case INIT_DEVICE: - case INIT_TASK: - case RUN: - case RESET_TASK: - case RESET_DEVICE: - { - unique_lock lock(static_pointer_cast(fFsm)->fWorkMutex); - while (static_pointer_cast(fFsm)->fWorkActive || static_pointer_cast(fFsm)->fWorkAvailable) - { - static_pointer_cast(fFsm)->fWorkDoneCondition.wait_for(lock, chrono::milliseconds(durationInMs)); - if (static_pointer_cast(fFsm)->fWorkActive) - { - return false; - } - } - return true; - } - default: - LOG(error) << "Requested state is either synchronous or does not exist."; - return false; - } - } - catch (exception& e) - { - LOG(error) << "Exception in FairMQStateMachine::WaitForEndOfStateForMs(): " << e.what(); - } - return false; -} - -bool FairMQStateMachine::WaitForEndOfStateForMs(const string& event, int durationInMs) -{ - return WaitForEndOfStateForMs(GetEventNumber(event), durationInMs); -} - -void FairMQStateMachine::SubscribeToStateChange(const string& key, function callback) -{ - static_pointer_cast(fFsm)->fStateChangeSignalsMap.insert({key, static_pointer_cast(fFsm)->fStateChangeSignal.connect(callback)}); -} -void FairMQStateMachine::UnsubscribeFromStateChange(const string& key) -{ - if (static_pointer_cast(fFsm)->fStateChangeSignalsMap.count(key)) - { - static_pointer_cast(fFsm)->fStateChangeSignalsMap.at(key).disconnect(); - static_pointer_cast(fFsm)->fStateChangeSignalsMap.erase(key); - } -} - -void FairMQStateMachine::CallStateChangeCallbacks(const State state) const -{ - static_pointer_cast(fFsm)->CallStateChangeCallbacks(state); -} - -string FairMQStateMachine::GetCurrentStateName() const -{ - return GetStateName(static_pointer_cast(fFsm)->fState); -} -string FairMQStateMachine::GetStateName(const State state) -{ - return stateNames.at(state); -} -int FairMQStateMachine::GetCurrentState() const -{ - return static_pointer_cast(fFsm)->fState; -} -bool FairMQStateMachine::CheckCurrentState(int state) const -{ - return state == static_pointer_cast(fFsm)->fState; -} -bool FairMQStateMachine::CheckCurrentState(const string& state) const -{ - return state == GetCurrentStateName(); -} - -void FairMQStateMachine::ProcessWork() -try -{ - static_pointer_cast(fFsm)->ProcessWork(); -} catch(...) { - { - lock_guard lock(static_pointer_cast(fFsm)->fWorkMutex); - static_pointer_cast(fFsm)->fWorkActive = false; - static_pointer_cast(fFsm)->fWorkAvailable = false; - static_pointer_cast(fFsm)->fWorkDoneCondition.notify_one(); - } - ChangeState(ERROR_FOUND); - throw; -} - - -int FairMQStateMachine::GetEventNumber(const string& event) -{ - return eventNumbers.at(event); -} diff --git a/fairmq/FairMQStateMachine.h b/fairmq/FairMQStateMachine.h deleted file mode 100644 index e406abd1..00000000 --- a/fairmq/FairMQStateMachine.h +++ /dev/null @@ -1,107 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQStateMachine.h - * - * @since 2012-10-25 - * @author D. Klein, A. Rybalchenko - */ - -#ifndef FAIRMQSTATEMACHINE_H_ -#define FAIRMQSTATEMACHINE_H_ - -#define FAIRMQ_INTERFACE_VERSION 3 - -#include "FairMQLogger.h" - -#include -#include -#include -#include - -class FairMQStateMachine -{ - public: - enum Event - { - INIT_DEVICE, - internal_DEVICE_READY, - INIT_TASK, - internal_READY, - RUN, - PAUSE, - STOP, - RESET_TASK, - RESET_DEVICE, - internal_IDLE, - END, - ERROR_FOUND - }; - - enum State - { - OK, - Error, - IDLE, - INITIALIZING_DEVICE, - DEVICE_READY, - INITIALIZING_TASK, - READY, - RUNNING, - PAUSED, - RESETTING_TASK, - RESETTING_DEVICE, - EXITING - }; - - FairMQStateMachine(); - virtual ~FairMQStateMachine(); - - int GetInterfaceVersion() const; - - bool ChangeState(int event); - bool ChangeState(const std::string& event); - - void WaitForEndOfState(int event); - void WaitForEndOfState(const std::string& event); - - bool WaitForEndOfStateForMs(int event, int durationInMs); - bool WaitForEndOfStateForMs(const std::string& event, int durationInMs); - - void SubscribeToStateChange(const std::string& key, std::function callback); - void UnsubscribeFromStateChange(const std::string& key); - - void CallStateChangeCallbacks(const State state) const; - - std::string GetCurrentStateName() const; - static std::string GetStateName(const State); - int GetCurrentState() const; - bool CheckCurrentState(int state) const; - bool CheckCurrentState(const std::string& state) const; - - // actions to be overwritten by derived classes - virtual void InitWrapper() {} - virtual void InitTaskWrapper() {} - virtual void RunWrapper() {} - virtual void PauseWrapper() {} - virtual void ResetWrapper() {} - virtual void ResetTaskWrapper() {} - virtual void Exit() {} - virtual void Unblock() {} - - void ProcessWork(); - - private: - static int GetEventNumber(const std::string& event); - - std::mutex fChangeStateMutex; - - std::shared_ptr fFsm; -}; - -#endif /* FAIRMQSTATEMACHINE_H_ */ diff --git a/fairmq/Plugin.h b/fairmq/Plugin.h index db4dc98a..0984a38a 100644 --- a/fairmq/Plugin.h +++ b/fairmq/Plugin.h @@ -78,7 +78,7 @@ class Plugin auto TakeDeviceControl() -> void { fPluginServices->TakeDeviceControl(fkName); }; auto StealDeviceControl() -> void { fPluginServices->StealDeviceControl(fkName); }; auto ReleaseDeviceControl() -> void { fPluginServices->ReleaseDeviceControl(fkName); }; - auto ChangeDeviceState(const DeviceStateTransition next) -> void { fPluginServices->ChangeDeviceState(fkName, next); } + auto ChangeDeviceState(const DeviceStateTransition next) -> bool { return fPluginServices->ChangeDeviceState(fkName, next); } auto SubscribeToDeviceStateChange(std::function callback) -> void { fPluginServices->SubscribeToDeviceStateChange(fkName, callback); } auto UnsubscribeFromDeviceStateChange() -> void { fPluginServices->UnsubscribeFromDeviceStateChange(fkName); } diff --git a/fairmq/PluginServices.cxx b/fairmq/PluginServices.cxx index 69f32950..3bd3eb05 100644 --- a/fairmq/PluginServices.cxx +++ b/fairmq/PluginServices.cxx @@ -16,11 +16,14 @@ const std::unordered_map PluginService {"ERROR", DeviceState::Error}, {"IDLE", DeviceState::Idle}, {"INITIALIZING DEVICE", DeviceState::InitializingDevice}, + {"INITIALIZED", DeviceState::Initialized}, + {"BINDING", DeviceState::Binding}, + {"BOUND", DeviceState::Bound}, + {"CONNECTING", DeviceState::Connecting}, {"DEVICE READY", DeviceState::DeviceReady}, {"INITIALIZING TASK", DeviceState::InitializingTask}, {"READY", DeviceState::Ready}, {"RUNNING", DeviceState::Running}, - {"PAUSED", DeviceState::Paused}, {"RESETTING TASK", DeviceState::ResettingTask}, {"RESETTING DEVICE", DeviceState::ResettingDevice}, {"EXITING", DeviceState::Exiting} @@ -30,83 +33,96 @@ const std::unordered_map PluginServices::fkDeviceStateTransitionStrMap = { - {"INIT DEVICE", DeviceStateTransition::InitDevice}, - {"INIT TASK", DeviceStateTransition::InitTask}, - {"RUN", DeviceStateTransition::Run}, - {"PAUSE", DeviceStateTransition::Pause}, - {"RESUME", DeviceStateTransition::Resume}, - {"STOP", DeviceStateTransition::Stop}, - {"RESET TASK", DeviceStateTransition::ResetTask}, - {"RESET DEVICE", DeviceStateTransition::ResetDevice}, - {"END", DeviceStateTransition::End}, - {"ERROR FOUND", DeviceStateTransition::ErrorFound}, + {"AUTO", DeviceStateTransition::Auto}, + {"INIT DEVICE", DeviceStateTransition::InitDevice}, + {"COMPLETE INIT", DeviceStateTransition::CompleteInit}, + {"BIND", DeviceStateTransition::Bind}, + {"CONNECT", DeviceStateTransition::Connect}, + {"INIT TASK", DeviceStateTransition::InitTask}, + {"RUN", DeviceStateTransition::Run}, + {"STOP", DeviceStateTransition::Stop}, + {"RESET TASK", DeviceStateTransition::ResetTask}, + {"RESET DEVICE", DeviceStateTransition::ResetDevice}, + {"END", DeviceStateTransition::End}, + {"ERROR FOUND", DeviceStateTransition::ErrorFound}, }; const std::unordered_map> PluginServices::fkStrDeviceStateTransitionMap = { - {DeviceStateTransition::InitDevice, "INIT DEVICE"}, - {DeviceStateTransition::InitTask, "INIT TASK"}, - {DeviceStateTransition::Run, "RUN"}, - {DeviceStateTransition::Pause, "PAUSE"}, - {DeviceStateTransition::Resume, "RESUME"}, - {DeviceStateTransition::Stop, "STOP"}, - {DeviceStateTransition::ResetTask, "RESET TASK"}, - {DeviceStateTransition::ResetDevice, "RESET DEVICE"}, - {DeviceStateTransition::End, "END"}, - {DeviceStateTransition::ErrorFound, "ERROR FOUND"}, + {DeviceStateTransition::Auto, "Auto"}, + {DeviceStateTransition::InitDevice, "INIT DEVICE"}, + {DeviceStateTransition::CompleteInit, "COMPLETE INIT"}, + {DeviceStateTransition::Bind, "BIND"}, + {DeviceStateTransition::Connect, "CONNECT"}, + {DeviceStateTransition::InitTask, "INIT TASK"}, + {DeviceStateTransition::Run, "RUN"}, + {DeviceStateTransition::Stop, "STOP"}, + {DeviceStateTransition::ResetTask, "RESET TASK"}, + {DeviceStateTransition::ResetDevice, "RESET DEVICE"}, + {DeviceStateTransition::End, "END"}, + {DeviceStateTransition::ErrorFound, "ERROR FOUND"}, }; -const std::unordered_map> PluginServices::fkDeviceStateMap = { - {FairMQDevice::OK, DeviceState::Ok}, - {FairMQDevice::Error, DeviceState::Error}, - {FairMQDevice::IDLE, DeviceState::Idle}, - {FairMQDevice::INITIALIZING_DEVICE, DeviceState::InitializingDevice}, - {FairMQDevice::DEVICE_READY, DeviceState::DeviceReady}, - {FairMQDevice::INITIALIZING_TASK, DeviceState::InitializingTask}, - {FairMQDevice::READY, DeviceState::Ready}, - {FairMQDevice::RUNNING, DeviceState::Running}, - {FairMQDevice::PAUSED, DeviceState::Paused}, - {FairMQDevice::RESETTING_TASK, DeviceState::ResettingTask}, - {FairMQDevice::RESETTING_DEVICE, DeviceState::ResettingDevice}, - {FairMQDevice::EXITING, DeviceState::Exiting} +const std::unordered_map> PluginServices::fkDeviceStateMap = { + {fair::mq::State::Ok, DeviceState::Ok}, + {fair::mq::State::Error, DeviceState::Error}, + {fair::mq::State::Idle, DeviceState::Idle}, + {fair::mq::State::InitializingDevice, DeviceState::InitializingDevice}, + {fair::mq::State::Initialized, DeviceState::Initialized}, + {fair::mq::State::Binding, DeviceState::Binding}, + {fair::mq::State::Bound, DeviceState::Bound}, + {fair::mq::State::Connecting, DeviceState::Connecting}, + {fair::mq::State::DeviceReady, DeviceState::DeviceReady}, + {fair::mq::State::InitializingTask, DeviceState::InitializingTask}, + {fair::mq::State::Ready, DeviceState::Ready}, + {fair::mq::State::Running, DeviceState::Running}, + {fair::mq::State::ResettingTask, DeviceState::ResettingTask}, + {fair::mq::State::ResettingDevice, DeviceState::ResettingDevice}, + {fair::mq::State::Exiting, DeviceState::Exiting} }; -const std::unordered_map> PluginServices::fkDeviceStateTransitionMap = { - {DeviceStateTransition::InitDevice, FairMQDevice::INIT_DEVICE}, - {DeviceStateTransition::InitTask, FairMQDevice::INIT_TASK}, - {DeviceStateTransition::Run, FairMQDevice::RUN}, - {DeviceStateTransition::Pause, FairMQDevice::PAUSE}, - {DeviceStateTransition::Resume, FairMQDevice::RUN}, - {DeviceStateTransition::Stop, FairMQDevice::STOP}, - {DeviceStateTransition::ResetTask, FairMQDevice::RESET_TASK}, - {DeviceStateTransition::ResetDevice, FairMQDevice::RESET_DEVICE}, - {DeviceStateTransition::End, FairMQDevice::END}, - {DeviceStateTransition::ErrorFound, FairMQDevice::ERROR_FOUND} +const std::unordered_map> PluginServices::fkDeviceStateTransitionMap = { + {DeviceStateTransition::Auto, fair::mq::Transition::Auto}, + {DeviceStateTransition::InitDevice, fair::mq::Transition::InitDevice}, + {DeviceStateTransition::CompleteInit, fair::mq::Transition::CompleteInit}, + {DeviceStateTransition::Bind, fair::mq::Transition::Bind}, + {DeviceStateTransition::Connect, fair::mq::Transition::Connect}, + {DeviceStateTransition::InitTask, fair::mq::Transition::InitTask}, + {DeviceStateTransition::Run, fair::mq::Transition::Run}, + {DeviceStateTransition::Stop, fair::mq::Transition::Stop}, + {DeviceStateTransition::ResetTask, fair::mq::Transition::ResetTask}, + {DeviceStateTransition::ResetDevice, fair::mq::Transition::ResetDevice}, + {DeviceStateTransition::End, fair::mq::Transition::End}, + {DeviceStateTransition::ErrorFound, fair::mq::Transition::ErrorFound} }; -auto PluginServices::ChangeDeviceState(const std::string& controller, const DeviceStateTransition next) -> void +auto PluginServices::ChangeDeviceState(const std::string& controller, const DeviceStateTransition next) -> bool { lock_guard lock{fDeviceControllerMutex}; if (!fDeviceController) fDeviceController = controller; - if (fDeviceController == controller) - { - fDevice.ChangeState(fkDeviceStateTransitionMap.at(next)); - } - else - { + bool result = false; + + if (fDeviceController == controller) { + result = fDevice.ChangeState(fkDeviceStateTransitionMap.at(next)); + } else { throw DeviceControlError{tools::ToString( "Plugin '", controller, "' is not allowed to change device states. ", "Currently, plugin '", *fDeviceController, "' has taken control." )}; } + + return result; } auto PluginServices::TakeDeviceControl(const std::string& controller) -> void diff --git a/fairmq/PluginServices.h b/fairmq/PluginServices.h index d2f2d438..2fa8a9d1 100644 --- a/fairmq/PluginServices.h +++ b/fairmq/PluginServices.h @@ -63,11 +63,14 @@ class PluginServices Error, Idle, InitializingDevice, + Initialized, + Binding, + Bound, + Connecting, DeviceReady, InitializingTask, Ready, Running, - Paused, ResettingTask, ResettingDevice, Exiting @@ -75,11 +78,13 @@ class PluginServices enum class DeviceStateTransition : int // transition event between DeviceStates { + Auto, InitDevice, + CompleteInit, + Bind, + Connect, InitTask, Run, - Pause, - Resume, Stop, ResetTask, ResetDevice, @@ -115,7 +120,7 @@ class PluginServices friend auto operator<<(std::ostream& os, const DeviceStateTransition& transition) -> std::ostream& { return os << ToStr(transition); } /// @return current device state - auto GetCurrentDeviceState() const -> DeviceState { return fkDeviceStateMap.at(static_cast(fDevice.GetCurrentState())); } + auto GetCurrentDeviceState() const -> DeviceState { return fkDeviceStateMap.at(static_cast(fDevice.GetCurrentState())); } /// @brief Become device controller /// @param controller id @@ -151,7 +156,7 @@ class PluginServices /// The state transition may not happen immediately, but when the current state evaluates the /// pending transition event and terminates. In other words, the device states are scheduled cooperatively. /// If the device control role has not been taken yet, calling this function will take over control implicitely. - auto ChangeDeviceState(const std::string& controller, const DeviceStateTransition next) -> void; + auto ChangeDeviceState(const std::string& controller, const DeviceStateTransition next) -> bool; /// @brief Subscribe with a callback to device state changes /// @param subscriber id @@ -161,7 +166,7 @@ class PluginServices /// the state is running in. auto SubscribeToDeviceStateChange(const std::string& subscriber, std::function callback) -> void { - fDevice.SubscribeToStateChange(subscriber, [&,callback](FairMQDevice::State newState){ + fDevice.SubscribeToStateChange(subscriber, [&,callback](fair::mq::State newState){ callback(fkDeviceStateMap.at(newState)); }); } @@ -187,12 +192,13 @@ class PluginServices { auto currentState = GetCurrentDeviceState(); if ( (currentState == DeviceState::InitializingDevice) - || ((currentState == DeviceState::Idle) && (key == "channel-config"))) - { + || (currentState == DeviceState::Initialized) + || (currentState == DeviceState::Binding) + || (currentState == DeviceState::Bound) + || (currentState == DeviceState::Connecting) + || (currentState == DeviceState::Idle && key == "channel-config")) { fConfig.SetValue(key, val); - } - else - { + } else { throw InvalidStateError{ tools::ToString("PluginServices::SetProperty is not supported in device state ", currentState, ". ", "Supported state is ", DeviceState::InitializingDevice, ".")}; @@ -271,8 +277,8 @@ class PluginServices static const std::unordered_map> fkStrDeviceStateMap; static const std::unordered_map fkDeviceStateTransitionStrMap; static const std::unordered_map> fkStrDeviceStateTransitionMap; - static const std::unordered_map> fkDeviceStateMap; - static const std::unordered_map> fkDeviceStateTransitionMap; + static const std::unordered_map> fkDeviceStateMap; + static const std::unordered_map> fkDeviceStateTransitionMap; private: FairMQProgOptions& fConfig; diff --git a/fairmq/StateMachine.cxx b/fairmq/StateMachine.cxx index 3cbad339..36908c01 100644 --- a/fairmq/StateMachine.cxx +++ b/fairmq/StateMachine.cxx @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -7,189 +7,488 @@ ********************************************************************************/ #include "StateMachine.h" +#include + +// Increase maximum number of boost::msm states (default is 10) +// This #define has to be before any msm header includes +#define FUSION_MAX_VECTOR_SIZE 20 + +#include +#include +#include +#include +#include +#include +#include +#include // signal/slot for onStateChange callbacks + +#include +#include +#include +#include +#include +#include -using namespace fair::mq; using namespace std; +using namespace boost::msm; +using namespace boost::msm::front; +using namespace boost::msm::back; +namespace bmpl = boost::mpl; -const std::unordered_map StateMachine::fkStateStrMap = { - {"OK", State::Ok}, - {"ERROR", State::Error}, - {"IDLE", State::Idle}, - {"INITIALIZING DEVICE", State::InitializingDevice}, - {"DEVICE READY", State::DeviceReady}, - {"INITIALIZING TASK", State::InitializingTask}, - {"READY", State::Ready}, - {"RUNNING", State::Running}, - {"RESETTING TASK", State::ResettingTask}, - {"RESETTING DEVICE", State::ResettingDevice}, - {"EXITING", State::Exiting} -}; -const std::unordered_map> StateMachine::fkStrStateMap = { - {State::Ok, "OK"}, - {State::Error, "ERROR"}, - {State::Idle, "IDLE"}, - {State::InitializingDevice, "INITIALIZING DEVICE"}, - {State::DeviceReady, "DEVICE READY"}, - {State::InitializingTask, "INITIALIZING TASK"}, - {State::Ready, "READY"}, - {State::Running, "RUNNING"}, - {State::ResettingTask, "RESETTING TASK"}, - {State::ResettingDevice, "RESETTING DEVICE"}, - {State::Exiting, "EXITING"} -}; -const std::unordered_map StateMachine::fkStateTransitionStrMap = { - {"INIT DEVICE", StateTransition::InitDevice}, - {"INIT TASK", StateTransition::InitTask}, - {"RUN", StateTransition::Run}, - {"STOP", StateTransition::Stop}, - {"RESET TASK", StateTransition::ResetTask}, - {"RESET DEVICE", StateTransition::ResetDevice}, - {"END", StateTransition::End}, - {"ERROR FOUND", StateTransition::ErrorFound}, - {"AUTOMATIC", StateTransition::Automatic}, -}; -const std::unordered_map> StateMachine::fkStrStateTransitionMap = { - {StateTransition::InitDevice, "INIT DEVICE"}, - {StateTransition::InitTask, "INIT TASK"}, - {StateTransition::Run, "RUN"}, - {StateTransition::Stop, "STOP"}, - {StateTransition::ResetTask, "RESET TASK"}, - {StateTransition::ResetDevice, "RESET DEVICE"}, - {StateTransition::End, "END"}, - {StateTransition::ErrorFound, "ERROR FOUND"}, - {StateTransition::Automatic, "AUTOMATIC"}, -}; - -auto StateMachine::Run() -> void +namespace std { - LOG(state) << "Starting FairMQ state machine"; - LOG(debug) << "Entering initial " << fErrorState << " state (orthogonal error state machine)"; - LOG(state) << "Entering initial " << fState << " state"; +template<> +struct hash : fair::mq::tools::HashEnum {}; - std::unique_lock lock{fMutex}; - while (true) +template<> +struct hash : fair::mq::tools::HashEnum {}; + +} /* namespace std */ + +namespace fair +{ +namespace mq +{ +namespace fsm +{ + +// list of FSM states +struct OK_S : public state<> { static string Name() { return "OK"; } static State Type() { return State::Ok; } }; + +struct IDLE_S : public state<> { static string Name() { return "IDLE"; } static State Type() { return State::Idle; } }; +struct INITIALIZING_DEVICE_S : public state<> { static string Name() { return "INITIALIZING_DEVICE"; } static State Type() { return State::InitializingDevice; } }; +struct INITIALIZED_S : public state<> { static string Name() { return "INITIALIZED"; } static State Type() { return State::Initialized; } }; +struct BINDING_S : public state<> { static string Name() { return "BINDING"; } static State Type() { return State::Binding; } }; +struct BOUND_S : public state<> { static string Name() { return "BOUND"; } static State Type() { return State::Bound; } }; +struct CONNECTING_S : public state<> { static string Name() { return "CONNECTING"; } static State Type() { return State::Connecting; } }; +struct DEVICE_READY_S : public state<> { static string Name() { return "DEVICE_READY"; } static State Type() { return State::DeviceReady; } }; +struct INITIALIZING_TASK_S : public state<> { static string Name() { return "INITIALIZING_TASK"; } static State Type() { return State::InitializingTask; } }; +struct READY_S : public state<> { static string Name() { return "READY"; } static State Type() { return State::Ready; } }; +struct RUNNING_S : public state<> { static string Name() { return "RUNNING"; } static State Type() { return State::Running; } }; +struct RESETTING_TASK_S : public state<> { static string Name() { return "RESETTING_TASK"; } static State Type() { return State::ResettingTask; } }; +struct RESETTING_DEVICE_S : public state<> { static string Name() { return "RESETTING_DEVICE"; } static State Type() { return State::ResettingDevice; } }; +struct EXITING_S : public state<> { static string Name() { return "EXITING"; } static State Type() { return State::Exiting; } }; + +struct ERROR_S : public terminate_state<> { static string Name() { return "ERROR"; } static State Type() { return State::Error; } }; + +// list of FSM transitions (events) +struct AUTO_E { static string Name() { return "AUTO"; } static Transition Type() { return Transition::Auto; } }; +struct INIT_DEVICE_E { static string Name() { return "INIT_DEVICE"; } static Transition Type() { return Transition::InitDevice; } }; +struct COMPLETE_INIT_E { static string Name() { return "COMPLETE_INIT"; } static Transition Type() { return Transition::CompleteInit; } }; +struct BIND_E { static string Name() { return "BIND"; } static Transition Type() { return Transition::Bind; } }; +struct CONNECT_E { static string Name() { return "CONNECT"; } static Transition Type() { return Transition::Connect; } }; +struct INIT_TASK_E { static string Name() { return "INIT_TASK"; } static Transition Type() { return Transition::InitTask; } }; +struct RUN_E { static string Name() { return "RUN"; } static Transition Type() { return Transition::Run; } }; +struct STOP_E { static string Name() { return "STOP"; } static Transition Type() { return Transition::Stop; } }; +struct RESET_TASK_E { static string Name() { return "RESET_TASK"; } static Transition Type() { return Transition::ResetTask; } }; +struct RESET_DEVICE_E { static string Name() { return "RESET_DEVICE"; } static Transition Type() { return Transition::ResetDevice; } }; +struct END_E { static string Name() { return "END"; } static Transition Type() { return Transition::End; } }; +struct ERROR_FOUND_E { static string Name() { return "ERROR_FOUND"; } static Transition Type() { return Transition::ErrorFound; } }; + +static array stateNames = +{ { - while (fNextStates.empty()) + "OK", + "Error", + "IDLE", + "INITIALIZING_DEVICE", + "INITIALIZED", + "BINDING", + "BOUND", + "CONNECTING", + "DEVICE_READY", + "INITIALIZING_TASK", + "READY", + "RUNNING", + "RESETTING_TASK", + "RESETTING_DEVICE", + "EXITING" + } +}; + +static array transitionNames = +{ + { + "AUTO", + "INIT_DEVICE", + "COMPLETE_INIT", + "BIND", + "CONNECT", + "INIT_TASK", + "RUN", + "STOP", + "RESET_TASK", + "RESET_DEVICE", + "END", + "ERROR_FOUND" + } +}; + +static map stateNumbers = +{ + { "OK", State::Ok }, + { "Error", State::Error }, + { "IDLE", State::Idle }, + { "INITIALIZING_DEVICE", State::InitializingDevice }, + { "INITIALIZED", State::Initialized }, + { "BINDING", State::Binding }, + { "BOUND", State::Bound }, + { "CONNECTING", State::Connecting }, + { "DEVICE_READY", State::DeviceReady }, + { "INITIALIZING_TASK", State::InitializingTask }, + { "READY", State::Ready }, + { "RUNNING", State::Running }, + { "RESETTING_TASK", State::ResettingTask }, + { "RESETTING_DEVICE", State::ResettingDevice }, + { "EXITING", State::Exiting } +}; + +static map transitionNumbers = +{ + { "AUTO", Transition::Auto }, + { "INIT_DEVICE", Transition::InitDevice }, + { "COMPLETE_INIT", Transition::CompleteInit }, + { "BIND", Transition::Bind }, + { "CONNECT", Transition::Connect }, + { "INIT_TASK", Transition::InitTask }, + { "RUN", Transition::Run }, + { "STOP", Transition::Stop }, + { "RESET_TASK", Transition::ResetTask }, + { "RESET_DEVICE", Transition::ResetDevice }, + { "END", Transition::End }, + { "ERROR_FOUND", Transition::ErrorFound } +}; + +// defining the boost MSM state machine +struct Machine_ : public state_machine_def +{ + public: + Machine_() + : fLastTransitionResult(true) + , fNewStatePending(false) + , fWorkOngoing(false) + {} + + virtual ~Machine_() {} + + // initial states + using initial_state = bmpl::vector; + + template + void on_entry(Transition const&, FSM& /* fsm */) + { + LOG(state) << "Starting FairMQ state machine --> IDLE"; + fState = State::Idle; + } + + template + void on_exit(Transition const&, FSM& /*fsm*/) + { + LOG(state) << "Exiting FairMQ state machine"; + } + + struct DefaultFct + { + template + void operator()(EVT const& e, FSM& fsm, SourceState& /* ss */, TargetState& ts) { - fNewState.wait(lock); + fsm.fNewState = ts.Type(); + fsm.fLastTransitionResult = true; + fsm.CallNewTransitionCallbacks(e.Type()); + fsm.fNewStatePending = true; + fsm.fNewStatePendingCV.notify_all(); } + }; - State lastState; - - if (fNextStates.front() == State::Error) + struct ErrorFct + { + template + void operator()(EVT const& e, FSM& fsm, SourceState& /* ss */, TargetState& ts) { - // advance error FSM - lastState = fErrorState; - fErrorState = fNextStates.front(); - fNextStates.pop_front(); - LOG(error) << "Entering " << fErrorState << " state (orthogonal error state machine)"; + fsm.fState = ts.Type(); + fsm.fLastTransitionResult = true; + fsm.CallNewTransitionCallbacks(e.Type()); + fsm.CallStateChangeCallbacks(ts.Type()); + fsm.fNewStatePending = true; + fsm.fNewStatePendingCV.notify_all(); } - else + }; + + struct transition_table : bmpl::vector< + // Start Transition Next Action Guard + Row, + Row, + + Row, + Row, + Row, + + Row, + Row, + Row, + + Row, + Row, + Row, + + Row, + + Row, + Row, + + Row, + + Row, + Row, + + Row> {}; + + void CallStateChangeCallbacks(const State state) const + { + if (!fStateChangeSignal.empty()) { + fStateChangeSignal(state); + } + } + + void CallStateHandler(const State state) const + { + if (!fStateHandleSignal.empty()) { + fStateHandleSignal(state); + } + } + + void CallNewTransitionCallbacks(const Transition transition) const + { + if (!fNewTransitionSignal.empty()) { + fNewTransitionSignal(transition); + } + } + + + atomic fState; + atomic fNewState; + atomic fLastTransitionResult; + + mutex fStateMtx; + atomic fNewStatePending; + atomic fWorkOngoing; + condition_variable fNewStatePendingCV; + condition_variable fWorkDoneCV; + + boost::signals2::signal fStateChangeSignal; + boost::signals2::signal fStateHandleSignal; + boost::signals2::signal fNewTransitionSignal; + unordered_map fStateChangeSignalsMap; + unordered_map fNewTransitionSignalsMap; + + void ProcessWork() + { + bool stop = false; + + while (!stop) { + { + unique_lock lock(fStateMtx); + + while (!fNewStatePending) { + fNewStatePendingCV.wait_for(lock, chrono::milliseconds(100)); + } + + LOG(state) << fState << " ---> " << fNewState; + fState = static_cast(fNewState); + fNewStatePending = false; + fWorkOngoing = true; + + if (fState == State::Exiting || fState == State::Error) { + stop = true; + } + } + + CallStateChangeCallbacks(fState); + CallStateHandler(fState); + + { + lock_guard lock(fStateMtx); + fWorkOngoing = false; + fWorkDoneCV.notify_one(); + } + } + } + + // replaces the default no-transition response. + template + void no_transition(Transition const& t, FSM& fsm, int state) + { + using RecursiveStt = typename recursive_get_transition_table::type; + using AllStates = typename generate_state_set::type; + + string stateName; + + bmpl::for_each>(get_state_name(stateName, state)); + + stateName = boost::core::demangle(stateName.c_str()); + size_t pos = stateName.rfind(":"); + stateName = stateName.substr(pos + 1); + size_t pos2 = stateName.rfind("_"); + stateName = stateName.substr(0, pos2); + + if (stateName != "OK") { + LOG(state) << "No transition from state " << stateName << " on transition " << t.Name(); + } + fsm.fLastTransitionResult = false; + } +}; // Machine_ + +using FairMQFSM = state_machine; + +} // namespace fsm +} // namespace mq +} // namespace fair + +using namespace fair::mq::fsm; +using namespace fair::mq; + +StateMachine::StateMachine() : fFsm(new FairMQFSM) {} +void StateMachine::Start() { static_pointer_cast(fFsm)->start(); } +StateMachine::~StateMachine() { static_pointer_cast(fFsm)->stop(); } + +bool StateMachine::ChangeState(const Transition transition) +try { + auto fsm = static_pointer_cast(fFsm); + lock_guard lock(fsm->fStateMtx); + if (!static_cast(fsm->fNewStatePending) || transition == Transition::ErrorFound) { + switch (transition) { + case Transition::Auto: + fsm->process_event(AUTO_E()); + return fsm->fLastTransitionResult; + case Transition::InitDevice: + fsm->process_event(INIT_DEVICE_E()); + return fsm->fLastTransitionResult; + case Transition::CompleteInit: + fsm->process_event(COMPLETE_INIT_E()); + return fsm->fLastTransitionResult; + case Transition::Bind: + fsm->process_event(BIND_E()); + return fsm->fLastTransitionResult; + case Transition::Connect: + fsm->process_event(CONNECT_E()); + return fsm->fLastTransitionResult; + case Transition::InitTask: + fsm->process_event(INIT_TASK_E()); + return fsm->fLastTransitionResult; + case Transition::Run: + fsm->process_event(RUN_E()); + return fsm->fLastTransitionResult; + case Transition::Stop: + fsm->process_event(STOP_E()); + return fsm->fLastTransitionResult; + case Transition::ResetDevice: + fsm->process_event(RESET_DEVICE_E()); + return fsm->fLastTransitionResult; + case Transition::ResetTask: + fsm->process_event(RESET_TASK_E()); + return fsm->fLastTransitionResult; + case Transition::End: + fsm->process_event(END_E()); + return fsm->fLastTransitionResult; + case Transition::ErrorFound: + fsm->process_event(ERROR_FOUND_E()); + return fsm->fLastTransitionResult; + default: + LOG(error) << "Requested unsupported state transition: " << transition << endl; + return false; + } + } else { + LOG(state) << "Transition " << transitionNames.at(static_cast(transition)) << " incoming, but another state transition is already ongoing."; + return false; + } +} catch (exception& e) { + LOG(error) << "Exception in StateMachine::ChangeState(): " << e.what(); + return false; +} + +void StateMachine::SubscribeToStateChange(const string& key, function callback) +{ + static_pointer_cast(fFsm)->fStateChangeSignalsMap.insert({key, static_pointer_cast(fFsm)->fStateChangeSignal.connect(callback)}); +} + +void StateMachine::UnsubscribeFromStateChange(const string& key) +{ + auto fsm = static_pointer_cast(fFsm); + if (fsm->fStateChangeSignalsMap.count(key)) { + fsm->fStateChangeSignalsMap.at(key).disconnect(); + fsm->fStateChangeSignalsMap.erase(key); + } +} + +void StateMachine::HandleStates(function callback) +{ + auto fsm = static_pointer_cast(fFsm); + if (fsm->fStateHandleSignal.empty()) { + fsm->fStateHandleSignal.connect(callback); + } else { + LOG(error) << "state handler is already set"; + } +} + +void StateMachine::StopHandlingStates() +{ + auto fsm = static_pointer_cast(fFsm); + if (!fsm->fStateHandleSignal.empty()) { + fsm->fStateHandleSignal.disconnect_all_slots(); + } +} + +void StateMachine::SubscribeToNewTransition(const string& key, function callback) +{ + static_pointer_cast(fFsm)->fNewTransitionSignalsMap.insert({key, static_pointer_cast(fFsm)->fNewTransitionSignal.connect(callback)}); +} + +void StateMachine::UnsubscribeFromNewTransition(const string& key) +{ + auto fsm = static_pointer_cast(fFsm); + if (fsm->fNewTransitionSignalsMap.count(key)) { + fsm->fNewTransitionSignalsMap.at(key).disconnect(); + fsm->fNewTransitionSignalsMap.erase(key); + } +} + +State StateMachine::GetCurrentState() const { return static_pointer_cast(fFsm)->fState; } +string StateMachine::GetCurrentStateName() const { return GetStateName(static_pointer_cast(fFsm)->fState); } + +bool StateMachine::NewStatePending() const { return static_cast(static_pointer_cast(fFsm)->fNewStatePending); } +void StateMachine::WaitForPendingState() const +{ + auto fsm = static_pointer_cast(fFsm); + unique_lock lock(fsm->fStateMtx); + fsm->fNewStatePendingCV.wait(lock, [&]{ return static_cast(fsm->fNewStatePending); }); +} +bool StateMachine::WaitForPendingStateFor(const int durationInMs) const +{ + auto fsm = static_pointer_cast(fFsm); + unique_lock lock(fsm->fStateMtx); + return fsm->fNewStatePendingCV.wait_for(lock, std::chrono::milliseconds(durationInMs), [&]{ return static_cast(fsm->fNewStatePending); }); +} + +void StateMachine::ProcessWork() +{ + auto fsm = static_pointer_cast(fFsm); + + try { + fsm->CallStateChangeCallbacks(State::Idle); + fsm->ProcessWork(); + } catch(...) { { - // advance regular FSM - lastState = fState; - fState = fNextStates.front(); - fNextStates.pop_front(); - LOG(state) << "Entering " << fState << " state"; + lock_guard lock(fsm->fStateMtx); + fsm->fWorkOngoing = false; + fsm->fWorkDoneCV.notify_one(); } - lock.unlock(); - - fCallbacks.Emit(fState, lastState); - - lock.lock(); - if (fState == State::Exiting || fErrorState == State::Error) break; + ChangeState(Transition::ErrorFound); + throw; } - - LOG(state) << "Exiting FairMQ state machine"; } -auto StateMachine::ChangeState(StateTransition transition) -> void -{ - State lastState; - - std::unique_lock lock{fMutex}; - - if (transition == StateTransition::ErrorFound) - { - lastState = fErrorState; - } - else if (fNextStates.empty()) - { - lastState = fState; - } - else - { - lastState = fNextStates.back(); - } - - const State nextState{Transition(lastState, transition)}; - fNextStates.push_back(nextState); - lock.unlock(); - - fCallbacks.Emit(nextState, lastState); - fNewState.notify_one(); -} - -auto StateMachine::Transition(const State currentState, const StateTransition transition) -> State -{ - switch (currentState) { - case State::Idle: - if (transition == StateTransition::InitDevice ) return State::InitializingDevice; - if (transition == StateTransition::End ) return State::Exiting; - break; - case State::InitializingDevice: - if (transition == StateTransition::Automatic ) return State::DeviceReady; - break; - case State::DeviceReady: - if (transition == StateTransition::InitTask ) return State::InitializingTask; - if (transition == StateTransition::ResetDevice) return State::ResettingDevice; - break; - case State::InitializingTask: - if (transition == StateTransition::Automatic ) return State::Ready; - break; - case State::Ready: - if (transition == StateTransition::Run ) return State::Running; - if (transition == StateTransition::ResetTask ) return State::ResettingTask; - break; - case State::Running: - if (transition == StateTransition::Stop ) return State::Ready; - break; - case State::ResettingTask: - if (transition == StateTransition::Automatic ) return State::DeviceReady; - break; - case State::ResettingDevice: - if (transition == StateTransition::Automatic ) return State::Idle; - break; - case State::Exiting: - break; - case State::Ok: - if (transition == StateTransition::ErrorFound ) return State::Error; - break; - case State::Error: - break; - } - throw IllegalTransition{tools::ToString("No transition ", transition, " from state ", currentState, ".")}; -} - -StateMachine::StateMachine() -: fState{State::Idle} -, fErrorState{State::Ok} -{ -} - -auto StateMachine::Reset() -> void -{ - std::unique_lock lock{fMutex}; - - fState = State::Idle; - fErrorState = State::Ok; - fNextStates.clear(); -} - -auto StateMachine::NextStatePending() -> bool -{ - std::unique_lock lock{fMutex}; - - return fNextStates.size() > 0; -} +string StateMachine::GetStateName(const State state) { return stateNames.at(static_cast(state)); } +string StateMachine::GetTransitionName(const Transition transition) { return transitionNames.at(static_cast(transition)); } +State StateMachine::GetState(const string& state) { return stateNumbers.at(state); } +Transition StateMachine::GetTransition(const string& transition) { return transitionNumbers.at(transition); } diff --git a/fairmq/StateMachine.h b/fairmq/StateMachine.h index 22a830c5..414b9495 100644 --- a/fairmq/StateMachine.h +++ b/fairmq/StateMachine.h @@ -1,132 +1,107 @@ /******************************************************************************** - * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#ifndef FAIR_MQ_STATEMACHINE_H -#define FAIR_MQ_STATEMACHINE_H +#ifndef FAIRMQSTATEMACHINE_H_ +#define FAIRMQSTATEMACHINE_H_ -#include -#include -#include -#include -#include +#include "FairMQLogger.h" + +#include +#include #include -#include #include -#include -#include +#include +#include +#include +#include namespace fair { namespace mq { -/** - * @class StateMachine StateMachine.h - * @brief Implements the state machine for FairMQ devices - * - * See https://github.com/FairRootGroup/FairRoot/blob/dev/fairmq/docs/Device.md#13-state-machine - */ +enum class State : int +{ + Ok, + Error, + Idle, + InitializingDevice, + Initialized, + Binding, + Bound, + Connecting, + DeviceReady, + InitializingTask, + Ready, + Running, + ResettingTask, + ResettingDevice, + Exiting +}; + +enum class Transition : int +{ + Auto, + InitDevice, + CompleteInit, + Bind, + Connect, + InitTask, + Run, + Stop, + ResetTask, + ResetDevice, + End, + ErrorFound +}; + class StateMachine { public: - enum class State : int - { - Ok, - Error, - Idle, - InitializingDevice, - DeviceReady, - InitializingTask, - Ready, - Running, - ResettingTask, - ResettingDevice, - Exiting - }; - - enum class StateTransition : int // transition event between States - { - InitDevice, - InitTask, - Run, - Stop, - ResetTask, - ResetDevice, - End, - ErrorFound, - Automatic - }; - - /// @brief Convert string to State - /// @param state to convert - /// @return State enum entry - /// @throw std::out_of_range if a string cannot be resolved to a State - static auto ToState(const std::string& state) -> State { return fkStateStrMap.at(state); } - - /// @brief Convert string to StateTransition - /// @param transition to convert - /// @return StateTransition enum entry - /// @throw std::out_of_range if a string cannot be resolved to a StateTransition - static auto ToStateTransition(const std::string& transition) -> StateTransition { return fkStateTransitionStrMap.at(transition); } - - /// @brief Convert State to string - /// @param state to convert - /// @return string representation of State enum entry - static auto ToStr(State state) -> std::string { return fkStrStateMap.at(state); } - - /// @brief Convert StateTransition to string - /// @param transition to convert - /// @return string representation of StateTransition enum entry - static auto ToStr(StateTransition transition) -> std::string { return fkStrStateTransitionMap.at(transition); } - - friend auto operator<<(std::ostream& os, const State& state) -> std::ostream& { return os << ToStr(state); } - friend auto operator<<(std::ostream& os, const StateTransition& transition) -> std::ostream& { return os << ToStr(transition); } - StateMachine(); + virtual ~StateMachine(); - struct IllegalTransition : std::runtime_error { using std::runtime_error::runtime_error; }; + bool ChangeState(const Transition transition); + bool ChangeState(const std::string& transition) { return ChangeState(GetTransition(transition)); } - struct StateChange : Event {}; - struct StateQueued : Event {}; - auto SubscribeToStateChange(const std::string& subscriber, std::function callback) -> void { fCallbacks.Subscribe(subscriber, callback); } - auto UnsubscribeFromStateChange(const std::string& subscriber) -> void { fCallbacks.Unsubscribe(subscriber); } - auto SubscribeToStateQueued(const std::string& subscriber, std::function callback) -> void { fCallbacks.Subscribe(subscriber, callback); } - auto UnsubscribeFromStateQueued(const std::string& subscriber) -> void { fCallbacks.Unsubscribe(subscriber); } + void SubscribeToStateChange(const std::string& key, std::function callback); + void UnsubscribeFromStateChange(const std::string& key); - auto GetCurrentState() const -> State { std::lock_guard lock{fMutex}; return fState; } - auto GetCurrentErrorState() const -> State { std::lock_guard lock{fMutex}; return fErrorState; } - auto GetLastQueuedState() const -> State { std::lock_guard lock{fMutex}; return fNextStates.back(); } + void HandleStates(std::function callback); + void StopHandlingStates(); - auto ChangeState(StateTransition transition) -> void; + void SubscribeToNewTransition(const std::string& key, std::function callback); + void UnsubscribeFromNewTransition(const std::string& key); - auto Run() -> void; - auto Reset() -> void; + bool NewStatePending() const; + void WaitForPendingState() const; + bool WaitForPendingStateFor(const int durationInMs) const; - auto NextStatePending() -> bool; + State GetCurrentState() const; + std::string GetCurrentStateName() const; + + void Start(); + + void ProcessWork(); + + static std::string GetStateName(const State); + static std::string GetTransitionName(const Transition); + static State GetState(const std::string& state); + static Transition GetTransition(const std::string& transition); private: - State fState; - State fErrorState; - std::deque fNextStates; - EventManager fCallbacks; + std::shared_ptr fFsm; +}; - static const std::unordered_map fkStateStrMap; - static const std::unordered_map> fkStrStateMap; - static const std::unordered_map fkStateTransitionStrMap; - static const std::unordered_map> fkStrStateTransitionMap; +inline std::ostream& operator<<(std::ostream& os, const State& state) { return os << StateMachine::GetStateName(state); } +inline std::ostream& operator<<(std::ostream& os, const Transition& transition) { return os << StateMachine::GetTransitionName(transition); } - mutable std::mutex fMutex; - std::condition_variable fNewState; +} // namespace mq +} // namespace fair - static auto Transition(const State currentState, const StateTransition transition) -> State; -}; /* class StateMachine */ - -} /* namespace mq */ -} /* namespace fair */ - -#endif /* FAIR_MQ_STATEMACHINE_H */ +#endif /* FAIRMQSTATEMACHINE_H_ */ diff --git a/fairmq/plugins/Control.cxx b/fairmq/plugins/Control.cxx index 662bfc72..c36f862d 100644 --- a/fairmq/plugins/Control.cxx +++ b/fairmq/plugins/Control.cxx @@ -14,6 +14,8 @@ #include #include #include +#include +#include using namespace std; @@ -53,8 +55,8 @@ Control::Control(const string& name, const Plugin::Version version, const string , fDeviceHasShutdown(false) , fPluginShutdownRequested(false) { - SubscribeToDeviceStateChange([&](DeviceState newState) - { + SubscribeToDeviceStateChange([&](DeviceState newState) { + LOG(trace) << "control plugin notified on new state: " << newState; { lock_guard lock{fEventsMutex}; fEvents.push(newState); @@ -62,47 +64,52 @@ Control::Control(const string& name, const Plugin::Version version, const string fNewEvent.notify_one(); }); - try - { + try { TakeDeviceControl(); auto control = GetProperty("control"); - if (control == "static") - { + if (control == "static") { LOG(debug) << "Running builtin controller: static"; fControllerThread = thread(&Control::StaticMode, this); - } - else if (control == "interactive") - { + } else if (control == "interactive") { LOG(debug) << "Running builtin controller: interactive"; fControllerThread = thread(&Control::InteractiveMode, this); - } - else - { + } else { LOG(error) << "Unrecognized control mode '" << control << "' requested. " << "Ignoring and falling back to static control mode."; fControllerThread = thread(&Control::StaticMode, this); } - } - catch (PluginServices::DeviceControlError& e) - { + } catch (PluginServices::DeviceControlError& e) { // If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else. LOG(debug) << e.what(); } - LOG(debug) << "catch-signals: " << GetProperty("catch-signals"); - if (GetProperty("catch-signals") > 0) - { + if (GetProperty("catch-signals") > 0) { + LOG(debug) << "Plugin '" << name << "' is setting up signal handling for SIGINT and SIGTERM"; fSignalHandlerThread = thread(&Control::SignalHandler, this); signal(SIGINT, signal_handler); signal(SIGTERM, signal_handler); - } - else - { + } else { LOG(warn) << "Signal handling (e.g. Ctrl-C) has been deactivated."; } } +auto Control::RunStartupSequence() -> void +{ + ChangeDeviceState(DeviceStateTransition::InitDevice); + while (WaitForNextState() != DeviceState::InitializingDevice) {} + ChangeDeviceState(DeviceStateTransition::CompleteInit); + while (WaitForNextState() != DeviceState::Initialized) {} + ChangeDeviceState(DeviceStateTransition::Bind); + while (WaitForNextState() != DeviceState::Bound) {} + ChangeDeviceState(DeviceStateTransition::Connect); + while (WaitForNextState() != DeviceState::DeviceReady) {} + ChangeDeviceState(DeviceStateTransition::InitTask); + while (WaitForNextState() != DeviceState::Ready) {} + ChangeDeviceState(DeviceStateTransition::Run); + while (WaitForNextState() != DeviceState::Running) {} +} + auto ControlPluginProgramOptions() -> Plugin::ProgOptions { namespace po = boost::program_options; @@ -135,8 +142,7 @@ struct terminal_config }; auto Control::InteractiveMode() -> void -try -{ +try { RunStartupSequence(); char input; // hold the user console input @@ -147,129 +153,226 @@ try terminal_config tconfig; - PrintInteractiveHelp(); + bool color = GetProperty("color"); + + if (color) { + PrintInteractiveHelpColor(); + } else { + PrintInteractiveHelp(); + } bool keepRunning = true; - while (keepRunning) - { - if (poll(cinfd, 1, 500)) - { - if (fDeviceShutdownRequested) - { + while (keepRunning) { + if (poll(cinfd, 1, 500)) { + if (fDeviceShutdownRequested) { break; } cin >> input; - switch (input) - { + switch (input) { + case 'c': + cout << "\n --> [i] check current state\n\n" << flush; + LOG(state) << GetCurrentDeviceState(); + break; case 'i': - LOG(info) << "\n\n --> [i] init device\n"; + cout << "\n --> [i] init device\n\n" << flush; ChangeDeviceState(DeviceStateTransition::InitDevice); + while (WaitForNextState() != DeviceState::InitializingDevice) {} + ChangeDeviceState(DeviceStateTransition::CompleteInit); + break; + case 'b': + cout << "\n --> [b] bind\n\n" << flush; + ChangeDeviceState(DeviceStateTransition::Bind); + break; + case 'x': + cout << "\n --> [x] connect\n\n" << flush; + ChangeDeviceState(DeviceStateTransition::Connect); break; case 'j': - LOG(info) << "\n\n --> [j] init task\n"; + cout << "\n --> [j] init task\n\n" << flush; ChangeDeviceState(DeviceStateTransition::InitTask); break; - case 'p': - LOG(info) << "\n\n --> [p] pause\n"; - ChangeDeviceState(DeviceStateTransition::Pause); - break; case 'r': - LOG(info) << "\n\n --> [r] run\n"; + cout << "\n --> [r] run\n\n" << flush; ChangeDeviceState(DeviceStateTransition::Run); break; case 's': - LOG(info) << "\n\n --> [s] stop\n"; + cout << "\n --> [s] stop\n\n" << flush; ChangeDeviceState(DeviceStateTransition::Stop); break; case 't': - LOG(info) << "\n\n --> [t] reset task\n"; + cout << "\n --> [t] reset task\n\n" << flush; ChangeDeviceState(DeviceStateTransition::ResetTask); break; case 'd': - LOG(info) << "\n\n --> [d] reset device\n"; + cout << "\n --> [d] reset device\n\n" << flush; ChangeDeviceState(DeviceStateTransition::ResetDevice); break; case 'k': - LOG(info) << "\n\n --> [k] increase log severity\n"; + cout << "\n --> [k] increase log severity\n\n" << flush; CycleLogConsoleSeverityUp(); break; case 'l': - LOG(info) << "\n\n --> [l] decrease log severity\n"; + cout << "\n --> [l] decrease log severity\n\n" << flush; CycleLogConsoleSeverityDown(); break; case 'n': - LOG(info) << "\n\n --> [n] increase log verbosity\n"; + cout << "\n --> [n] increase log verbosity\n\n" << flush; CycleLogVerbosityUp(); break; case 'm': - LOG(info) << "\n\n --> [m] decrease log verbosity\n"; + cout << "\n --> [m] decrease log verbosity\n\n" << flush; CycleLogVerbosityDown(); break; case 'h': - LOG(info) << "\n\n --> [h] help\n"; - PrintInteractiveHelp(); + cout << "\n --> [h] help\n\n" << flush; + if (color) { + PrintInteractiveHelpColor(); + PrintStateMachineColor(); + } else { + PrintInteractiveHelp(); + PrintStateMachine(); + } break; - // case 'x': - // LOG(info) << "\n\n --> [x] ERROR\n"; - // ChangeDeviceState(DeviceStateTransition::ERROR_FOUND); - // break; case 'q': - LOG(info) << "\n\n --> [q] end\n"; + cout << "\n --> [q] end\n\n" << flush; keepRunning = false; break; default: - LOG(info) << "Invalid input: [" << input << "]"; + cout << "Invalid input: [" << input << "]. " << flush; PrintInteractiveHelp(); break; } } - if (GetCurrentDeviceState() == DeviceState::Error) - { + if (GetCurrentDeviceState() == DeviceState::Error) { throw DeviceErrorState("Controlled device transitioned to error state."); } - if (fDeviceShutdownRequested) - { + if (fDeviceShutdownRequested) { break; } } RunShutdownSequence(); -} -catch (PluginServices::DeviceControlError& e) -{ +} catch (PluginServices::DeviceControlError& e) { // If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else. LOG(debug) << e.what(); +} catch (DeviceErrorState&) { } -catch (DeviceErrorState&) + +auto Control::PrintInteractiveHelpColor() -> void { + stringstream ss; + ss << "Following control commands are available:\n\n" + << " [\033[01;32mh\033[0m] help, [\033[01;32mc\033[0m] check current device state,\n" + << " [\033[01;32mi\033[0m] init device, [\033[01;32mb\033[0m] bind, [\033[01;32mx\033[0m] connect, [\033[01;32mj\033[0m] init task," + << " [\033[01;32mr\033[0m] run, [\033[01;32ms\033[0m] stop,\n" + << " [\033[01;32mt\033[0m] reset task, [\033[01;32md\033[0m] reset device, [\033[01;32mq\033[0m] end,\n" + << " [\033[01;32mk\033[0m] increase log severity [\033[01;32ml\033[0m] decrease log severity [\033[01;32mn\033[0m] increase log verbosity [\033[01;32mm\033[0m] decrease log verbosity\n\n"; + cout << ss.str() << flush; } auto Control::PrintInteractiveHelp() -> void { stringstream ss; - ss << "\nFollowing control commands are available:\n\n" - << "[h] help, [p] pause, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device\n" - << "[k] increase log severity [l] decrease log severity [n] increase log verbosity [m] decrease log verbosity\n\n"; + ss << "Following control commands are available:\n\n" + << " [h] help, [c] check current device state,\n" + << " [i] init device, [b] bind, [x] connect, [j] init task,\n" + << " [r] run, [s] stop,\n" + << " [t] reset task, [d] reset device, [q] end,\n" + << " [k] increase log severity, [l] decrease log severity, [n] increase log verbosity, [m] decrease log verbosity.\n\n"; + cout << ss.str() << flush; +} + +void Control::PrintStateMachineColor() +{ + stringstream ss; + ss << " @ \n" + << " ┌───────────╨─────────────┐ ┌───────────┐ \n" + << " │ \033[01;36mIDLE\033[0m [\033[01;32mq\033[0m]─▶ \033[01;33mEXITING\033[0m │ \n" + << " └[\033[01;32mi\033[0m]─────────────────── ▲ ┘ └───────────┘ \n" + << " ╔══ ▼ ════════════════╗ ╔═╩══════════════════╗ \n" + << " ║ \033[01;33mINITIALIZING DEVICE\033[0m ║ ║ \033[01;33mRESETTING DEVICE\033[0m ║ \n" + << " ╚══════════╦══════════╝ ╚════════ ▲ ═════════╝ \n" + << " ┌───────── ▼ ─────────┐ │ ┌────────────────────────────┐ \n" + << " │ \033[01;36mINITIALIZED\033[0m │ │ │ Legend: │ \n" + << " └─────────[\033[01;32mb\033[0m]─────────┘ │ │----------------------------│ \n" + << " ╔═════════ ▼ ═════════╗ │ │ [\033[01;32mk\033[0m] keyboard shortcut for │ \n" + << " ║ \033[01;33mBINDING\033[0m ║ │ │ interactive controller │ \n" + << " ╚══════════╦══════════╝ │ │ ┌────────────────────────┐ │ \n" + << " ┌───────── ▼ ─────────┐ │ │ │ \033[01;36mIDLING STATE\033[0m │ │ \n" + << " │ \033[01;36mBOUND\033[0m │ │ │ └────────────────────────┘ │ \n" + << " └─────────[\033[01;32mx\033[0m]─────────┘ │ │ ╔════════════════════════╗ │ \n" + << " ╔═════════ ▼ ═════════╗ │ │ ║ \033[01;33mWORKING STATE\033[0m ║ │ \n" + << " ║ \033[01;33mCONNECTING\033[0m ║ │ │ ╚════════════════════════╝ │ \n" + << " ╚══════════╦══════════╝ │ └────────────────────────────┘ \n" + << " ┌─────── ▼ ────────────────────[\033[01;32md\033[0m]───────┐ \n" + << " │ \033[01;36mDEVICE READY\033[0m │ \n" + << " └───────[\033[01;32mj\033[0m]──────────────────── ▲ ───────┘ \n" + << " ╔═════════ ▼ ═════════╗ ╔═════════╩══════════╗ \n" + << " ║ \033[01;33mINITIALIZING TASK\033[0m ║ ║ \033[01;33mRESETTING TASK\033[0m ║ \n" + << " ╚══════════╦══════════╝ ╚════════ ▲ ═════════╝ \n" + << " ┌─────── ▼ ────────────────────[\033[01;32mt\033[0m]───────┐ \n" + << " │ \033[01;36mREADY\033[0m │ \n" + << " └───────[\033[01;32mr\033[0m]──────────────────── ▲ ───────┘ \n" + << " ╔══════ ▼ ════════════════════[\033[01;32ms\033[0m]══════╗ \n" + << " ║ \033[01;33mRUNNING\033[0m ║ \n" + << " ╚══════════════════════════════════════╝ \n" + << " \n"; + cout << ss.str() << flush; +} + +void Control::PrintStateMachine() +{ + stringstream ss; + ss << " @ \n" + << " ┌───────────╨─────────────┐ ┌───────────┐ \n" + << " │ IDLE [q]─▶ EXITING │ \n" + << " └[i]─────────────────── ▲ ┘ └───────────┘ \n" + << " ╔══ ▼ ════════════════╗ ╔═╩══════════════════╗ \n" + << " ║ INITIALIZING DEVICE ║ ║ RESETTING DEVICE ║ \n" + << " ╚══════════╦══════════╝ ╚════════ ▲ ═════════╝ \n" + << " ┌───────── ▼ ─────────┐ │ ┌────────────────────────────┐ \n" + << " │ INITIALIZED │ │ │ Legend: │ \n" + << " └─────────[b]─────────┘ │ │----------------------------│ \n" + << " ╔═════════ ▼ ═════════╗ │ │ [k] keyboard shortcut for │ \n" + << " ║ BINDING ║ │ │ interactive controller │ \n" + << " ╚══════════╦══════════╝ │ │ ┌────────────────────────┐ │ \n" + << " ┌───────── ▼ ─────────┐ │ │ │ IDLING STATE │ │ \n" + << " │ BOUND │ │ │ └────────────────────────┘ │ \n" + << " └─────────[x]─────────┘ │ │ ╔════════════════════════╗ │ \n" + << " ╔═════════ ▼ ═════════╗ │ │ ║ WORKING STATE ║ │ \n" + << " ║ CONNECTING ║ │ │ ╚════════════════════════╝ │ \n" + << " ╚══════════╦══════════╝ │ └────────────────────────────┘ \n" + << " ┌─────── ▼ ────────────────────[d]───────┐ \n" + << " │ DEVICE READY │ \n" + << " └───────[j]──────────────────── ▲ ───────┘ \n" + << " ╔═════════ ▼ ═════════╗ ╔═════════╩══════════╗ \n" + << " ║ INITIALIZING TASK ║ ║ RESETTING TASK ║ \n" + << " ╚══════════╦══════════╝ ╚════════ ▲ ═════════╝ \n" + << " ┌─────── ▼ ────────────────────[t]───────┐ \n" + << " │ READY │ \n" + << " └───────[r]──────────────────── ▲ ───────┘ \n" + << " ╔══════ ▼ ════════════════════[s]══════╗ \n" + << " ║ RUNNING ║ \n" + << " ╚══════════════════════════════════════╝ \n" + << " \n"; cout << ss.str() << flush; } auto Control::WaitForNextState() -> DeviceState { unique_lock lock{fEventsMutex}; - while (fEvents.empty()) - { + while (fEvents.empty()) { fNewEvent.wait_for(lock, chrono::milliseconds(50)); } auto result = fEvents.front(); - if (result == DeviceState::Error) - { + if (result == DeviceState::Error) { throw DeviceErrorState("Controlled device transitioned to error state."); } @@ -351,14 +454,16 @@ auto Control::SignalHandler() -> void auto Control::RunShutdownSequence() -> void { auto nextState = GetCurrentDeviceState(); - EmptyEventQueue(); - while (nextState != DeviceState::Exiting && nextState != DeviceState::Error) - { - switch (nextState) - { + if (nextState != DeviceState::Error) { + EmptyEventQueue(); + } + while (nextState != DeviceState::Exiting && nextState != DeviceState::Error) { + switch (nextState) { case DeviceState::Idle: ChangeDeviceState(DeviceStateTransition::End); break; + case DeviceState::Initialized: + case DeviceState::Bound: case DeviceState::DeviceReady: ChangeDeviceState(DeviceStateTransition::ResetDevice); break; @@ -368,11 +473,8 @@ auto Control::RunShutdownSequence() -> void case DeviceState::Running: ChangeDeviceState(DeviceStateTransition::Stop); break; - case DeviceState::Paused: - ChangeDeviceState(DeviceStateTransition::Resume); - break; default: - LOG(debug) << "Controller ignoring event: " << nextState; + // LOG(debug) << "Controller ignoring event: " << nextState; break; } @@ -383,16 +485,6 @@ auto Control::RunShutdownSequence() -> void ReleaseDeviceControl(); } -auto Control::RunStartupSequence() -> void -{ - ChangeDeviceState(DeviceStateTransition::InitDevice); - while (WaitForNextState() != DeviceState::DeviceReady) {} - ChangeDeviceState(DeviceStateTransition::InitTask); - while (WaitForNextState() != DeviceState::Ready) {} - ChangeDeviceState(DeviceStateTransition::Run); - while (WaitForNextState() != DeviceState::Running) {} -} - auto Control::EmptyEventQueue() -> void { lock_guard lock{fEventsMutex}; diff --git a/fairmq/plugins/Control.h b/fairmq/plugins/Control.h index 0815d39b..88f56032 100644 --- a/fairmq/plugins/Control.h +++ b/fairmq/plugins/Control.h @@ -36,7 +36,10 @@ class Control : public Plugin private: auto InteractiveMode() -> void; + static auto PrintInteractiveHelpColor() -> void; static auto PrintInteractiveHelp() -> void; + static auto PrintStateMachineColor() -> void; + static auto PrintStateMachine() -> void; auto StaticMode() -> void; auto WaitForNextState() -> DeviceState; auto SignalHandler() -> void; @@ -62,9 +65,7 @@ auto ControlPluginProgramOptions() -> Plugin::ProgOptions; REGISTER_FAIRMQ_PLUGIN( Control, // Class name control, // Plugin name (string, lower case chars only) - (Plugin::Version{FAIRMQ_VERSION_MAJOR, - FAIRMQ_VERSION_MINOR, - FAIRMQ_VERSION_PATCH}), // Version + (Plugin::Version{FAIRMQ_VERSION_MAJOR, FAIRMQ_VERSION_MINOR, FAIRMQ_VERSION_PATCH}), // Version "FairRootGroup ", // Maintainer "https://github.com/FairRootGroup/FairRoot", // Homepage ControlPluginProgramOptions // Free function which declares custom program options for the diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index 533ea987..9b3fd5d9 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -40,7 +40,7 @@ DDS::DDS(const string& name, const Plugin::Version version, const string& mainta , fConnectingChans() , fStopMutex() , fStopCondition() - , fCommands({ "INIT DEVICE", "INIT TASK", "PAUSE", "RUN", "STOP", "RESET TASK", "RESET DEVICE" }) + , fCommands({ "BIND", "CONNECT", "INIT TASK", "RUN", "STOP", "RESET TASK", "RESET DEVICE" }) , fControllerThread() , fEvents() , fEventsMutex() @@ -96,6 +96,10 @@ auto DDS::HandleControl() -> void ChangeDeviceState(DeviceStateTransition::InitDevice); while (WaitForNextState() != DeviceState::InitializingDevice) {} + ChangeDeviceState(DeviceStateTransition::CompleteInit); + while (WaitForNextState() != DeviceState::Initialized) {} + ChangeDeviceState(DeviceStateTransition::Bind); + while (WaitForNextState() != DeviceState::Bound) {} // in the Initializing state subscribe to receive addresses of connecting channels from DDS // and propagate addresses of bound channels to DDS. @@ -114,6 +118,7 @@ auto DDS::HandleControl() -> void // publish bound addresses via DDS at keys corresponding to the channel prefixes, e.g. 'data' in data[i] PublishBoundChannels(); + ChangeDeviceState(DeviceStateTransition::Connect); while (WaitForNextState() != DeviceState::DeviceReady) {} ChangeDeviceState(DeviceStateTransition::InitTask); @@ -305,13 +310,26 @@ auto DDS::SubscribeForCustomCommands() -> void if (cmd == "check-state") { fDDSCustomCmd.send(id + ": " + ToStr(GetCurrentDeviceState()) + " (pid: " + pid + ")", to_string(senderId)); + } else if (cmd == "INIT DEVICE") { + if (ChangeDeviceState(ToDeviceStateTransition(cmd))) { + fDDSCustomCmd.send(id + ": queued " + cmd + " transition", to_string(senderId)); + while (WaitForNextState() != DeviceState::InitializingDevice) {} + ChangeDeviceState(DeviceStateTransition::CompleteInit); + } else { + fDDSCustomCmd.send(id + ": could not queue " + cmd + " transition", to_string(senderId)); + } } else if (fCommands.find(cmd) != fCommands.end()) { - fDDSCustomCmd.send(id + ": attempting to " + cmd, to_string(senderId)); - ChangeDeviceState(ToDeviceStateTransition(cmd)); + if (ChangeDeviceState(ToDeviceStateTransition(cmd))) { + fDDSCustomCmd.send(id + ": queued " + cmd + " transition", to_string(senderId)); + } else { + fDDSCustomCmd.send(id + ": could not queue " + cmd + " transition", to_string(senderId)); + } } else if (cmd == "END") { - fDDSCustomCmd.send(id + ": attempting to " + cmd, to_string(senderId)); - ChangeDeviceState(ToDeviceStateTransition(cmd)); - fDDSCustomCmd.send(id + ": " + ToStr(GetCurrentDeviceState()), to_string(senderId)); + if (ChangeDeviceState(ToDeviceStateTransition(cmd))) { + fDDSCustomCmd.send(id + ": queued " + cmd + " transition", to_string(senderId)); + } else { + fDDSCustomCmd.send(id + ": could not queue " + cmd + " transition", to_string(senderId)); + } if (ToStr(GetCurrentDeviceState()) == "EXITING") { unique_lock lock(fStopMutex); fStopCondition.notify_one(); @@ -363,7 +381,7 @@ auto DDS::WaitForNextState() -> DeviceState { unique_lock lock{fEventsMutex}; while (fEvents.empty()) { - fNewEvent.wait(lock); + fNewEvent.wait_for(lock, chrono::milliseconds(50)); } auto result = fEvents.front(); diff --git a/fairmq/plugins/DDS/runDDSCommandUI.cxx b/fairmq/plugins/DDS/runDDSCommandUI.cxx index 04d9baa1..940d05e2 100644 --- a/fairmq/plugins/DDS/runDDSCommandUI.cxx +++ b/fairmq/plugins/DDS/runDDSCommandUI.cxx @@ -25,7 +25,7 @@ namespace bpo = boost::program_options; void PrintControlsHelp() { cout << "Use keys to control the devices:" << endl; - cout << "[c] check states, [o] dump config, [h] help, [p] pause, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device" << endl; + cout << "[c] check states, [o] dump config, [h] help, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device, [b] bind, [x] connect" << endl; cout << "To quit press Ctrl+C" << endl; } @@ -48,7 +48,7 @@ int main(int argc, char* argv[]) if (vm.count("help")) { cout << "FairMQ DDS Command UI" << endl << options << endl; - cout << "Commands: [c] check state, [o] dump config, [h] help, [p] pause, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device" << endl; + cout << "Commands: [c] check state, [o] dump config, [h] help, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device" << endl; return EXIT_SUCCESS; } @@ -96,14 +96,18 @@ int main(int argc, char* argv[]) cout << " > init devices" << endl; ddsCustomCmd.send("INIT DEVICE", topologyPath); break; + case 'b': + cout << " > bind" << endl; + ddsCustomCmd.send("BIND", topologyPath); + break; + case 'x': + cout << " > connect" << endl; + ddsCustomCmd.send("CONNECT", topologyPath); + break; case 'j': cout << " > init tasks" << endl; ddsCustomCmd.send("INIT TASK", topologyPath); break; - case 'p': - cout << " > pause devices" << endl; - ddsCustomCmd.send("PAUSE", topologyPath); - break; case 'r': cout << " > run tasks" << endl; ddsCustomCmd.send("RUN", topologyPath); diff --git a/fairmq/plugins/PMIx/PMIxPlugin.cxx b/fairmq/plugins/PMIx/PMIxPlugin.cxx index 77796210..63f8b983 100644 --- a/fairmq/plugins/PMIx/PMIxPlugin.cxx +++ b/fairmq/plugins/PMIx/PMIxPlugin.cxx @@ -29,7 +29,7 @@ PMIxPlugin::PMIxPlugin(const std::string& name, { SubscribeToDeviceStateChange([&](DeviceState newState) { switch (newState) { - case DeviceState::InitializingDevice: + case DeviceState::Connecting: Init(); Publish(); Fence(); @@ -83,6 +83,7 @@ auto PMIxPlugin::Publish() -> void for (int i = 0; i < c.second; ++i) { std::string addressKey{"chans." + c.first + "." + std::to_string(i) + ".address"}; info.emplace_back(addressKey, GetProperty(addressKey)); + LOG(debug) << PMIxClient() << info.back(); } } } @@ -123,16 +124,21 @@ auto PMIxPlugin::Lookup() -> void LOG(debug) << PMIxClient() << " pmix::lookup() OK"; } + LOG(info) << pdata.size(); + for (const auto& p : pdata) { if (p.value.type == PMIX_UNDEF) { LOG(debug) << PMIxClient() << " pmix::lookup() not found: key=" << p.key; } else if (p.value.type == PMIX_STRING) { - SetProperty(p.key, p.value.data.string); LOG(debug) << PMIxClient() << " pmix::lookup() found: key=" << p.key << ",value=" << p.value.data.string; + SetProperty(p.key, p.value.data.string); + LOG(info) << GetProperty(p.key); } else { LOG(debug) << PMIxClient() << " pmix::lookup() wrong type returned: key=" << p.key << ",type=" << p.value.type; } } + + LOG(info) << pdata.size(); } } /* namespace plugins */ diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 084eac08..c9790d0f 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -187,16 +187,16 @@ add_testsuite(FairMQ.EventManager TIMEOUT 10 ) -add_testsuite(FairMQ.StateMachine - SOURCES - ${CMAKE_CURRENT_BINARY_DIR}/runner.cxx - state_machine/_state_machine.cxx +# add_testsuite(FairMQ.StateMachine +# SOURCES +# ${CMAKE_CURRENT_BINARY_DIR}/runner.cxx +# state_machine/_state_machine.cxx - LINKS FairMQ - INCLUDES ${CMAKE_CURRENT_SOURCE_DIR} - ${CMAKE_CURRENT_BINARY_DIR} - TIMEOUT 10 -) +# LINKS FairMQ +# INCLUDES ${CMAKE_CURRENT_SOURCE_DIR} +# ${CMAKE_CURRENT_BINARY_DIR} +# TIMEOUT 10 +# ) add_testsuite(FairMQ.Tools SOURCES diff --git a/test/device/_device_config.cxx b/test/device/_device_config.cxx index 9598f73c..8b078457 100644 --- a/test/device/_device_config.cxx +++ b/test/device/_device_config.cxx @@ -23,22 +23,72 @@ using namespace std; void control(FairMQDevice& device) { - device.ChangeState("INIT_DEVICE"); - device.WaitForEndOfState("INIT_DEVICE"); - device.ChangeState("INIT_TASK"); - device.WaitForEndOfState("INIT_TASK"); + device.ChangeState(fair::mq::Transition::InitDevice); + device.WaitForState(fair::mq::State::InitializingDevice); + device.ChangeState(fair::mq::Transition::CompleteInit); + device.WaitForState(fair::mq::State::Initialized); + device.ChangeState(fair::mq::Transition::Bind); + device.WaitForState(fair::mq::State::Bound); + device.ChangeState(fair::mq::Transition::Connect); + device.WaitForState(fair::mq::State::DeviceReady); + device.ChangeState(fair::mq::Transition::InitTask); + device.WaitForState(fair::mq::State::Ready); - device.ChangeState("RUN"); - device.WaitForEndOfState("RUN"); + device.ChangeState(fair::mq::Transition::Run); + device.WaitForState(fair::mq::State::Ready); - device.ChangeState("RESET_TASK"); - device.WaitForEndOfState("RESET_TASK"); - device.ChangeState("RESET_DEVICE"); - device.WaitForEndOfState("RESET_DEVICE"); + device.ChangeState(fair::mq::Transition::ResetTask); + device.WaitForState(fair::mq::State::DeviceReady); + device.ChangeState(fair::mq::Transition::ResetDevice); + device.WaitForState(fair::mq::State::Idle); - device.ChangeState("END"); + device.ChangeState(fair::mq::Transition::End); } +class TestDevice : public FairMQDevice +{ + public: + TestDevice(const string& transport) + { + fDeviceThread = thread(&FairMQDevice::RunStateMachine, this); + + SetTransport(transport); + + ChangeState(fair::mq::Transition::InitDevice); + WaitForState(fair::mq::State::InitializingDevice); + ChangeState(fair::mq::Transition::CompleteInit); + WaitForState(fair::mq::State::Initialized); + ChangeState(fair::mq::Transition::Bind); + WaitForState(fair::mq::State::Bound); + ChangeState(fair::mq::Transition::Connect); + WaitForState(fair::mq::State::DeviceReady); + ChangeState(fair::mq::Transition::InitTask); + WaitForState(fair::mq::State::Ready); + + ChangeState(fair::mq::Transition::Run); + } + + ~TestDevice() + { + WaitForState(fair::mq::State::Running); + ChangeState(fair::mq::Transition::Stop); + WaitForState(fair::mq::State::Ready); + ChangeState(fair::mq::Transition::ResetTask); + WaitForState(fair::mq::State::DeviceReady); + ChangeState(fair::mq::Transition::ResetDevice); + WaitForState(fair::mq::State::Idle); + + ChangeState(fair::mq::Transition::End); + + if (fDeviceThread.joinable()) { + fDeviceThread.join(); + } + } + + private: + thread fDeviceThread; +}; + class DeviceConfig : public ::testing::Test { public: @@ -51,8 +101,7 @@ class DeviceConfig : public ::testing::Test vector emptyArgs = {"dummy", "--id", "test", "--color", "false"}; - if (config.ParseAll(emptyArgs, true)) - { + if (config.ParseAll(emptyArgs, true)) { return 0; } @@ -71,14 +120,20 @@ class DeviceConfig : public ::testing::Test device.RunStateMachine(); - if (t.joinable()) - { + if (t.joinable()) { t.join(); } return device.GetTransportName(); } + string TestDeviceControlInConstructor(const string& transport) + { + TestDevice device(transport); + + return device.GetTransportName(); + } + string TestDeviceSetTransport(const string& transport) { FairMQDevice device; @@ -90,12 +145,11 @@ class DeviceConfig : public ::testing::Test channel.UpdateAddress("tcp://localhost:5558"); device.AddChannel("data", channel); - std::thread t(control, std::ref(device)); + thread t(&FairMQDevice::RunStateMachine, &device); - device.RunStateMachine(); + control(device); - if (t.joinable()) - { + if (t.joinable()) { t.join(); } @@ -119,4 +173,12 @@ TEST_F(DeviceConfig, SetTransport) EXPECT_EQ(transport, returnedTransport); } +TEST_F(DeviceConfig, ControlInConstructor) +{ + string transport = "zeromq"; + string returnedTransport = TestDeviceControlInConstructor(transport); + + EXPECT_EQ(transport, returnedTransport); +} + } // namespace diff --git a/test/device/_multiple_devices.cxx b/test/device/_multiple_devices.cxx index 490e58e3..2e842544 100644 --- a/test/device/_multiple_devices.cxx +++ b/test/device/_multiple_devices.cxx @@ -22,20 +22,26 @@ using namespace std; void control(FairMQDevice& device) { - device.ChangeState("INIT_DEVICE"); - device.WaitForEndOfState("INIT_DEVICE"); - device.ChangeState("INIT_TASK"); - device.WaitForEndOfState("INIT_TASK"); + device.ChangeState(fair::mq::Transition::InitDevice); + device.WaitForState(fair::mq::State::InitializingDevice); + device.ChangeState(fair::mq::Transition::CompleteInit); + device.WaitForState(fair::mq::State::Initialized); + device.ChangeState(fair::mq::Transition::Bind); + device.WaitForState(fair::mq::State::Bound); + device.ChangeState(fair::mq::Transition::Connect); + device.WaitForState(fair::mq::State::DeviceReady); + device.ChangeState(fair::mq::Transition::InitTask); + device.WaitForState(fair::mq::State::Ready); - device.ChangeState("RUN"); - device.WaitForEndOfState("RUN"); + device.ChangeState(fair::mq::Transition::Run); + device.WaitForState(fair::mq::State::Ready); - device.ChangeState("RESET_TASK"); - device.WaitForEndOfState("RESET_TASK"); - device.ChangeState("RESET_DEVICE"); - device.WaitForEndOfState("RESET_DEVICE"); + device.ChangeState(fair::mq::Transition::ResetTask); + device.WaitForState(fair::mq::State::DeviceReady); + device.ChangeState(fair::mq::Transition::ResetDevice); + device.WaitForState(fair::mq::State::Idle); - device.ChangeState("END"); + device.ChangeState(fair::mq::Transition::End); } class MultipleDevices : public ::testing::Test { @@ -57,8 +63,7 @@ class MultipleDevices : public ::testing::Test { sender.RunStateMachine(); - if (t.joinable()) - { + if (t.joinable()) { t.join(); } @@ -79,8 +84,7 @@ class MultipleDevices : public ::testing::Test { receiver.RunStateMachine(); - if (t.joinable()) - { + if (t.joinable()) { t.join(); } diff --git a/test/helper/devices/TestPairLeft.h b/test/helper/devices/TestPairLeft.h index 3d327b95..bb08ae10 100644 --- a/test/helper/devices/TestPairLeft.h +++ b/test/helper/devices/TestPairLeft.h @@ -24,7 +24,7 @@ namespace test class PairLeft : public FairMQDevice { protected: - auto Init() -> void override + auto InitTask() -> void override { std::this_thread::sleep_for(std::chrono::milliseconds(200)); } diff --git a/test/helper/devices/TestPairRight.h b/test/helper/devices/TestPairRight.h index 149a7491..f1dc9025 100644 --- a/test/helper/devices/TestPairRight.h +++ b/test/helper/devices/TestPairRight.h @@ -24,7 +24,7 @@ namespace test class PairRight : public FairMQDevice { protected: - auto Init() -> void override + auto InitTask() -> void override { std::this_thread::sleep_for(std::chrono::milliseconds(200)); } diff --git a/test/helper/devices/TestPollIn.h b/test/helper/devices/TestPollIn.h index 74399ce0..168281f0 100644 --- a/test/helper/devices/TestPollIn.h +++ b/test/helper/devices/TestPollIn.h @@ -31,14 +31,10 @@ class PollIn : public FairMQDevice {} protected: - auto Init() -> void override - { - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - } - auto InitTask() -> void override { fPollType = fConfig->GetValue("poll-type"); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); } auto Run() -> void override diff --git a/test/helper/devices/TestPollOut.h b/test/helper/devices/TestPollOut.h index 4fa57061..04163ce3 100644 --- a/test/helper/devices/TestPollOut.h +++ b/test/helper/devices/TestPollOut.h @@ -22,7 +22,7 @@ namespace test class PollOut : public FairMQDevice { protected: - auto Init() -> void override + auto InitTask() -> void override { std::this_thread::sleep_for(std::chrono::milliseconds(200)); } diff --git a/test/helper/devices/TestPub.h b/test/helper/devices/TestPub.h index 04c939e0..4bb77a6f 100644 --- a/test/helper/devices/TestPub.h +++ b/test/helper/devices/TestPub.h @@ -24,7 +24,7 @@ namespace test class Pub : public FairMQDevice { protected: - auto Init() -> void override + auto InitTask() -> void override { std::this_thread::sleep_for(std::chrono::milliseconds(200)); } diff --git a/test/helper/devices/TestPull.h b/test/helper/devices/TestPull.h index fef9a10c..645c43ad 100644 --- a/test/helper/devices/TestPull.h +++ b/test/helper/devices/TestPull.h @@ -25,7 +25,7 @@ using namespace std; class Pull : public FairMQDevice { protected: - auto Init() -> void override + auto InitTask() -> void override { std::this_thread::sleep_for(std::chrono::milliseconds(200)); } diff --git a/test/helper/devices/TestPush.h b/test/helper/devices/TestPush.h index bb693cbf..f86e46dd 100644 --- a/test/helper/devices/TestPush.h +++ b/test/helper/devices/TestPush.h @@ -22,7 +22,7 @@ namespace test class Push : public FairMQDevice { protected: - auto Init() -> void override + auto InitTask() -> void override { std::this_thread::sleep_for(std::chrono::milliseconds(200)); } diff --git a/test/helper/devices/TestRep.h b/test/helper/devices/TestRep.h index 7e4e73b5..9bbca15d 100644 --- a/test/helper/devices/TestRep.h +++ b/test/helper/devices/TestRep.h @@ -23,7 +23,7 @@ namespace test class Rep : public FairMQDevice { protected: - auto Init() -> void override + auto InitTask() -> void override { std::this_thread::sleep_for(std::chrono::milliseconds(200)); } diff --git a/test/helper/devices/TestReq.h b/test/helper/devices/TestReq.h index 1748793b..c8c835d1 100644 --- a/test/helper/devices/TestReq.h +++ b/test/helper/devices/TestReq.h @@ -23,7 +23,7 @@ namespace test class Req : public FairMQDevice { protected: - auto Init() -> void override + auto InitTask() -> void override { std::this_thread::sleep_for(std::chrono::milliseconds(200)); } diff --git a/test/helper/devices/TestSub.h b/test/helper/devices/TestSub.h index 1e8192eb..893b2f44 100644 --- a/test/helper/devices/TestSub.h +++ b/test/helper/devices/TestSub.h @@ -24,7 +24,7 @@ namespace test class Sub : public FairMQDevice { protected: - auto Init() -> void override + auto InitTask() -> void override { std::this_thread::sleep_for(std::chrono::milliseconds(200)); } diff --git a/test/plugin_services/Fixture.h b/test/plugin_services/Fixture.h index db2f797f..81506e60 100644 --- a/test/plugin_services/Fixture.h +++ b/test/plugin_services/Fixture.h @@ -25,14 +25,18 @@ namespace test inline auto control(FairMQDevice& device) -> void { - for (const auto event : { - FairMQDevice::INIT_DEVICE, - FairMQDevice::RESET_DEVICE, - FairMQDevice::END, - }) { - device.ChangeState(event); - if (event != FairMQDevice::END) device.WaitForEndOfState(event); - } + device.ChangeState(fair::mq::Transition::InitDevice); + device.WaitForState(fair::mq::State::InitializingDevice); + device.ChangeState(fair::mq::Transition::CompleteInit); + device.WaitForState(fair::mq::State::Initialized); + device.ChangeState(fair::mq::Transition::Bind); + device.WaitForState(fair::mq::State::Bound); + device.ChangeState(fair::mq::Transition::Connect); + device.WaitForState(fair::mq::State::DeviceReady); + device.ChangeState(fair::mq::Transition::ResetDevice); + device.WaitForState(fair::mq::State::Idle); + + device.ChangeState(fair::mq::Transition::End); } struct PluginServices : ::testing::Test { @@ -48,7 +52,7 @@ struct PluginServices : ::testing::Test { ~PluginServices() { - if (mDevice.GetCurrentState() == FairMQDevice::IDLE) control(mDevice); + if (mDevice.GetCurrentState() == fair::mq::State::Idle) control(mDevice); if (fRunStateMachineThread.joinable()) { fRunStateMachineThread.join(); } diff --git a/test/plugin_services/_control.cxx b/test/plugin_services/_control.cxx index caeed251..6846d89b 100644 --- a/test/plugin_services/_control.cxx +++ b/test/plugin_services/_control.cxx @@ -38,11 +38,19 @@ TEST_F(PluginServices, OnlySingleController) ASSERT_NO_THROW(mServices.ChangeDeviceState("foo", DeviceStateTransition::InitDevice)); EXPECT_EQ(mServices.GetDeviceController(), string{"foo"}); + mDevice.WaitForState(fair::mq::State::InitializingDevice); + mServices.ChangeDeviceState("foo", DeviceStateTransition::CompleteInit); + mDevice.WaitForState(fair::mq::State::Initialized); + mServices.ChangeDeviceState("foo", DeviceStateTransition::Bind); + mDevice.WaitForState(fair::mq::State::Bound); + mServices.ChangeDeviceState("foo", DeviceStateTransition::Connect); + // park device - mDevice.WaitForEndOfState(FairMQDevice::DEVICE_READY); + mDevice.WaitForState(fair::mq::State::DeviceReady); mServices.ChangeDeviceState("foo", DeviceStateTransition::ResetDevice); - mDevice.WaitForEndOfState(FairMQDevice::RESET_DEVICE); + mDevice.WaitForState(fair::mq::State::Idle); mServices.ChangeDeviceState("foo", DeviceStateTransition::End); + mDevice.WaitForState(fair::mq::State::Exiting); } TEST_F(PluginServices, Control) @@ -66,14 +74,23 @@ TEST_F(PluginServices, Control) } }); + mDevice.WaitForState(fair::mq::State::InitializingDevice); + mServices.ChangeDeviceState("foo", DeviceStateTransition::CompleteInit); + mDevice.WaitForState(fair::mq::State::Initialized); + mServices.ChangeDeviceState("foo", DeviceStateTransition::Bind); + mDevice.WaitForState(fair::mq::State::Bound); + mServices.ChangeDeviceState("foo", DeviceStateTransition::Connect); + mDevice.WaitForState(fair::mq::State::DeviceReady); + unique_lock lock{cv_m}; - cv.wait(lock); + cv.wait(lock, [&]{ return nextState == DeviceState::DeviceReady; }); ASSERT_EQ(mServices.GetCurrentDeviceState(), DeviceState::DeviceReady); mServices.ChangeDeviceState("foo", DeviceStateTransition::ResetDevice); - mDevice.WaitForEndOfState(FairMQDevice::RESET_DEVICE); + mDevice.WaitForState(fair::mq::State::Idle); mServices.ChangeDeviceState("foo", DeviceStateTransition::End); + mDevice.WaitForState(fair::mq::State::Exiting); } TEST_F(PluginServices, ControlStateConversions) @@ -82,11 +99,12 @@ TEST_F(PluginServices, ControlStateConversions) EXPECT_NO_THROW(mServices.ToDeviceState("ERROR")); EXPECT_NO_THROW(mServices.ToDeviceState("IDLE")); EXPECT_NO_THROW(mServices.ToDeviceState("INITIALIZING DEVICE")); + EXPECT_NO_THROW(mServices.ToDeviceState("BINDING")); + EXPECT_NO_THROW(mServices.ToDeviceState("CONNECTING")); EXPECT_NO_THROW(mServices.ToDeviceState("DEVICE READY")); EXPECT_NO_THROW(mServices.ToDeviceState("INITIALIZING TASK")); EXPECT_NO_THROW(mServices.ToDeviceState("READY")); EXPECT_NO_THROW(mServices.ToDeviceState("RUNNING")); - EXPECT_NO_THROW(mServices.ToDeviceState("PAUSED")); EXPECT_NO_THROW(mServices.ToDeviceState("RESETTING TASK")); EXPECT_NO_THROW(mServices.ToDeviceState("RESETTING DEVICE")); EXPECT_NO_THROW(mServices.ToDeviceState("EXITING")); @@ -94,11 +112,12 @@ TEST_F(PluginServices, ControlStateConversions) EXPECT_NO_THROW(mServices.ToStr(DeviceState::Error)); EXPECT_NO_THROW(mServices.ToStr(DeviceState::Idle)); EXPECT_NO_THROW(mServices.ToStr(DeviceState::InitializingDevice)); + EXPECT_NO_THROW(mServices.ToStr(DeviceState::Binding)); + EXPECT_NO_THROW(mServices.ToStr(DeviceState::Connecting)); EXPECT_NO_THROW(mServices.ToStr(DeviceState::DeviceReady)); EXPECT_NO_THROW(mServices.ToStr(DeviceState::InitializingTask)); EXPECT_NO_THROW(mServices.ToStr(DeviceState::Ready)); EXPECT_NO_THROW(mServices.ToStr(DeviceState::Running)); - EXPECT_NO_THROW(mServices.ToStr(DeviceState::Paused)); EXPECT_NO_THROW(mServices.ToStr(DeviceState::ResettingTask)); EXPECT_NO_THROW(mServices.ToStr(DeviceState::ResettingDevice)); EXPECT_NO_THROW(mServices.ToStr(DeviceState::Exiting)); @@ -107,18 +126,18 @@ TEST_F(PluginServices, ControlStateConversions) TEST_F(PluginServices, ControlStateTransitionConversions) { EXPECT_NO_THROW(mServices.ToDeviceStateTransition("INIT DEVICE")); + EXPECT_NO_THROW(mServices.ToDeviceStateTransition("COMPLETE INIT")); EXPECT_NO_THROW(mServices.ToDeviceStateTransition("INIT TASK")); EXPECT_NO_THROW(mServices.ToDeviceStateTransition("RUN")); - EXPECT_NO_THROW(mServices.ToDeviceStateTransition("PAUSE")); EXPECT_NO_THROW(mServices.ToDeviceStateTransition("STOP")); EXPECT_NO_THROW(mServices.ToDeviceStateTransition("RESET TASK")); EXPECT_NO_THROW(mServices.ToDeviceStateTransition("RESET DEVICE")); EXPECT_NO_THROW(mServices.ToDeviceStateTransition("END")); EXPECT_NO_THROW(mServices.ToDeviceStateTransition("ERROR FOUND")); EXPECT_NO_THROW(mServices.ToStr(DeviceStateTransition::InitDevice)); + EXPECT_NO_THROW(mServices.ToStr(DeviceStateTransition::CompleteInit)); EXPECT_NO_THROW(mServices.ToStr(DeviceStateTransition::InitTask)); EXPECT_NO_THROW(mServices.ToStr(DeviceStateTransition::Run)); - EXPECT_NO_THROW(mServices.ToStr(DeviceStateTransition::Pause)); EXPECT_NO_THROW(mServices.ToStr(DeviceStateTransition::Stop)); EXPECT_NO_THROW(mServices.ToStr(DeviceStateTransition::ResetTask)); EXPECT_NO_THROW(mServices.ToStr(DeviceStateTransition::ResetDevice)); diff --git a/test/plugins/_plugin.cxx b/test/plugins/_plugin.cxx index 5f2ce1e3..7f9a2751 100644 --- a/test/plugins/_plugin.cxx +++ b/test/plugins/_plugin.cxx @@ -27,14 +27,19 @@ using namespace fair::mq; auto control(FairMQDevice& device) -> void { device.SetTransport("zeromq"); - for (const auto event : { - FairMQDevice::INIT_DEVICE, - FairMQDevice::RESET_DEVICE, - FairMQDevice::END, - }) { - device.ChangeState(event); - if (event != FairMQDevice::END) device.WaitForEndOfState(event); - } + + device.ChangeState(fair::mq::Transition::InitDevice); + device.WaitForState(fair::mq::State::InitializingDevice); + device.ChangeState(fair::mq::Transition::CompleteInit); + device.WaitForState(fair::mq::State::Initialized); + device.ChangeState(fair::mq::Transition::Bind); + device.WaitForState(fair::mq::State::Bound); + device.ChangeState(fair::mq::Transition::Connect); + device.WaitForState(fair::mq::State::DeviceReady); + device.ChangeState(fair::mq::Transition::ResetDevice); + device.WaitForState(fair::mq::State::Idle); + + device.ChangeState(fair::mq::Transition::End); } TEST(Plugin, Operators) diff --git a/test/plugins/_plugin_manager.cxx b/test/plugins/_plugin_manager.cxx index 75e6f8bc..c1dc52a0 100644 --- a/test/plugins/_plugin_manager.cxx +++ b/test/plugins/_plugin_manager.cxx @@ -28,14 +28,19 @@ using namespace std; auto control(FairMQDevice& device) -> void { device.SetTransport("zeromq"); - for (const auto event : { - FairMQDevice::INIT_DEVICE, - FairMQDevice::RESET_DEVICE, - FairMQDevice::END, - }) { - device.ChangeState(event); - if (event != FairMQDevice::END) device.WaitForEndOfState(event); - } + + device.ChangeState(fair::mq::Transition::InitDevice); + device.WaitForState(fair::mq::State::InitializingDevice); + device.ChangeState(fair::mq::Transition::CompleteInit); + device.WaitForState(fair::mq::State::Initialized); + device.ChangeState(fair::mq::Transition::Bind); + device.WaitForState(fair::mq::State::Bound); + device.ChangeState(fair::mq::Transition::Connect); + device.WaitForState(fair::mq::State::DeviceReady); + device.ChangeState(fair::mq::Transition::ResetDevice); + device.WaitForState(fair::mq::State::Idle); + + device.ChangeState(fair::mq::Transition::End); } TEST(PluginManager, LoadPluginDynamic)