From d3697ec97bcfad446d7bec5bc138e89ec8c10364 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 11 Nov 2020 21:35:18 +0100 Subject: [PATCH] SDK: add WaitForPublisherCount() and related ctor arg --- fairmq/sdk/Topology.cxx | 6 ++-- fairmq/sdk/Topology.h | 58 ++++++++++++++++++++++++---------- fairmq/sdk/runDDSCommandUI.cxx | 2 +- 3 files changed, 47 insertions(+), 19 deletions(-) diff --git a/fairmq/sdk/Topology.cxx b/fairmq/sdk/Topology.cxx index af35ad48..597644c9 100644 --- a/fairmq/sdk/Topology.cxx +++ b/fairmq/sdk/Topology.cxx @@ -18,11 +18,13 @@ namespace sdk { /// @param nativeSession Existing and initialized CSession (either via create() or attach()) /// @param nativeTopo Existing CTopology that is activated on the given nativeSession /// @param env Optional DDSEnv (needed primarily for unit testing) +/// @param blockUntilConnected if true, ctor will wait for all tasks to confirm subscriptions auto MakeTopology(dds::topology_api::CTopology nativeTopo, std::shared_ptr nativeSession, - DDSEnv env) -> Topology + DDSEnv env, + bool blockUntilConnected) -> Topology { - return {DDSTopo(std::move(nativeTopo), env), DDSSession(std::move(nativeSession), env)}; + return {DDSTopo(std::move(nativeTopo), env), DDSSession(std::move(nativeSession), env), blockUntilConnected}; } } // namespace sdk diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index 58b48a1e..0fc18425 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -216,18 +216,21 @@ class BasicTopology : public AsioBase /// @brief (Re)Construct a FairMQ topology from an existing DDS topology /// @param topo DDSTopology /// @param session DDSSession - BasicTopology(DDSTopology topo, DDSSession session) - : BasicTopology(asio::system_executor(), std::move(topo), std::move(session)) + /// @param blockUntilConnected if true, ctor will wait for all tasks to confirm subscriptions + BasicTopology(DDSTopology topo, DDSSession session, bool blockUntilConnected = false) + : BasicTopology(asio::system_executor(), std::move(topo), std::move(session), blockUntilConnected) {} /// @brief (Re)Construct a FairMQ topology from an existing DDS topology /// @param ex I/O executor to be associated /// @param topo DDSTopology /// @param session DDSSession + /// @param blockUntilConnected if true, ctor will wait for all tasks to confirm subscriptions /// @throws RuntimeError BasicTopology(const Executor& ex, DDSTopology topo, DDSSession session, + bool blockUntilConnected = false, Allocator alloc = DefaultAllocator()) : AsioBase(ex, std::move(alloc)) , fDDSSession(std::move(session)) @@ -235,7 +238,8 @@ class BasicTopology : public AsioBase , fStateData() , fStateIndex() , fMtx(std::make_unique()) - , fStateChangeUnsubscriptionCV(std::make_unique()) + , fStateChangeSubscriptionsCV(std::make_unique()) + , fNumStateChangePublishers(0) , fHeartbeatsTimer(asio::system_executor()) , fHeartbeatInterval(600000) { @@ -251,6 +255,9 @@ class BasicTopology : public AsioBase fDDSSession.StartDDSService(); SubscribeToStateChanges(); + if (blockUntilConnected) { + WaitForPublisherCount(fStateIndex.size()); + } } /// not copyable @@ -284,6 +291,14 @@ class BasicTopology : public AsioBase fHeartbeatsTimer.async_wait(std::bind(&BasicTopology::SendSubscriptionHeartbeats, this, std::placeholders::_1)); } + void WaitForPublisherCount(unsigned int number) + { + std::unique_lock lk(*fMtx); + fStateChangeSubscriptionsCV->wait(lk, [&](){ + return fNumStateChangePublishers == number; + }); + } + void SendSubscriptionHeartbeats(const std::error_code& ec) { if (!ec) { @@ -308,13 +323,7 @@ class BasicTopology : public AsioBase fDDSSession.SendCommand(cmd::Cmds(cmd::make()).Serialize()); // wait for all tasks to confirm unsubscription - std::unique_lock lk(*fMtx); - fStateChangeUnsubscriptionCV->wait(lk, [&](){ - unsigned int count = std::count_if(fStateIndex.cbegin(), fStateIndex.cend(), [=](const auto& s) { - return fStateData.at(s.second).subscribed_to_state_changes == false; - }); - return count == fStateIndex.size(); - }); + WaitForPublisherCount(0); } void SubscribeToCommands() @@ -360,11 +369,19 @@ class BasicTopology : public AsioBase DDSTask::Id taskId(cmd.GetTaskId()); try { - std::lock_guard lk(*fMtx); + std::unique_lock lk(*fMtx); DeviceStatus& task = fStateData.at(fStateIndex.at(taskId)); - task.subscribed_to_state_changes = true; + if (!task.subscribed_to_state_changes) { + task.subscribed_to_state_changes = true; + ++fNumStateChangePublishers; + } else { + FAIR_LOG(warn) << "Task '" << task.taskId << "' sent subscription confirmation more than once"; + } + lk.unlock(); + fStateChangeSubscriptionsCV->notify_one(); } catch (const std::exception& e) { FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeSubscription const&): " << e.what(); + FAIR_LOG(error) << "Possibly no task with id '" << taskId << "'?"; } } else { FAIR_LOG(error) << "State change subscription failed for device: " << cmd.GetDeviceId() << ", task id: " << cmd.GetTaskId(); @@ -379,9 +396,14 @@ class BasicTopology : public AsioBase try { std::unique_lock lk(*fMtx); DeviceStatus& task = fStateData.at(fStateIndex.at(taskId)); - task.subscribed_to_state_changes = false; + if (task.subscribed_to_state_changes) { + task.subscribed_to_state_changes = false; + --fNumStateChangePublishers; + } else { + FAIR_LOG(warn) << "Task '" << task.taskId << "' sent unsubscription confirmation more than once"; + } lk.unlock(); - fStateChangeUnsubscriptionCV->notify_one(); + fStateChangeSubscriptionsCV->notify_one(); } catch (const std::exception& e) { FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeUnsubscription const&): " << e.what(); } @@ -406,6 +428,7 @@ class BasicTopology : public AsioBase // if the task is exiting, it will not respond to unsubscription request anymore, set it to false now. if (task.state == DeviceState::Exiting) { task.subscribed_to_state_changes = false; + --fNumStateChangePublishers; } // FAIR_LOG(debug) << "Updated state entry: taskId=" << taskId << ", state=" << state; @@ -1300,7 +1323,8 @@ class BasicTopology : public AsioBase mutable std::unique_ptr fMtx; - std::unique_ptr fStateChangeUnsubscriptionCV; + std::unique_ptr fStateChangeSubscriptionsCV; + unsigned int fNumStateChangePublishers; asio::steady_timer fHeartbeatsTimer; Duration fHeartbeatInterval; @@ -1336,9 +1360,11 @@ using Topo = Topology; /// @param nativeSession Existing and initialized CSession (either via create() or attach()) /// @param nativeTopo Existing CTopology that is activated on the given nativeSession /// @param env Optional DDSEnv (needed primarily for unit testing) +/// @param blockUntilConnected if true, ctor will wait for all tasks to confirm subscriptions auto MakeTopology(dds::topology_api::CTopology nativeTopo, std::shared_ptr nativeSession, - DDSEnv env = {}) -> Topology; + DDSEnv env = {}, + bool blockUntilConnected = false) -> Topology; } // namespace sdk } // namespace mq diff --git a/fairmq/sdk/runDDSCommandUI.cxx b/fairmq/sdk/runDDSCommandUI.cxx index 6d43232d..f7eed12a 100644 --- a/fairmq/sdk/runDDSCommandUI.cxx +++ b/fairmq/sdk/runDDSCommandUI.cxx @@ -209,7 +209,7 @@ try { DDSSession session(sessionID, env); DDSTopology ddsTopo(DDSTopology::Path(topoFile), env); - Topology topo(ddsTopo, session); + Topology topo(ddsTopo, session, true); if (targetState != "") { if (command != "") {