SDK: add WaitForPublisherCount() and related ctor arg

This commit is contained in:
Alexey Rybalchenko 2020-11-11 21:35:18 +01:00 committed by Dennis Klein
parent 73377c5100
commit d3697ec97b
3 changed files with 47 additions and 19 deletions

View File

@ -18,11 +18,13 @@ namespace sdk {
/// @param nativeSession Existing and initialized CSession (either via create() or attach()) /// @param nativeSession Existing and initialized CSession (either via create() or attach())
/// @param nativeTopo Existing CTopology that is activated on the given nativeSession /// @param nativeTopo Existing CTopology that is activated on the given nativeSession
/// @param env Optional DDSEnv (needed primarily for unit testing) /// @param env Optional DDSEnv (needed primarily for unit testing)
/// @param blockUntilConnected if true, ctor will wait for all tasks to confirm subscriptions
auto MakeTopology(dds::topology_api::CTopology nativeTopo, auto MakeTopology(dds::topology_api::CTopology nativeTopo,
std::shared_ptr<dds::tools_api::CSession> nativeSession, std::shared_ptr<dds::tools_api::CSession> nativeSession,
DDSEnv env) -> Topology DDSEnv env,
bool blockUntilConnected) -> Topology
{ {
return {DDSTopo(std::move(nativeTopo), env), DDSSession(std::move(nativeSession), env)}; return {DDSTopo(std::move(nativeTopo), env), DDSSession(std::move(nativeSession), env), blockUntilConnected};
} }
} // namespace sdk } // namespace sdk

View File

