From 98aeb16dc7e277144e4f01ee8d33f4b1276ddbf6 Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Fri, 26 Jul 2019 12:49:03 +0200 Subject: [PATCH] SDK: Adapt to new DDS plugin external mode --- fairmq/plugins/DDS/DDS.cxx | 1 + fairmq/plugins/DDS/DDS.h | 5 ++- fairmq/sdk/Topology.cxx | 31 ++++++++++++------ fairmq/sdk/Topology.h | 1 + test/sdk/_topology.cxx | 65 ++++++++++++++++++++++++++------------ 5 files changed, 73 insertions(+), 30 deletions(-) diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index cc631fd6..f471ee3a 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -80,6 +80,7 @@ auto DDS::HandleControl() -> void SubscribeForCustomCommands(); SubscribeForConnectingChannels(); + fDDS.Start(); // subscribe to device state changes, pushing new state changes into the event queue SubscribeToDeviceStateChange([&](DeviceState newState) { diff --git a/fairmq/plugins/DDS/DDS.h b/fairmq/plugins/DDS/DDS.h index e9ca4dfe..84225511 100644 --- a/fairmq/plugins/DDS/DDS.h +++ b/fairmq/plugins/DDS/DDS.h @@ -64,7 +64,10 @@ struct DDSSubscription }); assert(!dds_session_id.empty()); - fService.start(dds_session_id); + } + + auto Start() -> void { + fService.start(dds::env_prop()); } ~DDSSubscription() { diff --git a/fairmq/sdk/Topology.cxx b/fairmq/sdk/Topology.cxx index b05a135a..d9f489cd 100644 --- a/fairmq/sdk/Topology.cxx +++ b/fairmq/sdk/Topology.cxx @@ -54,7 +54,7 @@ const std::unordered_map parts; boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,")); @@ -87,6 +87,7 @@ Topology::Topology(DDSTopology topo, DDSSession session) if (parts[0] == "state-change") { AddNewStateEntry(std::stoull(parts[2]), parts[3]); } else if (parts[0] == "state-changes-subscription") { + LOG(debug) << "Received from " << senderId << ": " << msg; if (parts[2] != "OK") { LOG(error) << "state-changes-subscription failed with return code: " << parts[2]; } @@ -99,6 +100,7 @@ Topology::Topology(DDSTopology topo, DDSSession session) } }); fDDSSession.StartDDSService(); + LOG(debug) << "subscribe-to-state-changes"; fDDSSession.SendCommand("subscribe-to-state-changes"); fExecutionThread = std::thread(&Topology::WaitForState, this); @@ -156,10 +158,16 @@ void Topology::WaitForState() auto condition = [&] { // LOG(info) << "checking condition"; // LOG(info) << "fShutdown: " << fShutdown; - // LOG(info) << "condition: " << std::all_of(fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) { return i.second.state == fTargetState; }); - return fShutdown || std::all_of(fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) { - return i.second.state == fTargetState; - }); + // LOG(info) << "condition: " << std::all_of(fState.cbegin(), fState.cend(), + // [&](TopologyState::value_type i) { return i.second.state == fTargetState; }); + return fShutdown + || std::all_of( + fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) { + // TODO Check, if we can make sure that EXITING state change event are not missed + return (fTargetState == DeviceState::Exiting) + || ((i.second.state == fTargetState) + && i.second.initialized); + }); }; std::unique_lock lock(fMtx); @@ -170,7 +178,8 @@ void Topology::WaitForState() fStateChangeOngoing = false; TopologyState state = fState; lock.unlock(); - fChangeStateCallback({{AsyncOpResultCode::Timeout, "timeout"}, std::move(state)}); + fChangeStateCallback( + {{AsyncOpResultCode::Timeout, "timeout"}, std::move(state)}); break; } } else { @@ -182,13 +191,17 @@ void Topology::WaitForState() LOG(debug) << "Aborting because a shutdown was requested"; TopologyState state = fState; lock.unlock(); - fChangeStateCallback({{AsyncOpResultCode::Aborted, "Aborted because a shutdown was requested"}, std::move(state)}); + fChangeStateCallback( + {{AsyncOpResultCode::Aborted, "Aborted because a shutdown was requested"}, + std::move(state)}); break; } } catch (std::exception& e) { fStateChangeOngoing = false; LOG(error) << "Error while processing state request: " << e.what(); - fChangeStateCallback({{AsyncOpResultCode::Error, tools::ToString("Exception thrown: ", e.what())}, fState}); + fChangeStateCallback( + {{AsyncOpResultCode::Error, tools::ToString("Exception thrown: ", e.what())}, + fState}); } fChangeStateCallback({{AsyncOpResultCode::Ok, "success"}, fState}); diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index e875a698..ff8e64cc 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -139,6 +139,7 @@ class Topology DDSSession fDDSSession; DDSTopology fDDSTopo; TopologyState fState; + std::unordered_map fStateChangesSubscriptions; bool fStateChangeOngoing; DeviceState fTargetState; mutable std::mutex fMtx; diff --git a/test/sdk/_topology.cxx b/test/sdk/_topology.cxx index 85ff9e42..204ae507 100644 --- a/test/sdk/_topology.cxx +++ b/test/sdk/_topology.cxx @@ -25,7 +25,8 @@ TEST(Topology2, ConstructionWithNativeDdsApiObjects) ///////////////////////////////////////// // Example usage: - dds::topology_api::CTopology nativeTopo(fair::mq::tools::ToString(SDK_TESTSUITE_SOURCE_DIR, "/test_topo.xml")); + dds::topology_api::CTopology nativeTopo( + fair::mq::tools::ToString(SDK_TESTSUITE_SOURCE_DIR, "/test_topo.xml")); auto nativeSession(std::make_shared()); nativeSession->create(); EXPECT_THROW(fair::mq::sdk::Topology topo(nativeTopo, nativeSession, env), std::runtime_error); @@ -43,13 +44,16 @@ TEST_F(Topology, ChangeStateAsync) Topology topo(mDDSTopo, mDDSSession); fair::mq::tools::Semaphore blocker; - topo.ChangeState(TopologyTransition::InitDevice, [&blocker, &topo](Topology::ChangeStateResult result) { - LOG(info) << result; - EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok); - EXPECT_NO_THROW(fair::mq::sdk::AggregateState(result.state)); - EXPECT_EQ(fair::mq::sdk::StateEqualsTo(result.state, fair::mq::sdk::DeviceState::Running), true); - blocker.Signal(); - }); + topo.ChangeState( + TopologyTransition::InitDevice, [&blocker, &topo](Topology::ChangeStateResult result) { + LOG(info) << result; + EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok); + EXPECT_NO_THROW(fair::mq::sdk::AggregateState(result.state)); + EXPECT_EQ( + fair::mq::sdk::StateEqualsTo(result.state, fair::mq::sdk::DeviceState::InitializingDevice), + true); + blocker.Signal(); + }); blocker.Wait(); } @@ -59,12 +63,13 @@ TEST_F(Topology, ChangeStateSync) using fair::mq::sdk::TopologyTransition; Topology topo(mDDSTopo, mDDSSession); - EXPECT_EQ(topo.ChangeState(TopologyTransition::Run).rc, fair::mq::AsyncOpResultCode::Ok); - auto result(topo.ChangeState(TopologyTransition::Stop)); + auto result(topo.ChangeState(TopologyTransition::InitDevice)); EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok); EXPECT_NO_THROW(fair::mq::sdk::AggregateState(result.state)); - EXPECT_EQ(fair::mq::sdk::StateEqualsTo(result.state, fair::mq::sdk::DeviceState::Ready), true); + EXPECT_EQ( + fair::mq::sdk::StateEqualsTo(result.state, fair::mq::sdk::DeviceState::InitializingDevice), + true); } TEST_F(Topology, ChangeStateConcurrent) @@ -74,14 +79,14 @@ TEST_F(Topology, ChangeStateConcurrent) Topology topo(mDDSTopo, mDDSSession); fair::mq::tools::Semaphore blocker; - topo.ChangeState(TopologyTransition::Run, [&blocker](Topology::ChangeStateResult result) { - LOG(info) << "result for valid ChangeState: " << result; - blocker.Signal(); - }); - topo.ChangeState(TopologyTransition::Stop, [&blocker](Topology::ChangeStateResult result) { - LOG(info) << "result for invalid ChangeState: " << result; - EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Error); - }); + topo.ChangeState(TopologyTransition::InitDevice, + [&blocker](Topology::ChangeStateResult result) { + LOG(info) << "result for valid ChangeState: " << result; + blocker.Signal(); + }); + EXPECT_THROW(topo.ChangeState(TopologyTransition::Stop, + [&blocker](Topology::ChangeStateResult) {}), + std::runtime_error); blocker.Wait(); } @@ -92,7 +97,7 @@ TEST_F(Topology, ChangeStateTimeout) Topology topo(mDDSTopo, mDDSSession); fair::mq::tools::Semaphore blocker; - topo.ChangeState(TopologyTransition::End, [&](Topology::ChangeStateResult result) { + topo.ChangeState(TopologyTransition::InitDevice, [&](Topology::ChangeStateResult result) { LOG(info) << result; EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Timeout); blocker.Signal(); @@ -100,4 +105,24 @@ TEST_F(Topology, ChangeStateTimeout) blocker.Wait(); } +TEST_F(Topology, ChangeStateFullDeviceLifetime) +{ + using fair::mq::sdk::Topology; + using fair::mq::sdk::TopologyTransition; + + Topology topo(mDDSTopo, mDDSSession); + for (auto transition : {TopologyTransition::InitDevice, + TopologyTransition::CompleteInit, + TopologyTransition::Bind, + TopologyTransition::Connect, + TopologyTransition::InitTask, + TopologyTransition::Run, + TopologyTransition::Stop, + TopologyTransition::ResetTask, + TopologyTransition::ResetDevice, + TopologyTransition::End}) { + ASSERT_EQ(topo.ChangeState(transition).rc, fair::mq::AsyncOpResultCode::Ok); + } +} + } // namespace