diff --git a/fairmq/sdk/DDSSession.cxx b/fairmq/sdk/DDSSession.cxx index 5ffd9893..ee2c7eee 100644 --- a/fairmq/sdk/DDSSession.cxx +++ b/fairmq/sdk/DDSSession.cxx @@ -20,7 +20,9 @@ #include #include +#include #include +#include #include namespace fair { @@ -134,6 +136,8 @@ struct DDSSession::Impl dds::intercom_api::CCustomCmd fDDSCustomCmd; Id fId; bool fStopOnDestruction; + mutable std::mutex fMtx; + std::unordered_map fTaskIdByChannelIdMap; }; DDSSession::DDSSession(DDSEnvironment env) @@ -316,6 +320,18 @@ void DDSSession::UnsubscribeFromCommands() void DDSSession::SendCommand(const std::string& cmd) { fImpl->fDDSCustomCmd.send(cmd, ""); } +auto DDSSession::UpdateChannelToTaskAssociation(DDSChannel::Id channelId, DDSTask::Id taskId) -> void +{ + std::lock_guard lk(fImpl->fMtx); + fImpl->fTaskIdByChannelIdMap[channelId] = taskId; +} + +auto DDSSession::GetTaskId(DDSChannel::Id channelId) const -> DDSTask::Id +{ + std::lock_guard lk(fImpl->fMtx); + return fImpl->fTaskIdByChannelIdMap.at(channelId); +} + auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream& { return os << "$DDS_SESSION_ID: " << session.GetId(); diff --git a/fairmq/sdk/DDSSession.h b/fairmq/sdk/DDSSession.h index 0a1c4885..e14c7e95 100644 --- a/fairmq/sdk/DDSSession.h +++ b/fairmq/sdk/DDSSession.h @@ -43,6 +43,18 @@ auto operator>>(std::istream& is, DDSRMSPlugin& plugin) -> std::istream&; class DDSTopology; class DDSAgent; +class DDSTask +{ + public: + using Id = std::uint64_t; +}; + +class DDSChannel +{ + public: + using Id = std::uint64_t; +}; + /** * @class DDSSession DDSSession.h * @brief Represents a DDS session @@ -95,6 +107,8 @@ class DDSSession void SubscribeToCommands(std::function); void UnsubscribeFromCommands(); void SendCommand(const std::string&); + auto UpdateChannelToTaskAssociation(DDSChannel::Id, DDSTask::Id) -> void; + auto GetTaskId(DDSChannel::Id) const -> DDSTask::Id; friend auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&; diff --git a/fairmq/sdk/Topology.cxx b/fairmq/sdk/Topology.cxx index d9f489cd..2d557eef 100644 --- a/fairmq/sdk/Topology.cxx +++ b/fairmq/sdk/Topology.cxx @@ -75,7 +75,7 @@ Topology::Topology(DDSTopology topo, DDSSession session) // LOG(debug) << "Adding device " << d; fState.emplace(d, DeviceStatus{ false, DeviceState::Ok }); } - fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& /* condition */, uint64_t senderId) { + fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& /* condition */, DDSChannel::Id senderId) { // LOG(debug) << "Received from " << senderId << ": " << msg; std::vector parts; boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,")); @@ -85,7 +85,9 @@ Topology::Topology(DDSTopology topo, DDSSession session) } if (parts[0] == "state-change") { - AddNewStateEntry(std::stoull(parts[2]), parts[3]); + DDSTask::Id taskId(std::stoull(parts[2])); + fDDSSession.UpdateChannelToTaskAssociation(senderId, taskId); + AddNewStateEntry(taskId, parts[3]); } else if (parts[0] == "state-changes-subscription") { LOG(debug) << "Received from " << senderId << ": " << msg; if (parts[2] != "OK") { diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index ff8e64cc..953ea695 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -56,7 +56,7 @@ struct DeviceStatus DeviceState state; }; -using TopologyState = std::unordered_map; +using TopologyState = std::unordered_map; using TopologyTransition = fair::mq::Transition; struct MixedState : std::runtime_error { using std::runtime_error::runtime_error; };