diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index 8648838a..c4f8cfa5 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -36,6 +36,7 @@ DDS::DDS(const string& name, const Plugin::Version version, const string& mainta , fService() , fDDSCustomCmd(fService) , fDDSKeyValue(fService) + , fDDSTaskId(dds::env_prop()) , fBindingChans() , fConnectingChans() , fStopMutex() @@ -68,9 +69,13 @@ auto DDS::HandleControl() -> void LOG(debug) << "$DDS_TASK_NAME: " << getenv("DDS_TASK_NAME"); LOG(debug) << "$DDS_TASK_INDEX: " << getenv("DDS_TASK_INDEX"); LOG(debug) << "$DDS_COLLECTION_INDEX: " << getenv("DDS_COLLECTION_INDEX"); + LOG(debug) << "$DDS_TASK_ID: " << getenv("DDS_TASK_ID"); + LOG(debug) << "$DDS_LOCATION: " << getenv("DDS_LOCATION"); string dds_session_id(getenv("DDS_SESSION_ID")); LOG(debug) << "$DDS_SESSION_ID: " << dds_session_id; + LOG(info) << "DDS Task Id (from API): " << fDDSTaskId; + // subscribe for state changes from DDS (subscriptions start firing after fService.start() is called) SubscribeForCustomCommands(); @@ -95,7 +100,7 @@ auto DDS::HandleControl() -> void fCurrentState = newState; for (auto subscriberId : fStateChangeSubscribers) { LOG(debug) << "Publishing state-change: " << fLastState << "->" << newState << " to " << subscriberId; - fDDSCustomCmd.send("state-change: " + id + "," + ToStr(fLastState) + "->" + ToStr(newState), to_string(subscriberId)); + fDDSCustomCmd.send("state-change: " + id + "," + ToString(fDDSTaskId) + "," + ToStr(fLastState) + "->" + ToStr(newState), to_string(subscriberId)); } } }); @@ -309,7 +314,7 @@ auto DDS::SubscribeForCustomCommands() -> void string id = GetProperty("id"); fDDSCustomCmd.subscribe([id, this](const string& cmd, const string& cond, uint64_t senderId) { - LOG(info) << "Received command: " << cmd; + LOG(info) << "Received command: '" << cmd << "' from " << senderId; if (cmd == "check-state") { fDDSCustomCmd.send(id + ": " + ToStr(GetCurrentDeviceState()), to_string(senderId)); @@ -366,7 +371,8 @@ auto DDS::SubscribeForCustomCommands() -> void { lock_guard lock{fStateChangeSubscriberMutex}; LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState << " to " << senderId; - fDDSCustomCmd.send("state-change: " + id + "," + ToStr(fLastState) + "->" + ToStr(fCurrentState), to_string(senderId)); + // fDDSCustomCmd.send("state-change: " + id + "," + ToStr(fLastState) + "->" + ToStr(fCurrentState), to_string(senderId)); + fDDSCustomCmd.send("state-change: " + id + "," + ToString(fDDSTaskId) + "," + ToStr(fLastState) + "->" + ToStr(fCurrentState), to_string(senderId)); } } else if (cmd == "unsubscribe-from-state-changes") { { diff --git a/fairmq/plugins/DDS/DDS.h b/fairmq/plugins/DDS/DDS.h index 136a5e2c..4cf3851f 100644 --- a/fairmq/plugins/DDS/DDS.h +++ b/fairmq/plugins/DDS/DDS.h @@ -14,6 +14,7 @@ #include #include +#include #include #include @@ -79,6 +80,7 @@ class DDS : public Plugin dds::intercom_api::CIntercomService fService; dds::intercom_api::CCustomCmd fDDSCustomCmd; dds::intercom_api::CKeyValue fDDSKeyValue; + uint64_t fDDSTaskId; std::unordered_map> fBindingChans; std::unordered_map fConnectingChans; diff --git a/fairmq/sdk/DDSSession.cxx b/fairmq/sdk/DDSSession.cxx index 6840a745..affe05bc 100644 --- a/fairmq/sdk/DDSSession.cxx +++ b/fairmq/sdk/DDSSession.cxx @@ -229,9 +229,9 @@ void DDSSession::SubscribeToCommands(std::functionfSession.unsubscribe(); // TODO REMOVE THIS HACK!!!! fImpl->fDDSCustomCmd.subscribe(cb); - fImpl->fDDSCustomCmd.subscribeOnReply([](const std::string& reply) { - LOG(debug) << reply; - }); + // fImpl->fDDSCustomCmd.subscribeOnReply([](const std::string& reply) { + // LOG(debug) << reply; + // }); } void DDSSession::SendCommand(const std::string& cmd) { fImpl->fDDSCustomCmd.send(cmd, ""); } diff --git a/fairmq/sdk/DDSTopology.cxx b/fairmq/sdk/DDSTopology.cxx index 3ca2737d..a405d4e9 100644 --- a/fairmq/sdk/DDSTopology.cxx +++ b/fairmq/sdk/DDSTopology.cxx @@ -75,7 +75,7 @@ std::vector DDSTopology::GetDeviceList() }); for (auto& it = taskIt.first; it != taskIt.second; ++it) { - LOG(debug) << "Found task " << it->first << " : " << it->second.m_task->getPath(); + LOG(debug) << "Found task " << it->first << " : " << "Path: " << it->second.m_task->getPath() << "Name: " << it->second.m_task->getName(); taskIDs.push_back(it->first); } diff --git a/fairmq/sdk/Topology.cxx b/fairmq/sdk/Topology.cxx index 338167cf..0e5ed751 100644 --- a/fairmq/sdk/Topology.cxx +++ b/fairmq/sdk/Topology.cxx @@ -69,12 +69,18 @@ Topology::Topology(DDSTopology topo, DDSSession session) fTopologyState.emplace(d, DeviceStatus{ false, DeviceState::Ok }); } fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& condition, uint64_t senderId) { - LOG(info) << "Received from " << senderId << ": " << msg; + LOG(debug) << "Received from " << senderId << ": " << msg; std::vector parts; boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,")); + + for (unsigned int i = 0; i < parts.size(); ++i) { + boost::trim(parts.at(i)); + LOG(info) << "parts[" << i << "]: " << parts.at(i); + } + if (parts[0] == "state-change") { - boost::trim(parts[2]); - AddNewStateEntry(senderId, parts[2]); + boost::trim(parts[3]); + AddNewStateEntry(std::stoull(parts[2]), parts[3]); } else if (parts[0] == "state-changes-subscription") { if (parts[2] != "OK") { LOG(error) << "state-changes-subscription failed with return code: " << parts[2]; @@ -99,6 +105,7 @@ auto Topology::ChangeState(fair::mq::Transition transition, ChangeStateCallback LOG(error) << "State change already in progress, concurrent requested not yet supported"; return; } + LOG(info) << "Initiating ChangeState with " << transition << " to " << fkExpectedState.at(transition); fStateChangeOngoing = true; fChangeStateCallback = cb; fStateChangeTimeout = timeout; @@ -114,43 +121,61 @@ void Topology::WaitForState() { while (!fShutdown) { if (fStateChangeOngoing) { - auto condition = [&] { return fShutdown || std::all_of(fTopologyState.cbegin(), - fTopologyState.cend(), - [&](TopologyState::value_type i) { - return i.second.state == fTargetState; - }); + auto condition = [&] { + LOG(info) << "checking condition"; + LOG(info) << "fShutdown: " << fShutdown; + LOG(info) << "condition: " << std::all_of(fTopologyState.cbegin(), fTopologyState.cend(), [&](TopologyState::value_type i) { return i.second.state == fTargetState; }); + return fShutdown || std::all_of(fTopologyState.cbegin(), fTopologyState.cend(), [&](TopologyState::value_type i) { + return i.second.state == fTargetState; + }); }; std::unique_lock lock(fMtx); + // TODO Fix the timeout version if (fStateChangeTimeout > std::chrono::milliseconds(0)) { + LOG(debug) << "initiating wait with timeout"; if (!fCV.wait_for(lock, fStateChangeTimeout, condition)) { LOG(debug) << "timeout"; - // TODO: catch this from another thread... - throw std::runtime_error("timeout"); + fStateChangeOngoing = false; + break; } } else { + LOG(debug) << "initiating wait without timeout"; fCV.wait(lock, condition); } + fStateChangeOngoing = false; if (fShutdown) { break; } - fStateChangeOngoing = false; fChangeStateCallback(ChangeStateResult{AsyncOpResult::Ok, fTopologyState}); } else { std::unique_lock lock(fExecutionMtx); fExecutionCV.wait(lock); } } + LOG(debug) << "WaitForState shutting down"; }; void Topology::AddNewStateEntry(uint64_t senderId, const std::string& state) { + std::size_t pos = state.find("->"); + std::string endState = state.substr(pos + 2); + LOG(info) << "Adding new state entry: " << senderId << ", " << state << ", end state: " << endState; { - std::unique_lock lock(fMtx); - fTopologyState[senderId] = DeviceStatus{ true, fair::mq::GetState(state) }; + try { + std::unique_lock lock(fMtx); + fTopologyState[senderId] = DeviceStatus{ true, fair::mq::GetState(endState) }; + } catch(const std::exception& e) { + LOG(error) << "Exception in AddNewStateEntry: " << e.what(); + } + + LOG(info) << "fTopologyState after update: "; + for (auto& e : fTopologyState) { + LOG(info) << e.first << ": " << e.second.state; + } } fCV.notify_one(); }