From 5256e7c58088bad5902df677068c96985f3a10b6 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Mon, 15 Jul 2019 15:48:36 +0200 Subject: [PATCH] Add support and test for concurrent TransitionTo --- fairmq/FairMQDevice.cxx | 19 ++++++++++++++++ fairmq/FairMQDevice.h | 11 +++++++++ test/device/_transitions.cxx | 44 +++++++++++++++++++++++++++++++++++- 3 files changed, 73 insertions(+), 1 deletion(-) diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index e3a47980..72671230 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -106,6 +106,11 @@ FairMQDevice::FairMQDevice(ProgOptions* config, const tools::Version version) , fMaxRunRuntimeInS(DefaultMaxRunTime) , fInitializationTimeoutInS(DefaultInitTimeout) , fRawCmdLineArgs() + , fStates() + , fStatesMtx() + , fStatesCV() + , fTransitionMtx() + , fTransitioning(false) { SubscribeToNewTransition("device", [&](Transition transition) { LOG(trace) << "device notified on new transition: " << transition; @@ -187,6 +192,15 @@ void FairMQDevice::WaitForState(fair::mq::State state) void FairMQDevice::TransitionTo(const fair::mq::State s) { + { + lock_guard lock(fTransitionMtx); + if (fTransitioning) { + LOG(debug) << "Attempting a transition with TransitionTo() while another one is already in progress"; + throw OngoingTransition("Attempting a transition with TransitionTo() while another one is already in progress"); + } + fTransitioning = true; + } + using fair::mq::State; State currentState = GetCurrentState(); @@ -225,6 +239,11 @@ void FairMQDevice::TransitionTo(const fair::mq::State s) currentState = WaitForNextState(); } + + { + lock_guard lock(fTransitionMtx); + fTransitioning = false; + } } bool FairMQDevice::ChangeState(const int transition) diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index cc7c9ad3..87060d7d 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -44,6 +44,14 @@ using FairMQChannelMap = std::unordered_map; using InputMultipartCallback = std::function; +namespace fair +{ +namespace mq +{ +struct OngoingTransition : std::runtime_error { using std::runtime_error::runtime_error; }; +} +} + class FairMQDevice { friend class FairMQChannel; @@ -584,6 +592,9 @@ class FairMQDevice std::queue fStates; std::mutex fStatesMtx; std::condition_variable fStatesCV; + + std::mutex fTransitionMtx; + bool fTransitioning; }; #endif /* FAIRMQDEVICE_H_ */ diff --git a/test/device/_transitions.cxx b/test/device/_transitions.cxx index a8d718c2..e07ce97d 100644 --- a/test/device/_transitions.cxx +++ b/test/device/_transitions.cxx @@ -7,6 +7,7 @@ ********************************************************************************/ #include +#include #include @@ -19,7 +20,19 @@ namespace using namespace std; using namespace fair::mq; -void transitionTo(const std::vector& states, int numExpectedStates) +class SlowDevice : public FairMQDevice +{ + public: + SlowDevice() {} + + protected: + void Init() + { + this_thread::sleep_for(chrono::milliseconds(100)); + } +}; + +void transitionTo(const vector& states, int numExpectedStates) { FairMQDevice device; @@ -55,4 +68,33 @@ TEST(Transitions, TransitionTo) transitionTo({State::Running, State::Exiting}, 16); } +TEST(Transitions, ConcurrentTransitionTos) +{ + fair::Logger::SetConsoleSeverity("debug"); + SlowDevice slowDevice; + + vector states({State::Ready, State::Exiting}); + + thread t1([&] { + for (const auto& s : states) { + slowDevice.TransitionTo(s); + } + }); + + thread t2([&] { + this_thread::sleep_for(chrono::milliseconds(50)); + ASSERT_THROW(slowDevice.TransitionTo(State::Exiting), OngoingTransition); + }); + + slowDevice.RunStateMachine(); + + if (t1.joinable()) { + t1.join(); + } + + if (t2.joinable()) { + t2.join(); + } +} + } // namespace