From 5d535163f192113a4a9da8e036bdc35058d8228b Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 24 Jul 2019 14:16:35 +0200 Subject: [PATCH] SDK: Add test for timeout, concurrent call. Implement TODOs --- fairmq/sdk/Topology.cxx | 79 ++++++++++++++++++++++------------------- fairmq/sdk/Topology.h | 38 +++++++++++++++++--- test/sdk/_topology.cxx | 66 ++++++++++++++++++++-------------- 3 files changed, 116 insertions(+), 67 deletions(-) diff --git a/fairmq/sdk/Topology.cxx b/fairmq/sdk/Topology.cxx index f1d5de4e..58a70400 100644 --- a/fairmq/sdk/Topology.cxx +++ b/fairmq/sdk/Topology.cxx @@ -39,26 +39,28 @@ auto operator<<(std::ostream& os, AsyncOpResultCode v) -> std::ostream& auto operator<<(std::ostream& os, AsyncOpResult v) -> std::ostream& { - (void)(os << "[" << v.code << "]"); - if (v.msg.empty()) { - (void)(os << " " << v.msg); + os << "[" << v.code << "]"; + if (!v.msg.empty()) { + os << " " << v.msg; } return os; } namespace sdk { -const std::unordered_map> - expectedState = {{Transition::InitDevice, DeviceState::InitializingDevice}, - {Transition::CompleteInit, DeviceState::Initialized}, - {Transition::Bind, DeviceState::Bound}, - {Transition::Connect, DeviceState::DeviceReady}, - {Transition::InitTask, DeviceState::InitializingTask}, - {Transition::Run, DeviceState::Running}, - {Transition::Stop, DeviceState::Ready}, - {Transition::ResetTask, DeviceState::DeviceReady}, - {Transition::ResetDevice, DeviceState::Idle}, - {Transition::End, DeviceState::Exiting}}; +const std::unordered_map> expectedState = +{ + { Transition::InitDevice, DeviceState::InitializingDevice }, + { Transition::CompleteInit, DeviceState::Initialized }, + { Transition::Bind, DeviceState::Bound }, + { Transition::Connect, DeviceState::DeviceReady }, + { Transition::InitTask, DeviceState::InitializingTask }, + { Transition::Run, DeviceState::Running }, + { Transition::Stop, DeviceState::Ready }, + { Transition::ResetTask, DeviceState::DeviceReady }, + { Transition::ResetDevice, DeviceState::Idle }, + { Transition::End, DeviceState::Exiting } +}; Topology::Topology(DDSTopology topo, DDSSession session) : fDDSSession(std::move(session)) @@ -70,8 +72,8 @@ Topology::Topology(DDSTopology topo, DDSSession session) { std::vector deviceList = fDDSTopo.GetDeviceList(); for (const auto& d : deviceList) { - LOG(info) << "fair::mq::Topology Adding device " << d; - fTopologyState.emplace(d, DeviceStatus{ false, DeviceState::Ok }); + LOG(info) << "Adding device " << d; + fState.emplace(d, DeviceStatus{ false, DeviceState::Ok }); } fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& /* condition */, uint64_t senderId) { LOG(debug) << "Received from " << senderId << ": " << msg; @@ -83,7 +85,6 @@ Topology::Topology(DDSTopology topo, DDSSession session) } if (parts[0] == "state-change") { - boost::trim(parts[3]); AddNewStateEntry(std::stoull(parts[2]), parts[3]); } else if (parts[0] == "state-changes-subscription") { if (parts[2] != "OK") { @@ -104,10 +105,12 @@ Topology::Topology(DDSTopology topo, DDSSession session) auto Topology::ChangeState(TopologyTransition transition, ChangeStateCallback cb, Duration timeout) -> void { { - std::lock_guard guard(fMtx); + std::unique_lock lock(fMtx); if (fStateChangeOngoing) { - LOG(error) << "State change already in progress, concurrent requested not yet supported"; - return; // TODO call the callback with error msg + LOG(error) << "A state change request is already in progress, concurrent requests are currently not supported"; + lock.unlock(); + cb({{AsyncOpResultCode::Error, "A state change request is already in progress, concurrent requests are currently not supported"}, fState}); + return; } LOG(info) << "Initiating ChangeState with " << transition << " to " << expectedState.at(transition); fStateChangeOngoing = true; @@ -143,38 +146,42 @@ void Topology::WaitForState() auto condition = [&] { // LOG(info) << "checking condition"; // LOG(info) << "fShutdown: " << fShutdown; - // LOG(info) << "condition: " << std::all_of(fTopologyState.cbegin(), fTopologyState.cend(), [&](TopologyState::value_type i) { return i.second.state == fTargetState; }); - return fShutdown || std::all_of(fTopologyState.cbegin(), fTopologyState.cend(), [&](TopologyState::value_type i) { + // LOG(info) << "condition: " << std::all_of(fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) { return i.second.state == fTargetState; }); + return fShutdown || std::all_of(fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) { return i.second.state == fTargetState; }); }; std::unique_lock lock(fMtx); - // TODO Fix the timeout version if (fStateChangeTimeout > std::chrono::milliseconds(0)) { - LOG(debug) << "initiating wait with timeout"; if (!fCV.wait_for(lock, fStateChangeTimeout, condition)) { - LOG(debug) << "timeout"; + // LOG(debug) << "timeout"; fStateChangeOngoing = false; + TopologyState state = fState; + lock.unlock(); + fChangeStateCallback({{AsyncOpResultCode::Timeout, "timeout"}, std::move(state)}); break; } } else { - LOG(debug) << "initiating wait without timeout"; fCV.wait(lock, condition); } fStateChangeOngoing = false; if (fShutdown) { - // TODO call the callback here with Aborted result + LOG(debug) << "Aborting because a shutdown was requested"; + TopologyState state = fState; + lock.unlock(); + fChangeStateCallback({{AsyncOpResultCode::Aborted, "Aborted because a shutdown was requested"}, std::move(state)}); break; } - } catch(std::exception& e) { + } catch (std::exception& e) { + fStateChangeOngoing = false; LOG(error) << "Error while processing state request: " << e.what(); - fChangeStateCallback({{AsyncOpResultCode::Error, ""}, fTopologyState}); + fChangeStateCallback({{AsyncOpResultCode::Error, tools::ToString("Exception thrown: ", e.what())}, fState}); } - fChangeStateCallback({{AsyncOpResultCode::Ok, ""}, fTopologyState}); + fChangeStateCallback({{AsyncOpResultCode::Ok, "success"}, fState}); } else { std::unique_lock lock(fExecutionMtx); fExecutionCV.wait(lock); @@ -191,14 +198,14 @@ void Topology::AddNewStateEntry(uint64_t senderId, const std::string& state) { try { std::unique_lock lock(fMtx); - fTopologyState[senderId] = DeviceStatus{ true, fair::mq::GetState(endState) }; - } catch(const std::exception& e) { + fState[senderId] = DeviceStatus{ true, fair::mq::GetState(endState) }; + } catch (const std::exception& e) { LOG(error) << "Exception in AddNewStateEntry: " << e.what(); } - // LOG(info) << "fTopologyState after update: "; - // for (auto& e : fTopologyState) { - // LOG(info) << e.first << ": " << e.second.state; + // LOG(info) << "fState after update: "; + // for (auto& e : fState) { + // LOG(info) << e.first << ": " << e.second.state; // } } fCV.notify_one(); @@ -208,7 +215,7 @@ Topology::~Topology() { fDDSSession.UnsubscribeFromCommands(); { - std::lock_guard guard(fMtx); + std::lock_guard guard(fExecutionMtx); fShutdown = true; } fExecutionCV.notify_one(); diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index 56b1e32c..3ae5fa22 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -22,11 +22,13 @@ #include #include #include +#include namespace fair { namespace mq { -enum class AsyncOpResultCode { +enum class AsyncOpResultCode +{ Ok, Timeout, Error, @@ -57,6 +59,26 @@ struct DeviceStatus using TopologyState = std::unordered_map; using TopologyTransition = fair::mq::Transition; +struct MixedState : std::runtime_error { using std::runtime_error::runtime_error; }; + +DeviceState AggregateState(const TopologyState& topologyState) +{ + DeviceState first = topologyState.begin()->second.state; + + if (std::all_of(topologyState.cbegin(), topologyState.cend(), [&](TopologyState::value_type i) { + return i.second.state == first; + })) { + return first; + } else { + throw MixedState("State is not uniform"); + } +} + +bool StateEqualsTo(const TopologyState& topologyState, DeviceState state) +{ + return AggregateState(topologyState) == state; +} + /** * @class Topology Topology.h * @brief Represents a FairMQ topology @@ -95,14 +117,22 @@ class Topology /// @return The result of the state transition auto ChangeState(TopologyTransition t, Duration timeout = std::chrono::milliseconds(0)) -> ChangeStateResult; + /// @brief Returns the current state of the topology + /// @return map of id : DeviceStatus (initialized, state) + TopologyState GetCurrentState() const { std::lock_guard guard(fMtx); return fState; } + + DeviceState AggregateState() { return sdk::AggregateState(fState); } + + bool StateEqualsTo(DeviceState state) { return sdk::StateEqualsTo(fState, state); } + private: DDSSession fDDSSession; DDSTopology fDDSTopo; - TopologyState fTopologyState; + TopologyState fState; bool fStateChangeOngoing; DeviceState fTargetState; - std::mutex fMtx; - std::mutex fExecutionMtx; + mutable std::mutex fMtx; + mutable std::mutex fExecutionMtx; std::condition_variable fCV; std::condition_variable fExecutionCV; std::thread fExecutionThread; diff --git a/test/sdk/_topology.cxx b/test/sdk/_topology.cxx index 109cf79d..a9542d08 100644 --- a/test/sdk/_topology.cxx +++ b/test/sdk/_topology.cxx @@ -21,26 +21,24 @@ TEST_F(Topology, Construction) fair::mq::sdk::Topology topo(mDDSTopo, mDDSSession); } -TEST_F(Topology, ChangeState_async1) +TEST_F(Topology, ChangeStateAsync) { using fair::mq::sdk::Topology; using fair::mq::sdk::TopologyTransition; Topology topo(mDDSTopo, mDDSSession); fair::mq::tools::Semaphore blocker; - topo.ChangeState(TopologyTransition::Stop, [&blocker](Topology::ChangeStateResult result) { + topo.ChangeState(TopologyTransition::Stop, [&blocker, &topo](Topology::ChangeStateResult result) { LOG(info) << result; EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok); - // TODO add the helper to check state consistency - for (const auto& e : result.state) { - EXPECT_EQ(e.second.state, fair::mq::sdk::DeviceState::Ready); - } + EXPECT_NO_THROW(fair::mq::sdk::AggregateState(result.state)); + EXPECT_EQ(fair::mq::sdk::StateEqualsTo(result.state, fair::mq::sdk::DeviceState::Ready), true); blocker.Signal(); }); blocker.Wait(); } -TEST_F(Topology, ChangeState_sync) +TEST_F(Topology, ChangeStateSync) { using fair::mq::sdk::Topology; using fair::mq::sdk::TopologyTransition; @@ -49,27 +47,41 @@ TEST_F(Topology, ChangeState_sync) auto result(topo.ChangeState(TopologyTransition::Stop)); EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok); - // TODO add the helper to check state consistency - for (const auto& e : result.state) { - EXPECT_EQ(e.second.state, fair::mq::sdk::DeviceState::Ready); - } + EXPECT_NO_THROW(fair::mq::sdk::AggregateState(result.state)); + EXPECT_EQ(fair::mq::sdk::StateEqualsTo(result.state, fair::mq::sdk::DeviceState::Ready), true); } -// TEST_F(Topology, Timeout) -// { -// using fair::mq::sdk::Topology; -// using fair::mq::sdk::TopologyTransition; -// Topology topo(mDDSTopo, mDDSSession); -// Topology::ChangeStateResult r; -// fair::mq::tools::Semaphore blocker; -// topo.ChangeState(TopologyTransition::End, [&](Topology::ChangeStateResult result) { -// LOG(info) << result; -// blocker.Signal(); -// }, std::chrono::milliseconds(100)); -// blocker.Wait(); -// for (const auto& e : r.rc) { -// EXPECT_EQ(e.second.state, fair::mq::sdk::DeviceState::Ok); -// } -// } +TEST_F(Topology, ChangeStateConcurrent) +{ + using fair::mq::sdk::Topology; + using fair::mq::sdk::TopologyTransition; + + Topology topo(mDDSTopo, mDDSSession); + fair::mq::tools::Semaphore blocker; + topo.ChangeState(TopologyTransition::Stop, [&blocker](Topology::ChangeStateResult result) { + LOG(info) << "result for valid ChangeState: " << result; + blocker.Signal(); + }); + topo.ChangeState(TopologyTransition::Stop, [&blocker](Topology::ChangeStateResult result) { + LOG(info) << "result for invalid ChangeState: " << result; + EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Error); + }); + blocker.Wait(); +} + +TEST_F(Topology, ChangeStateTimeout) +{ + using fair::mq::sdk::Topology; + using fair::mq::sdk::TopologyTransition; + + Topology topo(mDDSTopo, mDDSSession); + fair::mq::tools::Semaphore blocker; + topo.ChangeState(TopologyTransition::End, [&](Topology::ChangeStateResult result) { + LOG(info) << result; + EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Timeout); + blocker.Signal(); + }, std::chrono::milliseconds(1)); + blocker.Wait(); +} } // namespace