From 7a0d348bd49fce80b422f51a73ff652e3cc87a30 Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Tue, 20 Aug 2019 18:57:55 +0200 Subject: [PATCH] SDK: Implement Topology with asio-compliant async interface --- fairmq/sdk/CMakeLists.txt | 1 - fairmq/sdk/Topology.cxx | 261 +----------------------------- fairmq/sdk/Topology.h | 328 ++++++++++++++++++++++++++++++-------- test/sdk/_async_op.cxx | 18 +-- test/sdk/_topology.cxx | 145 +++++++++-------- 5 files changed, 362 insertions(+), 391 deletions(-) diff --git a/fairmq/sdk/CMakeLists.txt b/fairmq/sdk/CMakeLists.txt index 6204d4ec..694db1ac 100644 --- a/fairmq/sdk/CMakeLists.txt +++ b/fairmq/sdk/CMakeLists.txt @@ -45,7 +45,6 @@ add_library(${target} ${SDK_PRIVATE_HEADER_FILES} # for IDE integration ) set_target_properties(${target} PROPERTIES LABELS coverage) -target_compile_definitions(${target} PUBLIC BOOST_ERROR_CODE_HEADER_ONLY) target_include_directories(${target} PUBLIC $ diff --git a/fairmq/sdk/Topology.cxx b/fairmq/sdk/Topology.cxx index 1a625b18..0bc1d422 100644 --- a/fairmq/sdk/Topology.cxx +++ b/fairmq/sdk/Topology.cxx @@ -10,265 +10,20 @@ #include #include -#include -#include -#include -#include -#include -#include -#include -#include namespace fair { namespace mq { - -auto operator<<(std::ostream& os, AsyncOpResultCode v) -> std::ostream& -{ - switch (v) { - case AsyncOpResultCode::Aborted: - return os << "Aborted"; - case AsyncOpResultCode::Timeout: - return os << "Timeout"; - case AsyncOpResultCode::Error: - return os << "Error"; - case AsyncOpResultCode::Ok: - default: - return os << "Ok"; - } -} - -auto operator<<(std::ostream& os, AsyncOpResult v) -> std::ostream& -{ - os << "[" << v.code << "]"; - if (!v.msg.empty()) { - os << " " << v.msg; - } - return os; -} - namespace sdk { -const std::unordered_map> expectedState = +/// @brief Helper to (Re)Construct a FairMQ topology based on already existing native DDS API objects +/// @param nativeSession Existing and initialized CSession (either via create() or attach()) +/// @param nativeTopo Existing CTopology that is activated on the given nativeSession +/// @param env Optional DDSEnv (needed primarily for unit testing) +auto MakeTopology(dds::topology_api::CTopology nativeTopo, + std::shared_ptr nativeSession, + DDSEnv env) -> Topology { - { Transition::InitDevice, DeviceState::InitializingDevice }, - { Transition::CompleteInit, DeviceState::Initialized }, - { Transition::Bind, DeviceState::Bound }, - { Transition::Connect, DeviceState::DeviceReady }, - { Transition::InitTask, DeviceState::Ready }, - { Transition::Run, DeviceState::Running }, - { Transition::Stop, DeviceState::Ready }, - { Transition::ResetTask, DeviceState::DeviceReady }, - { Transition::ResetDevice, DeviceState::Idle }, - { Transition::End, DeviceState::Exiting } -}; - -Topology::Topology(DDSTopology topo, DDSSession session) - : fDDSSession(std::move(session)) - , fDDSTopo(std::move(topo)) - , fStateChangeOngoing(false) - , fTargetState(DeviceState::Idle) - , fStateChangeTimeout(0) - , fShutdown(false) -{ - std::vector deviceList = fDDSTopo.GetDeviceList(); - for (const auto& d : deviceList) { - // LOG(debug) << "Adding device " << d; - fState.emplace(d, DeviceStatus{ false, DeviceState::Ok }); - } - fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& /* condition */, DDSChannel::Id senderId) { - // LOG(debug) << "Received from " << senderId << ": " << msg; - std::vector parts; - boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,")); - - for (unsigned int i = 0; i < parts.size(); ++i) { - boost::trim(parts.at(i)); - } - - if (parts[0] == "state-change") { - DDSTask::Id taskId(std::stoull(parts[2])); - fDDSSession.UpdateChannelToTaskAssociation(senderId, taskId); - AddNewStateEntry(taskId, parts[3]); - } else if (parts[0] == "state-changes-subscription") { - LOG(debug) << "Received from " << senderId << ": " << msg; - if (parts[2] != "OK") { - LOG(error) << "state-changes-subscription failed with return code: " << parts[2]; - } - } else if (parts[0] == "state-changes-unsubscription") { - if (parts[2] != "OK") { - LOG(error) << "state-changes-unsubscription failed with return code: " << parts[2]; - } - } else if (parts[1] == "could not queue") { - std::unique_lock lock(fMtx); - - if (fStateChangeOngoing) { - if (fState.at(fDDSSession.GetTaskId(senderId)).state != fTargetState) { - fStateChangeError = - tools::ToString("Could not queue ", parts[2], " transition on ", senderId); - lock.unlock(); - fCV.notify_one(); - } - } - } - }); - fDDSSession.StartDDSService(); - LOG(debug) << "subscribe-to-state-changes"; - fDDSSession.SendCommand("subscribe-to-state-changes"); - - fExecutionThread = std::thread(&Topology::WaitForState, this); -} - -Topology::Topology(dds::topology_api::CTopology nativeTopo, - std::shared_ptr nativeSession, - DDSEnv env) - : Topology(DDSTopo(std::move(nativeTopo), env), DDSSession(std::move(nativeSession), env)) -{ - if (fDDSSession.RequestCommanderInfo().activeTopologyName != fDDSTopo.GetName()) { - throw std::runtime_error("Given topology must be activated"); - } -} - -auto Topology::ChangeState(TopologyTransition transition, ChangeStateCallback cb, Duration timeout) -> void -{ - { - std::unique_lock lock(fMtx); - if (fStateChangeOngoing) { - throw std::runtime_error("A state change request is already in progress, concurrent requests are currently not supported"); - } - LOG(debug) << "Initiating ChangeState with " << transition << " to " << expectedState.at(transition); - fStateChangeOngoing = true; - fChangeStateCallback = cb; - fStateChangeTimeout = timeout; - fTargetState = expectedState.at(transition); - fStateChangeError.clear(); - - fDDSSession.SendCommand(GetTransitionName(transition)); - } - fExecutionCV.notify_one(); -} - -auto Topology::ChangeState(TopologyTransition t, Duration timeout) -> ChangeStateResult -{ - fair::mq::tools::Semaphore blocker; - ChangeStateResult res; - ChangeState( - t, - [&blocker, &res](Topology::ChangeStateResult _res) { - res = _res; - blocker.Signal(); - }, - timeout); - blocker.Wait(); - return res; -} - -void Topology::WaitForState() -{ - while (!fShutdown) { - if (fStateChangeOngoing) { - try { - std::unique_lock lock(fMtx); - - auto condition = [&] { - // LOG(info) << "checking condition"; - // LOG(info) << "fShutdown: " << fShutdown; - // LOG(info) << "condition: " << std::all_of(fState.cbegin(), fState.cend(), - // [&](TopologyState::value_type i) { return i.second.state == fTargetState; }); - return fShutdown - || !fStateChangeError.empty() - || std::all_of( - fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) { - // TODO Check, if we can make sure that EXITING state change event are not missed - return (fTargetState == DeviceState::Exiting) - || ((i.second.state == fTargetState) - && i.second.initialized); - }); - }; - - if (fStateChangeTimeout > std::chrono::milliseconds(0)) { - if (!fCV.wait_for(lock, fStateChangeTimeout, condition)) { - // LOG(debug) << "timeout"; - fStateChangeOngoing = false; - TopologyState state = fState; - lock.unlock(); - fChangeStateCallback( - {{AsyncOpResultCode::Timeout, "timeout"}, std::move(state)}); - break; - } - } else { - fCV.wait(lock, condition); - } - - fStateChangeOngoing = false; - - if (!fStateChangeError.empty()) { - TopologyState state = fState; - lock.unlock(); - fChangeStateCallback( - {{AsyncOpResultCode::Error, fStateChangeError}, std::move(state)}); - break; - } - - if (fShutdown) { - LOG(debug) << "Aborting because a shutdown was requested"; - TopologyState state = fState; - lock.unlock(); - fChangeStateCallback( - {{AsyncOpResultCode::Aborted, "Aborted because a shutdown was requested"}, - std::move(state)}); - break; - } - } catch (std::exception& e) { - fStateChangeOngoing = false; - LOG(error) << "Error while processing state request: " << e.what(); - fChangeStateCallback( - {{AsyncOpResultCode::Error, tools::ToString("Exception thrown: ", e.what())}, - fState}); - } - - fChangeStateCallback({{AsyncOpResultCode::Ok, "success"}, fState}); - } else { - std::unique_lock lock(fExecutionMtx); - fExecutionCV.wait(lock); - } - } - LOG(debug) << "Topology::WaitForState shutting down"; -}; - -void Topology::AddNewStateEntry(DDSTask::Id taskId, const std::string& state) -{ - std::size_t pos = state.find("->"); - std::string endState = state.substr(pos + 2); - // LOG(debug) << "Adding new state entry: " << taskId << ", " << state << ", end state: " << endState; - { - try { - std::unique_lock lock(fMtx); - fState[taskId] = DeviceStatus{ true, fair::mq::GetState(endState) }; - } catch (const std::exception& e) { - LOG(error) << "Exception in AddNewStateEntry: " << e.what(); - } - - // LOG(info) << "fState after update: "; - // for (auto& e : fState) { - // LOG(info) << e.first << ": " << e.second.state; - // } - } - fCV.notify_one(); -} - -Topology::~Topology() -{ - fDDSSession.UnsubscribeFromCommands(); - { - std::lock_guard guard(fExecutionMtx); - fShutdown = true; - } - fExecutionCV.notify_one(); - fExecutionThread.join(); -} - -auto operator<<(std::ostream& os, Topology::ChangeStateResult v) -> std::ostream& -{ - return os << v.rc; + return {DDSTopo(std::move(nativeTopo), env), DDSSession(std::move(nativeSession), env)}; } } // namespace sdk diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index c213e2ab..01c0f4a1 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -9,47 +9,54 @@ #ifndef FAIR_MQ_SDK_TOPOLOGY_H #define FAIR_MQ_SDK_TOPOLOGY_H +#include +#include +#include +#include +#include +#include #include +#include #include #include +#include +#include #include #include #include #include #include +#include #include +#include #include #include #include #include +#include #include namespace fair { namespace mq { - -enum class AsyncOpResultCode -{ - Ok, - Timeout, - Error, - Aborted -}; -auto operator<<(std::ostream& os, AsyncOpResultCode v) -> std::ostream&; - -using AsyncOpResultMessage = std::string; - -struct AsyncOpResult { - AsyncOpResultCode code; - AsyncOpResultMessage msg; - operator AsyncOpResultCode() const { return code; } -}; -auto operator<<(std::ostream& os, AsyncOpResult v) -> std::ostream&; - namespace sdk { using DeviceState = fair::mq::State; using DeviceTransition = fair::mq::Transition; +const std::map expectedState = +{ + { DeviceTransition::InitDevice, DeviceState::InitializingDevice }, + { DeviceTransition::CompleteInit, DeviceState::Initialized }, + { DeviceTransition::Bind, DeviceState::Bound }, + { DeviceTransition::Connect, DeviceState::DeviceReady }, + { DeviceTransition::InitTask, DeviceState::Ready }, + { DeviceTransition::Run, DeviceState::Running }, + { DeviceTransition::Stop, DeviceState::Ready }, + { DeviceTransition::ResetTask, DeviceState::DeviceReady }, + { DeviceTransition::ResetDevice, DeviceState::Idle }, + { DeviceTransition::End, DeviceState::Exiting } +}; + struct DeviceStatus { bool initialized; @@ -59,8 +66,6 @@ struct DeviceStatus using TopologyState = std::unordered_map; using TopologyTransition = fair::mq::Transition; -struct MixedState : std::runtime_error { using std::runtime_error::runtime_error; }; - inline DeviceState AggregateState(const TopologyState& topologyState) { DeviceState first = topologyState.begin()->second.state; @@ -71,7 +76,7 @@ inline DeviceState AggregateState(const TopologyState& topologyState) return first; } - throw MixedState("State is not uniform"); + throw MixedStateError("State is not uniform"); } @@ -81,82 +86,275 @@ inline bool StateEqualsTo(const TopologyState& topologyState, DeviceState state) } /** - * @class Topology Topology.h + * @class BasicTopology Topology.h + * @tparam Executor Associated I/O executor + * @tparam Allocator Associated default allocator * @brief Represents a FairMQ topology + * + * @par Thread Safety + * @e Distinct @e objects: Safe.@n + * @e Shared @e objects: Safe. */ -class Topology +template +class BasicTopology : public AsioBase { public: /// @brief (Re)Construct a FairMQ topology from an existing DDS topology /// @param topo DDSTopology /// @param session DDSSession - explicit Topology(DDSTopology topo, DDSSession session = DDSSession()); + BasicTopology(DDSTopology topo, DDSSession session) + : BasicTopology(asio::system_executor(), + std::move(topo), + std::move(session)) + {} - /// @brief (Re)Construct a FairMQ topology based on already existing native DDS API objects - /// @param nativeSession Existing and initialized CSession (either via create() or attach()) - /// @param nativeTopo Existing CTopology that is activated on the given nativeSession - /// @param env Optional DDSEnv (needed primarily for unit testing) - explicit Topology(dds::topology_api::CTopology nativeTopo, - std::shared_ptr nativeSession, - DDSEnv env = {}); + /// @brief (Re)Construct a FairMQ topology from an existing DDS topology + /// @param ex I/O executor to be associated + /// @param topo DDSTopology + /// @param session DDSSession + /// @throws RuntimeError + BasicTopology(const Executor& ex, + DDSTopology topo, + DDSSession session, + Allocator alloc = DefaultAllocator()) + : AsioBase(ex, std::move(alloc)) + , fDDSSession(std::move(session)) + , fDDSTopo(std::move(topo)) + , fState(makeTopologyState(fDDSTopo)) + , fChangeStateOp() + , fChangeStateOpTimer(ex) + , fChangeStateTarget(DeviceState::Idle) + { + std::string activeTopo(fDDSSession.RequestCommanderInfo().activeTopologyName); + std::string givenTopo(fDDSTopo.GetName()); + if (activeTopo != givenTopo) { + throw RuntimeError("Given topology ", givenTopo, + " is not activated (active: ", activeTopo, ")"); + } - explicit Topology(const Topology&) = delete; - Topology& operator=(const Topology&) = delete; - explicit Topology(Topology&&) = delete; - Topology& operator=(Topology&&) = delete; + fDDSSession.SubscribeToCommands([&](const std::string& msg, + const std::string& /* condition */, + DDSChannel::Id senderId) { + // LOG(debug) << "Received from " << senderId << ": " << msg; + std::vector parts; + boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,")); - ~Topology(); + for (unsigned int i = 0; i < parts.size(); ++i) { + boost::trim(parts.at(i)); + } + + if (parts[0] == "state-change") { + DDSTask::Id taskId(std::stoull(parts[2])); + fDDSSession.UpdateChannelToTaskAssociation(senderId, taskId); + UpdateStateEntry(taskId, parts[3]); + } else if (parts[0] == "state-changes-subscription") { + LOG(debug) << "Received from " << senderId << ": " << msg; + if (parts[2] != "OK") { + LOG(error) << "state-changes-subscription failed with return code: " + << parts[2]; + } + } else if (parts[0] == "state-changes-unsubscription") { + if (parts[2] != "OK") { + LOG(error) << "state-changes-unsubscription failed with return code: " + << parts[2]; + } + } else if (parts[1] == "could not queue") { + std::lock_guard lk(fMtx); + if (!fChangeStateOp.IsCompleted() + && fState.at(fDDSSession.GetTaskId(senderId)).state != fChangeStateTarget) { + fChangeStateOpTimer.cancel(); + fChangeStateOp.Complete(MakeErrorCode(ErrorCode::DeviceChangeStateFailed), + fState); + } + } + }); + fDDSSession.StartDDSService(); + LOG(debug) << "subscribe-to-state-changes"; + fDDSSession.SendCommand("subscribe-to-state-changes"); + } + + /// not copyable + BasicTopology(const BasicTopology&) = delete; + BasicTopology& operator=(const BasicTopology&) = delete; + + /// movable + BasicTopology(BasicTopology&&) noexcept = default; + BasicTopology& operator=(BasicTopology&&) noexcept = default; + + ~BasicTopology() + { + fDDSSession.UnsubscribeFromCommands(); + try { + fChangeStateOp.Cancel(fState); + } catch (...) {} + } - struct ChangeStateResult { - AsyncOpResult rc; - TopologyState state; - friend auto operator<<(std::ostream& os, ChangeStateResult v) -> std::ostream&; - }; - using ChangeStateCallback = std::function; using Duration = std::chrono::milliseconds; + using ChangeStateCompletionSignature = void(std::error_code, TopologyState); /// @brief Initiate state transition on all FairMQ devices in this topology - /// @param t FairMQ device state machine transition - /// @param cb Completion callback + /// @param transition FairMQ device state machine transition /// @param timeout Timeout in milliseconds, 0 means no timeout - auto ChangeState(TopologyTransition t, ChangeStateCallback cb, Duration timeout = std::chrono::milliseconds(0)) -> void; + /// @param token Asio completion token + /// @tparam CompletionToken Asio completion token type + /// @throws std::system_error + /// TODO usage examples + template + auto AsyncChangeState(TopologyTransition transition, + Duration timeout, + CompletionToken&& token) + { + return asio::async_initiate( + [&](auto handler) { + std::lock_guard lk(fMtx); - /// @brief Perform a state transition on all FairMQ devices in this topology - /// @param t FairMQ device state machine transition + if (fChangeStateOp.IsCompleted()) { + fChangeStateOp = ChangeStateOp(AsioBase::GetExecutor(), + AsioBase::GetAllocator(), + std::move(handler)); + fChangeStateTarget = expectedState.at(transition); + fDDSSession.SendCommand(GetTransitionName(transition)); + if (timeout > std::chrono::milliseconds(0)) { + fChangeStateOpTimer.expires_after(timeout); + fChangeStateOpTimer.async_wait([&](std::error_code ec) { + if (!ec) { + std::lock_guard lk2(fMtx); + fChangeStateOp.Timeout(fState); + } + }); + } + } else { + // TODO refactor to hide boiler plate + auto ex2(asio::get_associated_executor( + handler, AsioBase::GetExecutor())); + auto alloc2(asio::get_associated_allocator( + handler, AsioBase::GetAllocator())); + auto state(GetCurrentStateUnsafe()); + + ex2.post( + [h = std::move(handler), s = std::move(state)]() mutable { + try { + h(MakeErrorCode(ErrorCode::OperationInProgress), s); + } catch (const std::exception& e) { + LOG(error) + << "Uncaught exception in completion handler: " << e.what(); + } catch (...) { + LOG(error) << "Unknown uncaught exception in completion handler."; + } + }, + alloc2); + } + }, + token); + } + + template + auto AsyncChangeState(TopologyTransition transition, CompletionToken&& token) + { + return AsyncChangeState(transition, Duration(0), std::move(token)); + } + + /// @brief Initiate state transition on all FairMQ devices in this topology + /// @param transition FairMQ device state machine transition /// @param timeout Timeout in milliseconds, 0 means no timeout - /// @return The result of the state transition - auto ChangeState(TopologyTransition t, Duration timeout = std::chrono::milliseconds(0)) -> ChangeStateResult; + /// @tparam CompletionToken Asio completion token type + /// @throws std::system_error + auto ChangeState(TopologyTransition transition, Duration timeout = Duration(0)) + -> std::pair + { + tools::Semaphore blocker; + std::error_code ec; + TopologyState state; + AsyncChangeState(transition, timeout, [&](std::error_code _ec, TopologyState _state) mutable { + ec = _ec; + state = _state; + blocker.Signal(); + }); + blocker.Wait(); + return {ec, state}; + } /// @brief Returns the current state of the topology /// @return map of id : DeviceStatus (initialized, state) - TopologyState GetCurrentState() const { std::lock_guard guard(fMtx); return fState; } + auto GetCurrentState() const -> TopologyState + { + std::lock_guard lk(fMtx); + return fState; + } - DeviceState AggregateState() { return sdk::AggregateState(fState); } + auto AggregateState() const -> DeviceState { return sdk::AggregateState(GetCurrentState()); } - bool StateEqualsTo(DeviceState state) { return sdk::StateEqualsTo(fState, state); } + auto StateEqualsTo(DeviceState state) const -> bool { return sdk::StateEqualsTo(GetCurrentState(), state); } private: DDSSession fDDSSession; DDSTopology fDDSTopo; TopologyState fState; - bool fStateChangeOngoing; - DeviceState fTargetState; mutable std::mutex fMtx; - mutable std::mutex fExecutionMtx; - std::condition_variable fCV; - std::condition_variable fExecutionCV; - std::thread fExecutionThread; - ChangeStateCallback fChangeStateCallback; - std::chrono::milliseconds fStateChangeTimeout; - bool fShutdown; - std::string fStateChangeError; - void WaitForState(); - void AddNewStateEntry(DDSTask::Id taskId, const std::string& state); + using ChangeStateOp = AsioAsyncOp; + ChangeStateOp fChangeStateOp; + asio::steady_timer fChangeStateOpTimer; + DeviceState fChangeStateTarget; + + static auto makeTopologyState(const DDSTopo& topo) -> TopologyState + { + TopologyState state; + for (const auto& task : topo.GetTasks()) { + state.emplace(task.GetId(), DeviceStatus{false, DeviceState::Ok}); + } + return state; + } + + auto UpdateStateEntry(DDSTask::Id taskId, const std::string& state) -> void + { + std::size_t pos = state.find("->"); + std::string endState = state.substr(pos + 2); + try { + std::lock_guard lk(fMtx); + fState[taskId] = DeviceStatus{true, fair::mq::GetState(endState)}; + LOG(debug) << "Updated state entry: taskId=" << taskId << ",state=" << state; + TryChangeStateCompletion(); + } catch (const std::exception& e) { + LOG(error) << "Exception in UpdateStateEntry: " << e.what(); + } + } + + /// call only under locked fMtx! + auto TryChangeStateCompletion() -> void + { + bool targetStateReached( + std::all_of(fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) { + // TODO Check, if we can make sure that EXITING state change event are not missed + return fChangeStateTarget == DeviceState::Exiting + || ((i.second.state == fChangeStateTarget) && i.second.initialized); + })); + + if (!fChangeStateOp.IsCompleted() && targetStateReached) { + fChangeStateOpTimer.cancel(); + fChangeStateOp.Complete(fState); + } + } + + /// call only under locked fMtx! + auto GetCurrentStateUnsafe() const -> TopologyState + { + return fState; + } + }; +using Topology = BasicTopology; using Topo = Topology; +/// @brief Helper to (Re)Construct a FairMQ topology based on already existing native DDS API objects +/// @param nativeSession Existing and initialized CSession (either via create() or attach()) +/// @param nativeTopo Existing CTopology that is activated on the given nativeSession +/// @param env Optional DDSEnv (needed primarily for unit testing) +auto MakeTopology(dds::topology_api::CTopology nativeTopo, + std::shared_ptr nativeSession, + DDSEnv env = {}) -> Topology; + } // namespace sdk } // namespace mq } // namespace fair diff --git a/test/sdk/_async_op.cxx b/test/sdk/_async_op.cxx index da790636..787495cb 100644 --- a/test/sdk/_async_op.cxx +++ b/test/sdk/_async_op.cxx @@ -57,12 +57,12 @@ TEST_F(AsyncOp, Complete) TEST_F(AsyncOp, Cancel) { - using namespace fair::mq::sdk; + using namespace fair::mq; - AsioAsyncOp op( + sdk::AsioAsyncOp op( [](std::error_code ec) { EXPECT_TRUE(ec); // error - EXPECT_EQ(ec, std::make_error_code(std::errc::operation_canceled)); + EXPECT_EQ(ec, MakeErrorCode(ErrorCode::OperationCanceled)); }); op.Cancel(); @@ -70,26 +70,26 @@ TEST_F(AsyncOp, Cancel) TEST_F(AsyncOp, Timeout) { - using namespace fair::mq::sdk; + using namespace fair::mq; asio::steady_timer timer(mIoContext.get_executor(), std::chrono::milliseconds(50)); - AsioAsyncOp op( + sdk::AsioAsyncOp op( mIoContext.get_executor(), [&timer](std::error_code ec) { timer.cancel(); std::cout << "Completion with: " << ec.message() << std::endl; EXPECT_TRUE(ec); // error - EXPECT_EQ(ec, std::make_error_code(std::errc::operation_canceled)); + EXPECT_EQ(ec, MakeErrorCode(ErrorCode::OperationTimeout)); }); timer.async_wait([&op](asio::error_code ec) { std::cout << "Timer event" << std::endl; if (ec != asio::error::operation_aborted) { - op.Cancel(); + op.Timeout(); } }); mIoContext.run(); - EXPECT_THROW(op.Complete(), RuntimeError); + EXPECT_THROW(op.Complete(), sdk::RuntimeError); } TEST_F(AsyncOp, Timeout2) @@ -108,7 +108,7 @@ TEST_F(AsyncOp, Timeout2) timer.async_wait([&op](asio::error_code ec) { std::cout << "Timer event" << std::endl; if (ec != asio::error::operation_aborted) { - op.Cancel(); + op.Timeout(); } }); diff --git a/test/sdk/_topology.cxx b/test/sdk/_topology.cxx index 7c90e262..9a3abbe4 100644 --- a/test/sdk/_topology.cxx +++ b/test/sdk/_topology.cxx @@ -17,100 +17,118 @@ namespace { using Topology = fair::mq::test::TopologyFixture; -TEST(Topology2, ConstructionWithNativeDdsApiObjects) +TEST(TopologyHelper, MakeTopology) { - // This is only needed for this unit test - fair::mq::test::LoggerConfig cfg; - fair::mq::sdk::DDSEnv env(CMAKE_CURRENT_BINARY_DIR); - ///////////////////////////////////////// + using namespace fair::mq; + + // This is only needed for this unit test + test::LoggerConfig cfg; + sdk::DDSEnv env(CMAKE_CURRENT_BINARY_DIR); + ///////////////////////////////////// - // Example usage: dds::topology_api::CTopology nativeTopo( - fair::mq::tools::ToString(SDK_TESTSUITE_SOURCE_DIR, "/test_topo.xml")); + tools::ToString(SDK_TESTSUITE_SOURCE_DIR, "/test_topo.xml")); auto nativeSession(std::make_shared()); nativeSession->create(); - EXPECT_THROW(fair::mq::sdk::Topology topo(nativeTopo, nativeSession, env), std::runtime_error); + EXPECT_THROW(sdk::MakeTopology(nativeTopo, nativeSession, env), sdk::RuntimeError); + nativeSession->stop(); + nativeSession->shutdown(); } TEST_F(Topology, Construction) { - fair::mq::sdk::Topology topo(mDDSTopo, mDDSSession); + fair::mq::sdk::Topology topo(mDDSTopo, mDDSSession); } -TEST_F(Topology, ChangeStateAsync) +TEST_F(Topology, Construction2) { - using fair::mq::sdk::Topology; - using fair::mq::sdk::TopologyTransition; + fair::mq::sdk::Topology topo(mIoContext.get_executor(), mDDSTopo, mDDSSession); +} - Topology topo(mDDSTopo, mDDSSession); - fair::mq::tools::Semaphore blocker; - topo.ChangeState( - TopologyTransition::InitDevice, [&blocker, &topo](Topology::ChangeStateResult result) { - LOG(info) << result; - EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok); - EXPECT_NO_THROW(fair::mq::sdk::AggregateState(result.state)); - EXPECT_EQ( - fair::mq::sdk::StateEqualsTo(result.state, fair::mq::sdk::DeviceState::InitializingDevice), - true); +TEST_F(Topology, AsyncChangeState) +{ + using namespace fair::mq; + + tools::Semaphore blocker; + sdk::Topology topo(mDDSTopo, mDDSSession); + topo.AsyncChangeState( + sdk::TopologyTransition::InitDevice, + [&](std::error_code ec, sdk::TopologyState) { + LOG(info) << ec; + EXPECT_EQ(ec, std::error_code()); blocker.Signal(); }); blocker.Wait(); } -TEST_F(Topology, ChangeStateSync) +TEST_F(Topology, AsyncChangeStateWithCustomExecutor) { - using fair::mq::sdk::Topology; - using fair::mq::sdk::TopologyTransition; + using namespace fair::mq; - Topology topo(mDDSTopo, mDDSSession); - auto result(topo.ChangeState(TopologyTransition::InitDevice)); + sdk::Topology topo(mIoContext.get_executor(), mDDSTopo, mDDSSession); + topo.AsyncChangeState( + sdk::TopologyTransition::InitDevice, + [](std::error_code ec, sdk::TopologyState) { + LOG(info) << ec; + EXPECT_EQ(ec, std::error_code()); + }); - EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok); - EXPECT_NO_THROW(fair::mq::sdk::AggregateState(result.state)); - EXPECT_EQ( - fair::mq::sdk::StateEqualsTo(result.state, fair::mq::sdk::DeviceState::InitializingDevice), - true); + mIoContext.run(); } -TEST_F(Topology, ChangeStateConcurrent) +TEST_F(Topology, ChangeState) { - using fair::mq::sdk::Topology; - using fair::mq::sdk::TopologyTransition; + using namespace fair::mq; - Topology topo(mDDSTopo, mDDSSession); - fair::mq::tools::Semaphore blocker; - topo.ChangeState(TopologyTransition::InitDevice, - [&blocker](Topology::ChangeStateResult result) { - LOG(info) << "result for valid ChangeState: " << result; - blocker.Signal(); - }); - EXPECT_THROW(topo.ChangeState(TopologyTransition::Stop, - [&blocker](Topology::ChangeStateResult) {}), - std::runtime_error); + sdk::Topology topo(mDDSTopo, mDDSSession); + auto result(topo.ChangeState(sdk::TopologyTransition::InitDevice)); + LOG(info) << result.first; + + EXPECT_EQ(result.first, std::error_code()); + EXPECT_NO_THROW(sdk::AggregateState(result.second)); + EXPECT_EQ(sdk::StateEqualsTo(result.second, sdk::DeviceState::InitializingDevice), true); +} + +TEST_F(Topology, AsyncChangeStateConcurrent) +{ + using namespace fair::mq; + + sdk::Topology topo(mDDSTopo, mDDSSession); + tools::Semaphore blocker; + topo.AsyncChangeState(sdk::TopologyTransition::InitDevice, + [&blocker](std::error_code ec, sdk::TopologyState) { + LOG(info) << "result for valid ChangeState: " << ec; + blocker.Signal(); + }); + topo.AsyncChangeState(sdk::TopologyTransition::Stop, + [](std::error_code ec, sdk::TopologyState) { + LOG(ERROR) << "Expected error: " << ec; + EXPECT_EQ(ec, MakeErrorCode(ErrorCode::OperationInProgress)); + }); blocker.Wait(); } -TEST_F(Topology, ChangeStateTimeout) +TEST_F(Topology, AsyncChangeStateTimeout) { - using fair::mq::sdk::Topology; - using fair::mq::sdk::TopologyTransition; + using namespace fair::mq; - Topology topo(mDDSTopo, mDDSSession); - fair::mq::tools::Semaphore blocker; - topo.ChangeState(TopologyTransition::InitDevice, [&](Topology::ChangeStateResult result) { - LOG(info) << result; - EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Timeout); - blocker.Signal(); - }, std::chrono::milliseconds(1)); - blocker.Wait(); + sdk::Topology topo(mIoContext.get_executor(), mDDSTopo, mDDSSession); + topo.AsyncChangeState(sdk::TopologyTransition::InitDevice, + std::chrono::milliseconds(1), + [](std::error_code ec, sdk::TopologyState) { + LOG(info) << ec; + EXPECT_EQ(ec, MakeErrorCode(ErrorCode::OperationTimeout)); + }); + + mIoContext.run(); } TEST_F(Topology, ChangeStateFullDeviceLifecycle) { - using fair::mq::sdk::Topology; + using namespace fair::mq; using fair::mq::sdk::TopologyTransition; - Topology topo(mDDSTopo, mDDSSession); + sdk::Topology topo(mDDSTopo, mDDSSession); for (auto transition : {TopologyTransition::InitDevice, TopologyTransition::CompleteInit, TopologyTransition::Bind, @@ -121,16 +139,16 @@ TEST_F(Topology, ChangeStateFullDeviceLifecycle) TopologyTransition::ResetTask, TopologyTransition::ResetDevice, TopologyTransition::End}) { - ASSERT_EQ(topo.ChangeState(transition).rc, fair::mq::AsyncOpResultCode::Ok); + ASSERT_EQ(topo.ChangeState(transition).first, std::error_code()); } } TEST_F(Topology, ChangeStateFullDeviceLifecycle2) { - using fair::mq::sdk::Topology; + using namespace fair::mq; using fair::mq::sdk::TopologyTransition; - Topology topo(mDDSTopo, mDDSSession); + sdk::Topology topo(mDDSTopo, mDDSSession); for (int i(0); i < 10; ++i) { for (auto transition : {TopologyTransition::InitDevice, TopologyTransition::CompleteInit, @@ -138,14 +156,15 @@ TEST_F(Topology, ChangeStateFullDeviceLifecycle2) TopologyTransition::Connect, TopologyTransition::InitTask, TopologyTransition::Run}) { - ASSERT_EQ(topo.ChangeState(transition).rc, fair::mq::AsyncOpResultCode::Ok); + ASSERT_EQ(topo.ChangeState(transition).first, std::error_code()); } std::this_thread::sleep_for(std::chrono::milliseconds(100)); for (auto transition : {TopologyTransition::Stop, TopologyTransition::ResetTask, TopologyTransition::ResetDevice}) { - ASSERT_EQ(topo.ChangeState(transition).rc, fair::mq::AsyncOpResultCode::Ok); + ASSERT_EQ(topo.ChangeState(transition).first, std::error_code()); } } } + } // namespace