From 9093ed82dcbfdcb8110e314eb3920877752dc126 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 22 Feb 2023 14:01:23 +0100 Subject: [PATCH] Resume/Interrupt transports consistently - Resume transports before state callbacks & handlers - Interrupt transports on new transitions --- fairmq/Device.cxx | 16 +++++----------- fairmq/StateMachine.cxx | 22 ++++++++++++++++++++++ fairmq/StateMachine.h | 1 + 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/fairmq/Device.cxx b/fairmq/Device.cxx index f763d36f..8008f4be 100644 --- a/fairmq/Device.cxx +++ b/fairmq/Device.cxx @@ -92,14 +92,12 @@ Device::Device(ProgOptions* config, tools::Version version) { SubscribeToNewTransition("device", [&](Transition transition) { LOG(trace) << "device notified on new transition: " << transition; + InterruptTransports(); + }); - switch (transition) { - case Transition::Stop: - InterruptTransports(); - break; - default: - break; - } + fStateMachine.PrepareState([&](State state) { + LOG(trace) << "Resuming transports for " << state << " state"; + ResumeTransports(); }); fStateMachine.HandleStates([&](State state) { @@ -462,9 +460,6 @@ void Device::RunWrapper() if (rateLogging && rateLogger->joinable()) { rateLogger->join(); } }); - // notify transports to resume transfers - ResumeTransports(); - // change to Error state in case of an exception, to release LogSocketRates tools::CallOnDestruction cod([&](){ ChangeState(Transition::ErrorFound); @@ -494,7 +489,6 @@ void Device::RunWrapper() // if Run() exited and the state is still RUNNING, transition to READY. if (!NewStatePending()) { - InterruptTransports(); ChangeState(Transition::Stop); } diff --git a/fairmq/StateMachine.cxx b/fairmq/StateMachine.cxx index 140c29f8..56668dea 100644 --- a/fairmq/StateMachine.cxx +++ b/fairmq/StateMachine.cxx @@ -157,6 +157,13 @@ struct Machine_ : public state_machine_def } } + void CallStatePrep(const State state) const + { + if (!fStatePrepSignal.empty()) { + fStatePrepSignal(state); + } + } + void CallNewTransitionCallbacks(const Transition transition) const { if (!fNewTransitionSignal.empty()) { @@ -175,6 +182,7 @@ struct Machine_ : public state_machine_def boost::signals2::signal fStateChangeSignal; boost::signals2::signal fStateHandleSignal; + boost::signals2::signal fStatePrepSignal; boost::signals2::signal fNewTransitionSignal; unordered_map fStateChangeSignalsMap; unordered_map fNewTransitionSignalsMap; @@ -198,6 +206,7 @@ struct Machine_ : public state_machine_def } } + CallStatePrep(fState); CallStateChangeCallbacks(fState); CallStateHandler(fState); } @@ -313,6 +322,16 @@ void StateMachine::UnsubscribeFromStateChange(const string& key) } } +void StateMachine::PrepareState(std::function callback) +{ + auto fsm = static_pointer_cast(fFsm); + if (fsm->fStatePrepSignal.empty()) { + fsm->fStatePrepSignal.connect(callback); + } else { + LOG(error) << "state preparation handler is already set"; + } +} + void StateMachine::HandleStates(function callback) { auto fsm = static_pointer_cast(fFsm); @@ -326,6 +345,9 @@ void StateMachine::HandleStates(function callback) void StateMachine::StopHandlingStates() { auto fsm = static_pointer_cast(fFsm); + if (!fsm->fStatePrepSignal.empty()) { + fsm->fStatePrepSignal.disconnect_all_slots(); + } if (!fsm->fStateHandleSignal.empty()) { fsm->fStateHandleSignal.disconnect_all_slots(); } diff --git a/fairmq/StateMachine.h b/fairmq/StateMachine.h index d95e2636..03b5b028 100644 --- a/fairmq/StateMachine.h +++ b/fairmq/StateMachine.h @@ -35,6 +35,7 @@ class StateMachine void SubscribeToStateChange(const std::string& key, std::function callback); void UnsubscribeFromStateChange(const std::string& key); + void PrepareState(std::function callback); void HandleStates(std::function callback); void StopHandlingStates();