diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index e3f112e6..a92d3606 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -69,6 +69,7 @@ struct DeviceStatus }; using TopologyState = std::vector; +using TopologyStateIndex = std::unordered_map; // task id -> index in the data vector using TopologyStateByTask = std::unordered_map; using TopologyStateByCollection = std::unordered_map>; using TopologyTransition = fair::mq::Transition; @@ -131,9 +132,7 @@ class BasicTopology : public AsioBase /// @param topo DDSTopology /// @param session DDSSession BasicTopology(DDSTopology topo, DDSSession session) - : BasicTopology(asio::system_executor(), - std::move(topo), - std::move(session)) + : BasicTopology(asio::system_executor(), std::move(topo), std::move(session)) {} /// @brief (Re)Construct a FairMQ topology from an existing DDS topology @@ -148,21 +147,21 @@ class BasicTopology : public AsioBase : AsioBase(ex, std::move(alloc)) , fDDSSession(std::move(session)) , fDDSTopo(std::move(topo)) - , fState(makeTopologyState(fDDSTopo)) + , fStateData() + , fStateIndex() , fChangeStateOp() , fChangeStateOpTimer(ex) , fChangeStateTarget(DeviceState::Idle) { + makeTopologyState(); + std::string activeTopo(fDDSSession.RequestCommanderInfo().activeTopologyName); std::string givenTopo(fDDSTopo.GetName()); if (activeTopo != givenTopo) { - throw RuntimeError("Given topology ", givenTopo, - " is not activated (active: ", activeTopo, ")"); + throw RuntimeError("Given topology ", givenTopo, " is not activated (active: ", activeTopo, ")"); } - fDDSSession.SubscribeToCommands([&](const std::string& msg, - const std::string& /* condition */, - DDSChannel::Id senderId) { + fDDSSession.SubscribeToCommands([&](const std::string& msg, const std::string& /* condition */, DDSChannel::Id senderId) { // LOG(debug) << "Received from " << senderId << ": " << msg; std::vector parts; boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,")); @@ -174,31 +173,29 @@ class BasicTopology : public AsioBase if (parts[0] == "state-change") { DDSTask::Id taskId(std::stoull(parts[2])); fDDSSession.UpdateChannelToTaskAssociation(senderId, taskId); - if(parts[3] == "IDLE->EXITING") { + if (parts[3] == "IDLE->EXITING") { fDDSSession.SendCommand("state-change-exiting-received", senderId); } UpdateStateEntry(taskId, parts[3]); } else if (parts[0] == "state-changes-subscription") { LOG(debug) << "Received from " << senderId << ": " << msg; if (parts[2] != "OK") { - LOG(error) << "state-changes-subscription failed with return code: " - << parts[2]; + LOG(error) << "state-changes-subscription failed with return code: " << parts[2]; } } else if (parts[0] == "state-changes-unsubscription") { + LOG(debug) << "Received from " << senderId << ": " << msg; if (parts[2] != "OK") { - LOG(error) << "state-changes-unsubscription failed with return code: " - << parts[2]; + LOG(error) << "state-changes-unsubscription failed with return code: " << parts[2]; } } else if (parts[1] == "could not queue") { std::lock_guard lk(fMtx); - if (!fChangeStateOp.IsCompleted() - && fState.at(fDDSSession.GetTaskId(senderId)).state != fChangeStateTarget) { + if (!fChangeStateOp.IsCompleted() && fStateData.at(fStateIndex.at(fDDSSession.GetTaskId(senderId))).state != fChangeStateTarget) { fChangeStateOpTimer.cancel(); - fChangeStateOp.Complete(MakeErrorCode(ErrorCode::DeviceChangeStateFailed), - MakeTopologyStateFromMap()); + fChangeStateOp.Complete(MakeErrorCode(ErrorCode::DeviceChangeStateFailed), fStateData); } } }); + fDDSSession.StartDDSService(); LOG(debug) << "subscribe-to-state-changes"; fDDSSession.SendCommand("subscribe-to-state-changes"); @@ -217,7 +214,7 @@ class BasicTopology : public AsioBase std::lock_guard lk(fMtx); fDDSSession.UnsubscribeFromCommands(); try { - fChangeStateOp.Cancel(MakeTopologyStateFromMap()); + fChangeStateOp.Cancel(fStateData); } catch (...) {} } @@ -327,7 +324,7 @@ class BasicTopology : public AsioBase fChangeStateOpTimer.async_wait([&](std::error_code ec) { if (!ec) { std::lock_guard lk2(fMtx); - fChangeStateOp.Timeout(MakeTopologyStateFromMap()); + fChangeStateOp.Timeout(fStateData); } }); } @@ -388,7 +385,7 @@ class BasicTopology : public AsioBase auto GetCurrentState() const -> TopologyState { std::lock_guard lk(fMtx); - return MakeTopologyStateFromMap(); + return fStateData; } auto AggregateState() const -> DeviceState { return sdk::AggregateState(GetCurrentState()); } @@ -397,12 +394,11 @@ class BasicTopology : public AsioBase private: using TransitionedCount = unsigned int; - // using TransitionCounts = std::map; - DDSSession fDDSSession; DDSTopology fDDSTopo; - TopologyStateByTask fState; + TopologyState fStateData; + TopologyStateIndex fStateIndex; mutable std::mutex fMtx; using ChangeStateOp = AsioAsyncOp; @@ -411,13 +407,17 @@ class BasicTopology : public AsioBase DeviceState fChangeStateTarget; TransitionedCount fTransitionedCount; - static auto makeTopologyState(const DDSTopo& topo) -> TopologyStateByTask + auto makeTopologyState() -> void { - TopologyStateByTask state; - for (const auto& task : topo.GetTasks()) { - state.emplace(task.GetId(), DeviceStatus{false, DeviceState::Ok, task.GetId(), task.GetCollectionId()}); + fStateData.reserve(fDDSTopo.GetTasks().size()); + + size_t index = 0; + + for (const auto& task : fDDSTopo.GetTasks()) { + fStateData.push_back(DeviceStatus{false, DeviceState::Ok, task.GetId(), task.GetCollectionId()}); + fStateIndex.emplace(task.GetId(), index); + index++; } - return state; } auto UpdateStateEntry(DDSTask::Id taskId, const std::string& state) -> void @@ -426,54 +426,41 @@ class BasicTopology : public AsioBase std::string endState = state.substr(pos + 2); try { std::lock_guard lk(fMtx); - DeviceStatus& task = fState.at(taskId); + DeviceStatus& task = fStateData.at(fStateIndex.at(taskId)); task.initialized = true; task.state = fair::mq::GetState(endState); if (task.state == fChangeStateTarget) { ++fTransitionedCount; } - LOG(debug) << "Updated state entry: taskId=" << taskId << ",state=" << state; + // LOG(debug) << "Updated state entry: taskId=" << taskId << ", state=" << endState; TryChangeStateCompletion(); } catch (const std::exception& e) { LOG(error) << "Exception in UpdateStateEntry: " << e.what(); } } - /// call only under locked fMtx! + /// precodition: fMtx is locked. auto TryChangeStateCompletion() -> void { - if (!fChangeStateOp.IsCompleted() && fTransitionedCount == fState.size()) { + if (!fChangeStateOp.IsCompleted() && fTransitionedCount == fStateData.size()) { fChangeStateOpTimer.cancel(); - fChangeStateOp.Complete(MakeTopologyStateFromMap()); + fChangeStateOp.Complete(fStateData); } } - /// call only under locked fMtx! + /// precodition: fMtx is locked. auto ResetTransitionedCount(DeviceState targetState) -> void { - fTransitionedCount = std::count_if(fState.cbegin(), fState.cend(), [=](const auto& s) { - return s.second.state == targetState; + fTransitionedCount = std::count_if(fStateIndex.cbegin(), fStateIndex.cend(), [=](const auto& s) { + return fStateData.at(s.second).state == targetState; }); } - /// call only under locked fMtx! + /// precodition: fMtx is locked. auto GetCurrentStateUnsafe() const -> TopologyState { - return MakeTopologyStateFromMap(); + return fStateData; } - - auto MakeTopologyStateFromMap() const -> TopologyState - { - TopologyState state; - state.reserve(fState.size()); - - for (const auto& e : fState) { - state.push_back(e.second); - } - - return state; - } - }; using Topology = BasicTopology;