diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index a9f70aaf..827959d5 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -197,6 +197,19 @@ class BasicTopology : public AsioBase BasicTopology(BasicTopology&&) = default; BasicTopology& operator=(BasicTopology&&) = default; + ~BasicTopology() + { + UnsubscribeFromStateChanges(); + + std::lock_guard lk(fMtx); + fDDSSession.UnsubscribeFromCommands(); + try { + for (auto& op : fChangeStateOps) { + op.second.Complete(MakeErrorCode(ErrorCode::OperationCanceled)); + } + } catch (...) {} + } + void SubscribeToStateChanges() { using namespace fair::mq::sdk::cmd; @@ -215,54 +228,31 @@ class BasicTopology : public AsioBase void SubscribeToCommands() { - using namespace fair::mq::sdk::cmd; fDDSSession.SubscribeToCommands([&](const std::string& msg, const std::string& /* condition */, DDSChannel::Id senderId) { - Cmds inCmds; + cmd::Cmds inCmds; inCmds.Deserialize(msg); // FAIR_LOG(debug) << "Received " << inCmds.Size() << " command(s) with total size of " << msg.length() << " bytes: "; for (const auto& cmd : inCmds) { // FAIR_LOG(debug) << " > " << cmd->GetType(); switch (cmd->GetType()) { - case Type::state_change: { - auto _cmd = static_cast(*cmd); - if (_cmd.GetCurrentState() == DeviceState::Exiting) { - fDDSSession.SendCommand(Cmds(make()).Serialize(), senderId); - } - HandleCmd(_cmd); - } break; - case Type::state_change_subscription: - if (static_cast(*cmd).GetResult() != Result::Ok) { - FAIR_LOG(error) << "State change subscription failed for " << static_cast(*cmd).GetDeviceId(); - } + case cmd::Type::state_change_subscription: + HandleCmd(static_cast(*cmd)); break; - case Type::state_change_unsubscription: - if (static_cast(*cmd).GetResult() != Result::Ok) { - FAIR_LOG(error) << "State change unsubscription failed for " << static_cast(*cmd).GetDeviceId(); - } + case cmd::Type::state_change_unsubscription: + HandleCmd(static_cast(*cmd)); break; - case Type::transition_status: { - auto _cmd = static_cast(*cmd); - if (_cmd.GetResult() != Result::Ok) { - FAIR_LOG(error) << _cmd.GetTransition() << " transition failed for " << _cmd.GetDeviceId(); - DDSTask::Id id(_cmd.GetTaskId()); - std::lock_guard lk(fMtx); - for (auto& op : fChangeStateOps) { - if (!op.second.IsCompleted() && op.second.ContainsTask(id) && - fStateData.at(fStateIndex.at(id)).state != op.second.GetTargetState()) { - op.second.Complete(MakeErrorCode(ErrorCode::DeviceChangeStateFailed)); - } - } - } - } + case cmd::Type::state_change: + HandleCmd(static_cast(*cmd), senderId); break; - case Type::properties: { + case cmd::Type::transition_status: + HandleCmd(static_cast(*cmd)); + break; + case cmd::Type::properties: HandleCmd(static_cast(*cmd)); - } break; - case Type::properties_set: { + case cmd::Type::properties_set: HandleCmd(static_cast(*cmd)); - } break; default: FAIR_LOG(warn) << "Unexpected/unknown command received: " << cmd->GetType(); @@ -273,21 +263,28 @@ class BasicTopology : public AsioBase }); } - ~BasicTopology() + auto HandleCmd(cmd::StateChangeSubscription const& cmd) -> void { - UnsubscribeFromStateChanges(); - - std::lock_guard lk(fMtx); - fDDSSession.UnsubscribeFromCommands(); - try { - for (auto& op : fChangeStateOps) { - op.second.Complete(MakeErrorCode(ErrorCode::OperationCanceled)); - } - } catch (...) {} + if (cmd.GetResult() != cmd::Result::Ok) { + FAIR_LOG(error) << "State change subscription failed for " << cmd.GetDeviceId(); + } } - auto HandleCmd(cmd::StateChange const& cmd) -> void + auto HandleCmd(cmd::StateChangeUnsubscription const& cmd) -> void { + if (cmd.GetResult() != cmd::Result::Ok) { + FAIR_LOG(error) << "State change unsubscription failed for " << cmd.GetDeviceId(); + } + } + + auto HandleCmd(cmd::StateChange const& cmd, DDSChannel::Id const& senderId) -> void + { + using namespace fair::mq::sdk::cmd; + + if (cmd.GetCurrentState() == DeviceState::Exiting) { + fDDSSession.SendCommand(Cmds(make()).Serialize(), senderId); + } + DDSTask::Id taskId(cmd.GetTaskId()); std::lock_guard lk(fMtx); @@ -309,6 +306,21 @@ class BasicTopology : public AsioBase } } + auto HandleCmd(cmd::TransitionStatus const& cmd) -> void + { + if (cmd.GetResult() != cmd::Result::Ok) { + FAIR_LOG(error) << cmd.GetTransition() << " transition failed for " << cmd.GetDeviceId(); + DDSTask::Id id(cmd.GetTaskId()); + std::lock_guard lk(fMtx); + for (auto& op : fChangeStateOps) { + if (!op.second.IsCompleted() && op.second.ContainsTask(id) && + fStateData.at(fStateIndex.at(id)).state != op.second.GetTargetState()) { + op.second.Complete(MakeErrorCode(ErrorCode::DeviceChangeStateFailed)); + } + } + } + } + auto HandleCmd(cmd::Properties const& cmd) -> void { std::unique_lock lk(fMtx); @@ -629,7 +641,7 @@ class BasicTopology : public AsioBase } /// @brief Returns the current state of the topology - /// @return map of id : DeviceStatus (initialized, state) + /// @return map of id : DeviceStatus auto GetCurrentState() const -> TopologyState { std::lock_guard lk(fMtx);