diff --git a/fairmq/sdk/DDSTopology.cxx b/fairmq/sdk/DDSTopology.cxx index 941e5b76..b632e287 100644 --- a/fairmq/sdk/DDSTopology.cxx +++ b/fairmq/sdk/DDSTopology.cxx @@ -77,10 +77,10 @@ auto DDSTopology::GetTasks(const std::string& path /* = "" */) const -> std::vec auto tasks = boost::make_iterator_range(itPair.first, itPair.second); for (const auto& task : tasks) { - LOG(debug) << "Found task with id: " << task.first << ", " - << "Path: " << task.second.m_taskPath << ", " - << "Collection id: " << task.second.m_taskCollectionId << ", " - << "Name: " << task.second.m_task->getName() << "_" << task.second.m_taskIndex; + // LOG(debug) << "Found task with id: " << task.first << ", " + // << "Path: " << task.second.m_taskPath << ", " + // << "Collection id: " << task.second.m_taskCollectionId << ", " + // << "Name: " << task.second.m_task->getName() << "_" << task.second.m_taskIndex; list.emplace_back(task.first, task.second.m_taskCollectionId); } diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index 1ee1e654..49af7fce 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -468,7 +468,6 @@ class BasicTopology : public AsioBase , fTargetLastState(targetLastState) , fTargetCurrentState(targetCurrentState) , fMtx(mutex) - , fCompleted(false) { if (timeout > std::chrono::milliseconds(0)) { fTimer.expires_after(timeout); @@ -490,7 +489,6 @@ class BasicTopology : public AsioBase /// precondition: fMtx is locked. auto ResetCount(const TopologyStateIndex& stateIndex, const TopologyState& stateData) -> void { - LOG(info) << "Resetting count and expecting fTargetLastState=" << fTargetLastState << ",fTargetCurrentState=" << fTargetCurrentState; fCount = std::count_if(stateIndex.cbegin(), stateIndex.cend(), [=](const auto& s) { if (ContainsTask(stateData.at(s.second).taskId)) { if (stateData.at(s.second).state == fTargetCurrentState && @@ -509,8 +507,7 @@ class BasicTopology : public AsioBase /// precondition: fMtx is locked. auto Update(const DDSTask::Id taskId, const DeviceState lastState, const DeviceState currentState) -> void { - if (!fCompleted && ContainsTask(taskId)) { - LOG(info) << "Update: lastState=" << lastState << ",currentState=" << currentState; + if (!fOp.IsCompleted() && ContainsTask(taskId)) { if (currentState == fTargetCurrentState && (lastState == fTargetLastState || fTargetLastState == DeviceState::Ok)) { @@ -523,14 +520,14 @@ class BasicTopology : public AsioBase /// precondition: fMtx is locked. auto TryCompletion() -> void { - LOG(info) << "fCount: " << fCount; if (!fOp.IsCompleted() && fCount == fTasks.size()) { - fCompleted = true; fTimer.cancel(); fOp.Complete(); } } + bool IsCompleted() { return fOp.IsCompleted(); } + private: Id const fId; AsioAsyncOp fOp; @@ -540,7 +537,6 @@ class BasicTopology : public AsioBase DeviceState fTargetLastState; DeviceState fTargetCurrentState; std::mutex& fMtx; - bool fCompleted; /// precondition: fMtx is locked. auto ContainsTask(DDSTask::Id id) -> bool @@ -574,6 +570,15 @@ class BasicTopology : public AsioBase // TODO Implement garbage collection of completed ops std::lock_guard lk(fMtx); + + for (auto it = begin(fWaitForStateOps); it != end(fWaitForStateOps);) { + if (it->second.IsCompleted()) { + it = fWaitForStateOps.erase(it); + } else { + ++it; + } + } + auto p = fWaitForStateOps.emplace( std::piecewise_construct, std::forward_as_tuple(id), @@ -689,6 +694,8 @@ class BasicTopology : public AsioBase TryCompletion(); } + bool IsCompleted() { return fOp.IsCompleted(); } + private: Id const fId; AsioAsyncOp fOp; @@ -744,8 +751,16 @@ class BasicTopology : public AsioBase [&](auto handler) { typename GetPropertiesOp::Id const id(tools::UuidHash()); - // TODO Implement garbage collection of completed ops std::lock_guard lk(fMtx); + + for (auto it = begin(fGetPropertiesOps); it != end(fGetPropertiesOps);) { + if (it->second.IsCompleted()) { + it = fGetPropertiesOps.erase(it); + } else { + ++it; + } + } + fGetPropertiesOps.emplace( std::piecewise_construct, std::forward_as_tuple(id), @@ -846,6 +861,8 @@ class BasicTopology : public AsioBase TryCompletion(); } + bool IsCompleted() { return fOp.IsCompleted(); } + private: Id const fId; AsioAsyncOp fOp; @@ -901,8 +918,16 @@ class BasicTopology : public AsioBase [&](auto handler) { typename SetPropertiesOp::Id const id(tools::UuidHash()); - // TODO Implement garbage collection of completed ops std::lock_guard lk(fMtx); + + for (auto it = begin(fGetPropertiesOps); it != end(fGetPropertiesOps);) { + if (it->second.IsCompleted()) { + it = fGetPropertiesOps.erase(it); + } else { + ++it; + } + } + fSetPropertiesOps.emplace( std::piecewise_construct, std::forward_as_tuple(id), diff --git a/test/sdk/_topology.cxx b/test/sdk/_topology.cxx index e5ea101b..c1146374 100644 --- a/test/sdk/_topology.cxx +++ b/test/sdk/_topology.cxx @@ -225,6 +225,9 @@ TEST_F(Topology, WaitForStateFullDeviceLifecycle) using fair::mq::sdk::TopologyTransition; sdk::Topology topo(mDDSTopo, mDDSSession); + topo.AsyncWaitForState(sdk::DeviceState::ResettingDevice, [](std::error_code ec){ + ASSERT_EQ(ec, std::error_code()); + }); for (auto transition : {TopologyTransition::InitDevice, TopologyTransition::CompleteInit, TopologyTransition::Bind, @@ -235,7 +238,7 @@ TEST_F(Topology, WaitForStateFullDeviceLifecycle) TopologyTransition::ResetTask, TopologyTransition::ResetDevice, TopologyTransition::End}) { - LOG(info) << topo.ChangeState(transition).first; + topo.ChangeState(transition); ASSERT_EQ(topo.WaitForState(sdk::expectedState.at(transition)), std::error_code()); } }