From d70a203449bca952130883843375a094510cd3f2 Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Wed, 24 Jul 2019 10:41:08 +0200 Subject: [PATCH] SDK: Add sync ChangeState and add msg to its result --- fairmq/sdk/Topology.cxx | 73 ++++++++++++++++++++++++++-------------- fairmq/sdk/Topology.h | 29 +++++++++++++--- fairmq/sdk/runFairMQ.cxx | 3 +- test/sdk/_topology.cxx | 26 ++++++++++---- test/sdk/test_topo.xml | 6 ++-- 5 files changed, 96 insertions(+), 41 deletions(-) diff --git a/fairmq/sdk/Topology.cxx b/fairmq/sdk/Topology.cxx index 98de02bb..9d1fc766 100644 --- a/fairmq/sdk/Topology.cxx +++ b/fairmq/sdk/Topology.cxx @@ -22,43 +22,50 @@ namespace fair { namespace mq { -auto operator<<(std::ostream& os, AsyncOpResult v) -> std::ostream& +auto operator<<(std::ostream& os, AsyncOpResultCode v) -> std::ostream& { switch (v) { - case AsyncOpResult::Aborted: + case AsyncOpResultCode::Aborted: return os << "Aborted"; - case AsyncOpResult::Timeout: + case AsyncOpResultCode::Timeout: return os << "Timeout"; - case AsyncOpResult::Error: + case AsyncOpResultCode::Error: return os << "Error"; - case AsyncOpResult::Ok: + case AsyncOpResultCode::Ok: default: return os << "Ok"; } } +auto operator<<(std::ostream& os, AsyncOpResult v) -> std::ostream& +{ + (void)(os << "[" << v.code << "]"); + if (v.msg.empty()) { + (void)(os << " " << v.msg); + } + return os; +} + namespace sdk { -const std::unordered_map> Topology::fkExpectedState = -{ - { Transition::InitDevice, DeviceState::InitializingDevice }, - { Transition::CompleteInit, DeviceState::Initialized }, - { Transition::Bind, DeviceState::Bound }, - { Transition::Connect, DeviceState::DeviceReady }, - { Transition::InitTask, DeviceState::InitializingTask }, - { Transition::Run, DeviceState::Running }, - { Transition::Stop, DeviceState::Ready }, - { Transition::ResetTask, DeviceState::DeviceReady }, - { Transition::ResetDevice, DeviceState::Idle }, - { Transition::End, DeviceState::Exiting } -}; +const std::unordered_map> + expectedState = {{Transition::InitDevice, DeviceState::InitializingDevice}, + {Transition::CompleteInit, DeviceState::Initialized}, + {Transition::Bind, DeviceState::Bound}, + {Transition::Connect, DeviceState::DeviceReady}, + {Transition::InitTask, DeviceState::InitializingTask}, + {Transition::Run, DeviceState::Running}, + {Transition::Stop, DeviceState::Ready}, + {Transition::ResetTask, DeviceState::DeviceReady}, + {Transition::ResetDevice, DeviceState::Idle}, + {Transition::End, DeviceState::Exiting}}; Topology::Topology(DDSTopology topo, DDSSession session) : fDDSSession(std::move(session)) , fDDSTopo(std::move(topo)) - , fTopologyState() , fStateChangeOngoing(false) - , fExecutionThread() + , fTargetState(DeviceState::Idle) + , fStateChangeTimeout(0) , fShutdown(false) { std::vector deviceList = fDDSTopo.GetDeviceList(); @@ -73,7 +80,6 @@ Topology::Topology(DDSTopology topo, DDSSession session) for (unsigned int i = 0; i < parts.size(); ++i) { boost::trim(parts.at(i)); - LOG(info) << "parts[" << i << "]: " << parts.at(i); } if (parts[0] == "state-change") { @@ -95,7 +101,7 @@ Topology::Topology(DDSTopology topo, DDSSession session) fExecutionThread = std::thread(&Topology::WaitForState, this); } -auto Topology::ChangeState(fair::mq::Transition transition, ChangeStateCallback cb, std::chrono::milliseconds timeout) -> void +auto Topology::ChangeState(TopologyTransition transition, ChangeStateCallback cb, Duration timeout) -> void { { std::lock_guard guard(fMtx); @@ -103,17 +109,32 @@ auto Topology::ChangeState(fair::mq::Transition transition, ChangeStateCallback LOG(error) << "State change already in progress, concurrent requested not yet supported"; return; // TODO call the callback with error msg } - LOG(info) << "Initiating ChangeState with " << transition << " to " << fkExpectedState.at(transition); + LOG(info) << "Initiating ChangeState with " << transition << " to " << expectedState.at(transition); fStateChangeOngoing = true; fChangeStateCallback = cb; fStateChangeTimeout = timeout; - fTargetState = fkExpectedState.at(transition); + fTargetState = expectedState.at(transition); fDDSSession.SendCommand(GetTransitionName(transition)); } fExecutionCV.notify_one(); } +auto Topology::ChangeState(TopologyTransition t, Duration timeout) -> ChangeStateResult +{ + fair::mq::tools::Semaphore blocker; + ChangeStateResult res; + ChangeState( + t, + [&blocker, &res](Topology::ChangeStateResult _res) { + res = _res; + blocker.Signal(); + }, + timeout); + blocker.Wait(); + return res; +} + void Topology::WaitForState() { while (!fShutdown) { @@ -150,10 +171,10 @@ void Topology::WaitForState() } } catch(std::exception& e) { LOG(error) << "Error while processing state request: " << e.what(); - fChangeStateCallback(ChangeStateResult{AsyncOpResult::Error, fTopologyState}); + fChangeStateCallback({{AsyncOpResultCode::Error, ""}, fTopologyState}); } - fChangeStateCallback(ChangeStateResult{AsyncOpResult::Ok, fTopologyState}); + fChangeStateCallback({{AsyncOpResultCode::Ok, ""}, fTopologyState}); } else { std::unique_lock lock(fExecutionMtx); fExecutionCV.wait(lock); diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index 47efb9fb..e02d5a65 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -26,13 +26,21 @@ namespace fair { namespace mq { -// TODO make this a struct with a readable string error msg -enum class AsyncOpResult { +enum class AsyncOpResultCode { Ok, Timeout, Error, Aborted }; +auto operator<<(std::ostream& os, AsyncOpResultCode v) -> std::ostream&; + +using AsyncOpResultMessage = std::string; + +struct AsyncOpResult { + AsyncOpResultCode code; + AsyncOpResultMessage msg; + operator AsyncOpResultCode() const { return code; } +}; auto operator<<(std::ostream& os, AsyncOpResult v) -> std::ostream&; namespace sdk { @@ -59,6 +67,12 @@ class Topology /// @brief (Re)Construct a FairMQ topology from an existing DDS topology /// @param topo Initialized DDS CTopology explicit Topology(DDSTopology topo, DDSSession session = DDSSession()); + + explicit Topology(const Topology&) = delete; + Topology& operator=(const Topology&) = delete; + explicit Topology(Topology&&) = delete; + Topology& operator=(Topology&&) = delete; + ~Topology(); struct ChangeStateResult { @@ -67,14 +81,21 @@ class Topology friend auto operator<<(std::ostream& os, ChangeStateResult v) -> std::ostream&; }; using ChangeStateCallback = std::function; + using Duration = std::chrono::milliseconds; /// @brief Initiate state transition on all FairMQ devices in this topology /// @param t FairMQ device state machine transition /// @param cb Completion callback - auto ChangeState(TopologyTransition t, ChangeStateCallback cb, std::chrono::milliseconds timeout = std::chrono::milliseconds(0)) -> void; + /// @param timeout Timeout in milliseconds, 0 means no timeout + auto ChangeState(TopologyTransition t, ChangeStateCallback cb, Duration timeout = std::chrono::milliseconds(0)) -> void; - static const std::unordered_map> fkExpectedState; + /// @brief Perform a state transition on all FairMQ devices in this topology + /// @param t FairMQ device state machine transition + /// @param timeout Timeout in milliseconds, 0 means no timeout + /// @return The result of the state transition + auto ChangeState(TopologyTransition t, Duration timeout = std::chrono::milliseconds(0)) -> ChangeStateResult; + private: DDSSession fDDSSession; DDSTopology fDDSTopo; diff --git a/fairmq/sdk/runFairMQ.cxx b/fairmq/sdk/runFairMQ.cxx index a874a88b..b88c8d0e 100644 --- a/fairmq/sdk/runFairMQ.cxx +++ b/fairmq/sdk/runFairMQ.cxx @@ -6,8 +6,7 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include -#include +#include #include int main(int /*argc*/, char ** /*argv*/) diff --git a/test/sdk/_topology.cxx b/test/sdk/_topology.cxx index 8b67b6ba..109cf79d 100644 --- a/test/sdk/_topology.cxx +++ b/test/sdk/_topology.cxx @@ -21,27 +21,39 @@ TEST_F(Topology, Construction) fair::mq::sdk::Topology topo(mDDSTopo, mDDSSession); } -TEST_F(Topology, ChangeState) +TEST_F(Topology, ChangeState_async1) { using fair::mq::sdk::Topology; using fair::mq::sdk::TopologyTransition; Topology topo(mDDSTopo, mDDSSession); - Topology::ChangeStateResult r; fair::mq::tools::Semaphore blocker; - topo.ChangeState(TopologyTransition::Stop, [&](Topology::ChangeStateResult result) { + topo.ChangeState(TopologyTransition::Stop, [&blocker](Topology::ChangeStateResult result) { LOG(info) << result; - r = result; + EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok); + // TODO add the helper to check state consistency + for (const auto& e : result.state) { + EXPECT_EQ(e.second.state, fair::mq::sdk::DeviceState::Ready); + } blocker.Signal(); }); blocker.Wait(); - EXPECT_EQ(r.rc, fair::mq::AsyncOpResult::Ok); +} + +TEST_F(Topology, ChangeState_sync) +{ + using fair::mq::sdk::Topology; + using fair::mq::sdk::TopologyTransition; + + Topology topo(mDDSTopo, mDDSSession); + auto result(topo.ChangeState(TopologyTransition::Stop)); + + EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok); // TODO add the helper to check state consistency - for (const auto& e : r.state) { + for (const auto& e : result.state) { EXPECT_EQ(e.second.state, fair::mq::sdk::DeviceState::Ready); } } - // TEST_F(Topology, Timeout) // { // using fair::mq::sdk::Topology; diff --git a/test/sdk/test_topo.xml b/test/sdk/test_topo.xml index 0a83d889..c8ae0bcc 100644 --- a/test/sdk/test_topo.xml +++ b/test/sdk/test_topo.xml @@ -16,7 +16,7 @@ - fairmq-sink --id sink --color false --channel-config name=data,type=pull,method=connect -P dds + fairmq-sink --id sink_%taskIndex% --color false --channel-config name=data,type=pull,method=connect -P dds SinkWorker @@ -27,7 +27,9 @@
Sampler - Sink + + Sink +