From acbf57d6f33bb2e75f1a3695ec689b89ccfdfdbd Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Thu, 5 Sep 2019 14:39:37 +0200 Subject: [PATCH] Add sdk::GroupByCollectionId(TopologyState) --- fairmq/sdk/Topology.h | 63 ++++++++++++++++++++++++++++++++----------- 1 file changed, 47 insertions(+), 16 deletions(-) diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index 4edca885..8ffe198c 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -21,8 +21,10 @@ #include #include #include +#include #include #include +#include #include #include #include @@ -61,23 +63,26 @@ struct DeviceStatus { bool initialized; DeviceState state; + DDSTask::Id taskId; + DDSCollection::Id collectionId; }; -using TopologyState = std::unordered_map; +using TopologyState = std::vector; +using TopologyStateByTask = std::unordered_map; +using TopologyStateByCollection = std::unordered_map>; using TopologyTransition = fair::mq::Transition; inline DeviceState AggregateState(const TopologyState& topologyState) { - DeviceState first = topologyState.begin()->second.state; + DeviceState first = topologyState.begin()->state; if (std::all_of(topologyState.cbegin(), topologyState.cend(), [&](TopologyState::value_type i) { - return i.second.state == first; + return i.state == first; })) { return first; } throw MixedStateError("State is not uniform"); - } inline bool StateEqualsTo(const TopologyState& topologyState, DeviceState state) @@ -85,6 +90,18 @@ inline bool StateEqualsTo(const TopologyState& topologyState, DeviceState state) return AggregateState(topologyState) == state; } +inline TopologyStateByCollection GroupByCollectionId(const TopologyState& topologyState) +{ + TopologyStateByCollection state; + for (const auto& ds : topologyState) { + if (ds.collectionId != 0) { + state[ds.collectionId].push_back(ds); + } + } + + return state; +} + /** * @class BasicTopology Topology.h * @tparam Executor Associated I/O executor @@ -167,7 +184,7 @@ class BasicTopology : public AsioBase && fState.at(fDDSSession.GetTaskId(senderId)).state != fChangeStateTarget) { fChangeStateOpTimer.cancel(); fChangeStateOp.Complete(MakeErrorCode(ErrorCode::DeviceChangeStateFailed), - fState); + MakeTopologyStateFromMap()); } } }); @@ -189,7 +206,7 @@ class BasicTopology : public AsioBase std::lock_guard lk(fMtx); fDDSSession.UnsubscribeFromCommands(); try { - fChangeStateOp.Cancel(fState); + fChangeStateOp.Cancel(MakeTopologyStateFromMap()); } catch (...) {} } @@ -298,7 +315,7 @@ class BasicTopology : public AsioBase fChangeStateOpTimer.async_wait([&](std::error_code ec) { if (!ec) { std::lock_guard lk2(fMtx); - fChangeStateOp.Timeout(fState); + fChangeStateOp.Timeout(MakeTopologyStateFromMap()); } }); } @@ -359,7 +376,7 @@ class BasicTopology : public AsioBase auto GetCurrentState() const -> TopologyState { std::lock_guard lk(fMtx); - return fState; + return MakeTopologyStateFromMap(); } auto AggregateState() const -> DeviceState { return sdk::AggregateState(GetCurrentState()); } @@ -369,7 +386,7 @@ class BasicTopology : public AsioBase private: DDSSession fDDSSession; DDSTopology fDDSTopo; - TopologyState fState; + TopologyStateByTask fState; mutable std::mutex fMtx; using ChangeStateOp = AsioAsyncOp; @@ -377,11 +394,11 @@ class BasicTopology : public AsioBase asio::steady_timer fChangeStateOpTimer; DeviceState fChangeStateTarget; - static auto makeTopologyState(const DDSTopo& topo) -> TopologyState + static auto makeTopologyState(const DDSTopo& topo) -> TopologyStateByTask { - TopologyState state; + TopologyStateByTask state; for (const auto& task : topo.GetTasks()) { - state.emplace(task.GetId(), DeviceStatus{false, DeviceState::Ok}); + state.emplace(task.GetId(), DeviceStatus{false, DeviceState::Ok, task.GetId(), task.GetCollectionId()}); } return state; } @@ -392,7 +409,9 @@ class BasicTopology : public AsioBase std::string endState = state.substr(pos + 2); try { std::lock_guard lk(fMtx); - fState[taskId] = DeviceStatus{true, fair::mq::GetState(endState)}; + DeviceStatus& task = fState.at(taskId); + task.initialized = true; + task.state = fair::mq::GetState(endState); LOG(debug) << "Updated state entry: taskId=" << taskId << ",state=" << state; TryChangeStateCompletion(); } catch (const std::exception& e) { @@ -404,20 +423,32 @@ class BasicTopology : public AsioBase auto TryChangeStateCompletion() -> void { bool targetStateReached( - std::all_of(fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) { + std::all_of(fState.cbegin(), fState.cend(), [&](TopologyStateByTask::value_type i) { return (i.second.state == fChangeStateTarget) && i.second.initialized; })); if (!fChangeStateOp.IsCompleted() && targetStateReached) { fChangeStateOpTimer.cancel(); - fChangeStateOp.Complete(fState); + fChangeStateOp.Complete(MakeTopologyStateFromMap()); } } /// call only under locked fMtx! auto GetCurrentStateUnsafe() const -> TopologyState { - return fState; + return MakeTopologyStateFromMap(); + } + + auto MakeTopologyStateFromMap() const -> TopologyState + { + TopologyState state; + state.reserve(fState.size()); + + for (const auto& e : fState) { + state.push_back(e.second); + } + + return state; } };