diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index 557d471f..58b48a1e 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -234,6 +234,8 @@ class BasicTopology : public AsioBase , fDDSTopo(std::move(topo)) , fStateData() , fStateIndex() + , fMtx(std::make_unique()) + , fStateChangeUnsubscriptionCV(std::make_unique()) , fHeartbeatsTimer(asio::system_executor()) , fHeartbeatInterval(600000) { @@ -263,7 +265,7 @@ class BasicTopology : public AsioBase { UnsubscribeFromStateChanges(); - std::lock_guard lk(fMtx); + std::lock_guard lk(*fMtx); fDDSSession.UnsubscribeFromCommands(); try { for (auto& op : fChangeStateOps) { @@ -306,8 +308,8 @@ class BasicTopology : public AsioBase fDDSSession.SendCommand(cmd::Cmds(cmd::make()).Serialize()); // wait for all tasks to confirm unsubscription - std::unique_lock lk(fMtx); - fStateChangeUnsubscriptionCV.wait(lk, [&](){ + std::unique_lock lk(*fMtx); + fStateChangeUnsubscriptionCV->wait(lk, [&](){ unsigned int count = std::count_if(fStateIndex.cbegin(), fStateIndex.cend(), [=](const auto& s) { return fStateData.at(s.second).subscribed_to_state_changes == false; }); @@ -358,7 +360,7 @@ class BasicTopology : public AsioBase DDSTask::Id taskId(cmd.GetTaskId()); try { - std::lock_guard lk(fMtx); + std::lock_guard lk(*fMtx); DeviceStatus& task = fStateData.at(fStateIndex.at(taskId)); task.subscribed_to_state_changes = true; } catch (const std::exception& e) { @@ -375,11 +377,11 @@ class BasicTopology : public AsioBase DDSTask::Id taskId(cmd.GetTaskId()); try { - std::unique_lock lk(fMtx); + std::unique_lock lk(*fMtx); DeviceStatus& task = fStateData.at(fStateIndex.at(taskId)); task.subscribed_to_state_changes = false; lk.unlock(); - fStateChangeUnsubscriptionCV.notify_one(); + fStateChangeUnsubscriptionCV->notify_one(); } catch (const std::exception& e) { FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeUnsubscription const&): " << e.what(); } @@ -397,7 +399,7 @@ class BasicTopology : public AsioBase DDSTask::Id taskId(cmd.GetTaskId()); try { - std::lock_guard lk(fMtx); + std::lock_guard lk(*fMtx); DeviceStatus& task = fStateData.at(fStateIndex.at(taskId)); task.lastState = cmd.GetLastState(); task.state = cmd.GetCurrentState(); @@ -422,7 +424,7 @@ class BasicTopology : public AsioBase { if (cmd.GetResult() != cmd::Result::Ok) { DDSTask::Id taskId(cmd.GetTaskId()); - std::lock_guard lk(fMtx); + std::lock_guard lk(*fMtx); for (auto& op : fChangeStateOps) { if (!op.second.IsCompleted() && op.second.ContainsTask(taskId)) { if (fStateData.at(fStateIndex.at(taskId)).state != op.second.GetTargetState()) { @@ -438,7 +440,7 @@ class BasicTopology : public AsioBase auto HandleCmd(cmd::Properties const& cmd) -> void { - std::unique_lock lk(fMtx); + std::unique_lock lk(*fMtx); try { auto& op(fGetPropertiesOps.at(cmd.GetRequestId())); lk.unlock(); @@ -452,7 +454,7 @@ class BasicTopology : public AsioBase auto HandleCmd(cmd::PropertiesSet const& cmd) -> void { - std::unique_lock lk(fMtx); + std::unique_lock lk(*fMtx); try { auto& op(fSetPropertiesOps.at(cmd.GetRequestId())); lk.unlock(); @@ -659,7 +661,7 @@ class BasicTopology : public AsioBase return asio::async_initiate([&](auto handler) { typename ChangeStateOp::Id const id(tools::UuidHash()); - std::lock_guard lk(fMtx); + std::lock_guard lk(*fMtx); for (auto it = begin(fChangeStateOps); it != end(fChangeStateOps);) { if (it->second.IsCompleted()) { @@ -677,7 +679,7 @@ class BasicTopology : public AsioBase fDDSTopo.GetTasks(path), fStateData, timeout, - fMtx, + *fMtx, AsioBase::GetExecutor(), AsioBase::GetAllocator(), std::move(handler))); @@ -762,7 +764,7 @@ class BasicTopology : public AsioBase /// @return map of id : DeviceStatus auto GetCurrentState() const -> TopologyState { - std::lock_guard lk(fMtx); + std::lock_guard lk(*fMtx); return fStateData; } @@ -890,7 +892,7 @@ class BasicTopology : public AsioBase return asio::async_initiate([&](auto handler) { typename GetPropertiesOp::Id const id(tools::UuidHash()); - std::lock_guard lk(fMtx); + std::lock_guard lk(*fMtx); for (auto it = begin(fWaitForStateOps); it != end(fWaitForStateOps);) { if (it->second.IsCompleted()) { @@ -908,7 +910,7 @@ class BasicTopology : public AsioBase targetCurrentState, fDDSTopo.GetTasks(path), timeout, - fMtx, + *fMtx, AsioBase::GetExecutor(), AsioBase::GetAllocator(), std::move(handler))); @@ -1071,7 +1073,7 @@ class BasicTopology : public AsioBase [&](auto handler) { typename GetPropertiesOp::Id const id(tools::UuidHash()); - std::lock_guard lk(fMtx); + std::lock_guard lk(*fMtx); for (auto it = begin(fGetPropertiesOps); it != end(fGetPropertiesOps);) { if (it->second.IsCompleted()) { @@ -1087,7 +1089,7 @@ class BasicTopology : public AsioBase std::forward_as_tuple(id, fDDSTopo.GetTasks(path).size(), timeout, - fMtx, + *fMtx, AsioBase::GetExecutor(), AsioBase::GetAllocator(), std::move(handler))); @@ -1227,7 +1229,7 @@ class BasicTopology : public AsioBase [&](auto handler) { typename SetPropertiesOp::Id const id(tools::UuidHash()); - std::lock_guard lk(fMtx); + std::lock_guard lk(*fMtx); for (auto it = begin(fGetPropertiesOps); it != end(fGetPropertiesOps);) { if (it->second.IsCompleted()) { @@ -1243,7 +1245,7 @@ class BasicTopology : public AsioBase std::forward_as_tuple(id, fDDSTopo.GetTasks(path).size(), timeout, - fMtx, + *fMtx, AsioBase::GetExecutor(), AsioBase::GetAllocator(), std::move(handler))); @@ -1296,9 +1298,9 @@ class BasicTopology : public AsioBase TopologyState fStateData; TopologyStateIndex fStateIndex; - mutable std::mutex fMtx; + mutable std::unique_ptr fMtx; - std::condition_variable fStateChangeUnsubscriptionCV; + std::unique_ptr fStateChangeUnsubscriptionCV; asio::steady_timer fHeartbeatsTimer; Duration fHeartbeatInterval;