@ -216,18 +216,21 @@ class BasicTopology : public AsioBase<Executor, Allocator>
/// @brief (Re)Construct a FairMQ topology from an existing DDS topology /// @brief (Re)Construct a FairMQ topology from an existing DDS topology
/// @param topo DDSTopology /// @param topo DDSTopology
/// @param session DDSSession /// @param session DDSSession
BasicTopology(DDSTopology topo, DDSSession session) /// @param blockUntilConnected if true, ctor will wait for all tasks to confirm subscriptions
: BasicTopology<Executor, Allocator>(asio::system_executor(), std::move(topo), std::move(session)) BasicTopology(DDSTopology topo, DDSSession session, bool blockUntilConnected = false)
: BasicTopology<Executor, Allocator>(asio::system_executor(), std::move(topo), std::move(session), blockUntilConnected)
{} {}
/// @brief (Re)Construct a FairMQ topology from an existing DDS topology /// @brief (Re)Construct a FairMQ topology from an existing DDS topology
/// @param ex I/O executor to be associated /// @param ex I/O executor to be associated
/// @param topo DDSTopology /// @param topo DDSTopology
/// @param session DDSSession /// @param session DDSSession
/// @param blockUntilConnected if true, ctor will wait for all tasks to confirm subscriptions
/// @throws RuntimeError /// @throws RuntimeError
BasicTopology(const Executor& ex, BasicTopology(const Executor& ex,
DDSTopology topo, DDSTopology topo,
DDSSession session, DDSSession session,
bool blockUntilConnected = false,
Allocator alloc = DefaultAllocator()) Allocator alloc = DefaultAllocator())
: AsioBase<Executor, Allocator>(ex, std::move(alloc)) : AsioBase<Executor, Allocator>(ex, std::move(alloc))
, fDDSSession(std::move(session)) , fDDSSession(std::move(session))
@ -235,7 +238,8 @@ class BasicTopology : public AsioBase<Executor, Allocator>
, fStateData() , fStateData()
, fStateIndex() , fStateIndex()
, fMtx(std::make_unique<std::mutex>()) , fMtx(std::make_unique<std::mutex>())
, fStateChangeUnsubscriptionCV(std::make_unique<std::condition_variable>()) , fStateChangeSubscriptionsCV(std::make_unique<std::condition_variable>())
, fNumStateChangePublishers(0)
, fHeartbeatsTimer(asio::system_executor()) , fHeartbeatsTimer(asio::system_executor())
, fHeartbeatInterval(600000) , fHeartbeatInterval(600000)
{ {
@ -251,6 +255,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
fDDSSession.StartDDSService(); fDDSSession.StartDDSService();
SubscribeToStateChanges(); SubscribeToStateChanges();
if (blockUntilConnected) {
WaitForPublisherCount(fStateIndex.size());
}
} }
/// not copyable /// not copyable
@ -284,6 +291,14 @@ class BasicTopology : public AsioBase<Executor, Allocator>
fHeartbeatsTimer.async_wait(std::bind(&BasicTopology::SendSubscriptionHeartbeats, this, std::placeholders::_1)); fHeartbeatsTimer.async_wait(std::bind(&BasicTopology::SendSubscriptionHeartbeats, this, std::placeholders::_1));
} }
void WaitForPublisherCount(unsigned int number)
{
std::unique_lock<std::mutex> lk(*fMtx);
fStateChangeSubscriptionsCV->wait(lk, [&](){
return fNumStateChangePublishers == number;
});
}
void SendSubscriptionHeartbeats(const std::error_code& ec) void SendSubscriptionHeartbeats(const std::error_code& ec)
{ {
if (!ec) { if (!ec) {
@ -308,13 +323,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
fDDSSession.SendCommand(cmd::Cmds(cmd::make<cmd::UnsubscribeFromStateChange>()).Serialize()); fDDSSession.SendCommand(cmd::Cmds(cmd::make<cmd::UnsubscribeFromStateChange>()).Serialize());
// wait for all tasks to confirm unsubscription // wait for all tasks to confirm unsubscription
std::unique_lock<std::mutex> lk(*fMtx); WaitForPublisherCount(0);
fStateChangeUnsubscriptionCV->wait(lk, [&](){
unsigned int count = std::count_if(fStateIndex.cbegin(), fStateIndex.cend(), [=](const auto& s) {
return fStateData.at(s.second).subscribed_to_state_changes == false;
});
return count == fStateIndex.size();
});
} }
void SubscribeToCommands() void SubscribeToCommands()
@ -360,11 +369,19 @@ class BasicTopology : public AsioBase<Executor, Allocator>
DDSTask::Id taskId(cmd.GetTaskId()); DDSTask::Id taskId(cmd.GetTaskId());
try { try {
std::lock_guard<std::mutex> lk(*fMtx); std::unique_lock<std::mutex> lk(*fMtx);
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId)); DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
task.subscribed_to_state_changes = true; if (!task.subscribed_to_state_changes) {
task.subscribed_to_state_changes = true;
++fNumStateChangePublishers;
} else {
FAIR_LOG(warn) << "Task '" << task.taskId << "' sent subscription confirmation more than once";
}
lk.unlock();
fStateChangeSubscriptionsCV->notify_one();
} catch (const std::exception& e) { } catch (const std::exception& e) {
FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeSubscription const&): " << e.what(); FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeSubscription const&): " << e.what();
FAIR_LOG(error) << "Possibly no task with id '" << taskId << "'?";
} }
} else { } else {
FAIR_LOG(error) << "State change subscription failed for device: " << cmd.GetDeviceId() << ", task id: " << cmd.GetTaskId(); FAIR_LOG(error) << "State change subscription failed for device: " << cmd.GetDeviceId() << ", task id: " << cmd.GetTaskId();
@ -379,9 +396,14 @@ class BasicTopology : public AsioBase<Executor, Allocator>
try { try {
std::unique_lock<std::mutex> lk(*fMtx); std::unique_lock<std::mutex> lk(*fMtx);
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId)); DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
task.subscribed_to_state_changes = false; if (task.subscribed_to_state_changes) {
task.subscribed_to_state_changes = false;
--fNumStateChangePublishers;
} else {
FAIR_LOG(warn) << "Task '" << task.taskId << "' sent unsubscription confirmation more than once";
}
lk.unlock(); lk.unlock();
fStateChangeUnsubscriptionCV->notify_one(); fStateChangeSubscriptionsCV->notify_one();
} catch (const std::exception& e) { } catch (const std::exception& e) {
FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeUnsubscription const&): " << e.what(); FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeUnsubscription const&): " << e.what();
} }
@ -406,6 +428,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
// if the task is exiting, it will not respond to unsubscription request anymore, set it to false now. // if the task is exiting, it will not respond to unsubscription request anymore, set it to false now.
if (task.state == DeviceState::Exiting) { if (task.state == DeviceState::Exiting) {
task.subscribed_to_state_changes = false; task.subscribed_to_state_changes = false;
--fNumStateChangePublishers;
} }
// FAIR_LOG(debug) << "Updated state entry: taskId=" << taskId << ", state=" << state; // FAIR_LOG(debug) << "Updated state entry: taskId=" << taskId << ", state=" << state;
@ -1300,7 +1323,8 @@ class BasicTopology : public AsioBase<Executor, Allocator>
mutable std::unique_ptr<std::mutex> fMtx; mutable std::unique_ptr<std::mutex> fMtx;
std::unique_ptr<std::condition_variable> fStateChangeUnsubscriptionCV; std::unique_ptr<std::condition_variable> fStateChangeSubscriptionsCV;
unsigned int fNumStateChangePublishers;
asio::steady_timer fHeartbeatsTimer; asio::steady_timer fHeartbeatsTimer;
Duration fHeartbeatInterval; Duration fHeartbeatInterval;
@ -1336,9 +1360,11 @@ using Topo = Topology;
/// @param nativeSession Existing and initialized CSession (either via create() or attach()) /// @param nativeSession Existing and initialized CSession (either via create() or attach())
/// @param nativeTopo Existing CTopology that is activated on the given nativeSession /// @param nativeTopo Existing CTopology that is activated on the given nativeSession
/// @param env Optional DDSEnv (needed primarily for unit testing) /// @param env Optional DDSEnv (needed primarily for unit testing)
/// @param blockUntilConnected if true, ctor will wait for all tasks to confirm subscriptions
auto MakeTopology(dds::topology_api::CTopology nativeTopo, auto MakeTopology(dds::topology_api::CTopology nativeTopo,
std::shared_ptr<dds::tools_api::CSession> nativeSession, std::shared_ptr<dds::tools_api::CSession> nativeSession,
DDSEnv env = {}) -> Topology; DDSEnv env = {},
bool blockUntilConnected = false) -> Topology;
} // namespace sdk } // namespace sdk
} // namespace mq } // namespace mq

View File

@ -209,7 +209,7 @@ try {
DDSSession session(sessionID, env); DDSSession session(sessionID, env);
DDSTopology ddsTopo(DDSTopology::Path(topoFile), env); DDSTopology ddsTopo(DDSTopology::Path(topoFile), env);
Topology topo(ddsTopo, session); Topology topo(ddsTopo, session, true);
if (targetState != "") { if (targetState != "") {
if (command != "") { if (command != "") {