From dc552723178504f9504bf74a9da57f118b6418b0 Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Wed, 24 Jul 2019 14:25:50 +0200 Subject: [PATCH] SDK: Implement WaitForExecutingAgents * Require DDS 2.5.22 * Apply in meaningful places * Adapt test fixture --- CMakeLists.txt | 2 +- fairmq/sdk/DDSSession.cxx | 69 +++++++++++++++++++++++++++++++------- fairmq/sdk/DDSSession.h | 38 +++++++++++++++++++-- fairmq/sdk/Topology.cxx | 8 ++--- test/sdk/TopologyFixture.h | 3 -- 5 files changed, 97 insertions(+), 23 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 24e81ee1..0d1cb95d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -75,7 +75,7 @@ if(BUILD_NANOMSG_TRANSPORT) endif() if(BUILD_SDK) - set(required_dds_version 2.5.20) + set(required_dds_version 2.5.22) else() set(required_dds_version 2.4) endif() diff --git a/fairmq/sdk/DDSSession.cxx b/fairmq/sdk/DDSSession.cxx index a0553894..5b0df0f7 100644 --- a/fairmq/sdk/DDSSession.cxx +++ b/fairmq/sdk/DDSSession.cxx @@ -9,6 +9,7 @@ #include "DDSSession.h" #include +#include #include #include @@ -168,23 +169,34 @@ auto DDSSession::SubmitAgents(Quantity agents) -> void fImpl->fSession.sendRequest(submitRequest); blocker.Wait(); + + // Not perfect, but best we can do + WaitForIdleAgents(agents); } -auto DDSSession::RequestAgentInfo() -> void +auto DDSSession::RequestAgentInfo() -> AgentInfo { dds::tools_api::SAgentInfoRequestData agentInfoInfo; tools::Semaphore blocker; + AgentInfo info; auto agentInfoRequest = dds::tools_api::SAgentInfoRequest::makeRequest(agentInfoInfo); agentInfoRequest->setResponseCallback( - [&](const dds::tools_api::SAgentInfoResponseData& _response) { - LOG(debug) << "agent: " << _response.m_index << "/" << _response.m_activeAgentsCount; - LOG(debug) << "info: " << _response.m_agentInfo; + [this, &info](const dds::tools_api::SAgentInfoResponseData& _response) { + if (_response.m_index == 0) { + info.activeAgentsCount = _response.m_activeAgentsCount; + info.idleAgentsCount = _response.m_idleAgentsCount; + info.executingAgentsCount = _response.m_executingAgentsCount; + info.agents.reserve(_response.m_activeAgentsCount); + } + info.agents.emplace_back(*this, std::move(_response.m_agentInfo)); }); agentInfoRequest->setMessageCallback( [](const dds::tools_api::SMessageResponseData& _message) { LOG(debug) << _message; }); agentInfoRequest->setDoneCallback([&]() { blocker.Signal(); }); fImpl->fSession.sendRequest(agentInfoRequest); blocker.Wait(); + + return info; } auto DDSSession::RequestCommanderInfo() -> CommanderInfo @@ -197,7 +209,7 @@ auto DDSSession::RequestCommanderInfo() -> CommanderInfo commanderInfoRequest->setResponseCallback( [&info](const dds::tools_api::SCommanderInfoResponseData& _response) { info.pid = _response.m_pid; - info.idleAgentsCount = _response.m_idleAgentsCount; + info.activeTopologyName = std::move(_response.m_activeTopologyName); }); commanderInfoRequest->setMessageCallback( [](const dds::tools_api::SMessageResponseData& _message) { LOG(debug) << _message; }); @@ -208,22 +220,33 @@ auto DDSSession::RequestCommanderInfo() -> CommanderInfo return info; } -auto DDSSession::WaitForIdleAgents(Quantity minCount) -> void +auto DDSSession::WaitForExecutingAgents(Quantity minCount) -> void { - auto info(RequestCommanderInfo()); + auto info(RequestAgentInfo()); int interval(8); - while (info.idleAgentsCount < minCount) { - std::this_thread::sleep_for(std::chrono::milliseconds(8)); + while (info.executingAgentsCount < minCount) { + std::this_thread::sleep_for(std::chrono::milliseconds(interval)); interval = std::min(256, interval * 2); - info = RequestCommanderInfo(); + info = RequestAgentInfo(); } } -auto DDSSession::ActivateTopology(const Path& topologyFile) -> void +auto DDSSession::WaitForIdleAgents(Quantity minCount) -> void +{ + auto info(RequestAgentInfo()); + int interval(8); + while (info.idleAgentsCount < minCount) { + std::this_thread::sleep_for(std::chrono::milliseconds(interval)); + interval = std::min(256, interval * 2); + info = RequestAgentInfo(); + } +} + +auto DDSSession::ActivateTopology(DDSTopology topo) -> void { dds::tools_api::STopologyRequestData topologyInfo; topologyInfo.m_updateType = dds::tools_api::STopologyRequestData::EUpdateType::ACTIVATE; - topologyInfo.m_topologyFile = topologyFile.string(); + topologyInfo.m_topologyFile = topo.GetTopoFile().string(); tools::Semaphore blocker; auto topologyRequest = dds::tools_api::STopologyRequest::makeRequest(topologyInfo); @@ -232,6 +255,13 @@ auto DDSSession::ActivateTopology(const Path& topologyFile) -> void topologyRequest->setDoneCallback([&]() { blocker.Signal(); }); fImpl->fSession.sendRequest(topologyRequest); blocker.Wait(); + + WaitForExecutingAgents(topo.GetNumRequiredAgents()); +} + +auto DDSSession::ActivateTopology(const Path& topoFile) -> void +{ + ActivateTopology(DDSTopology(topoFile, GetEnv())); } void DDSSession::StartDDSService() { fImpl->fDDSService.start(fImpl->fId); } @@ -256,6 +286,21 @@ auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream& return os << "$DDS_SESSION_ID: " << session.GetId(); } +auto DDSAgent::GetSession() const -> DDSSession +{ + return fSession; +} + +auto DDSAgent::GetInfoStr() const -> std::string +{ + return fInfoStr; +} + +auto operator<<(std::ostream& os, const DDSAgent& agent) -> std::ostream& +{ + return os << agent.GetInfoStr(); +} + } // namespace sdk } // namespace mq } // namespace fair diff --git a/fairmq/sdk/DDSSession.h b/fairmq/sdk/DDSSession.h index ffac3efb..a77ed195 100644 --- a/fairmq/sdk/DDSSession.h +++ b/fairmq/sdk/DDSSession.h @@ -40,6 +40,9 @@ enum class DDSRMSPlugin auto operator<<(std::ostream& os, DDSRMSPlugin plugin) -> std::ostream&; auto operator>>(std::istream& is, DDSRMSPlugin& plugin) -> std::istream&; +class DDSTopology; +class DDSAgent; + /** * @class DDSSession DDSSession.h * @brief Represents a DDS session @@ -64,14 +67,22 @@ class DDSSession auto StopOnDestruction(bool stop = true) -> void; auto IsRunning() const -> bool; auto SubmitAgents(Quantity agents) -> void; - auto RequestAgentInfo() -> void; + struct AgentInfo { + Quantity idleAgentsCount; + Quantity activeAgentsCount; + Quantity executingAgentsCount; + std::vector agents; + }; + auto RequestAgentInfo() -> AgentInfo; struct CommanderInfo { int pid; - Quantity idleAgentsCount; + std::string activeTopologyName; }; auto RequestCommanderInfo() -> CommanderInfo; auto WaitForIdleAgents(Quantity) -> void; - auto ActivateTopology(const Path& topologyFile) -> void; + auto WaitForExecutingAgents(Quantity) -> void; + auto ActivateTopology(const Path& topoFile) -> void; + auto ActivateTopology(DDSTopology) -> void; auto Stop() -> void; void StartDDSService(); @@ -85,6 +96,27 @@ class DDSSession std::shared_ptr fImpl; }; +/** + * @class DDSAgent DDSSession.h + * @brief Represents a DDS agent + */ +class DDSAgent +{ + public: + explicit DDSAgent(DDSSession session, std::string infostr) + : fInfoStr(std::move(infostr)) + , fSession(std::move(session)) + {} + + auto GetSession() const -> DDSSession; + auto GetInfoStr() const -> std::string; + + friend auto operator<<(std::ostream& os, const DDSAgent& plugin) -> std::ostream&; + + private: + std::string fInfoStr; + DDSSession fSession; +}; } // namespace sdk } // namespace mq } // namespace fair diff --git a/fairmq/sdk/Topology.cxx b/fairmq/sdk/Topology.cxx index 9d1fc766..f1d5de4e 100644 --- a/fairmq/sdk/Topology.cxx +++ b/fairmq/sdk/Topology.cxx @@ -196,10 +196,10 @@ void Topology::AddNewStateEntry(uint64_t senderId, const std::string& state) LOG(error) << "Exception in AddNewStateEntry: " << e.what(); } - LOG(info) << "fTopologyState after update: "; - for (auto& e : fTopologyState) { - LOG(info) << e.first << ": " << e.second.state; - } + // LOG(info) << "fTopologyState after update: "; + // for (auto& e : fTopologyState) { + // LOG(info) << e.first << ": " << e.second.state; + // } } fCV.notify_one(); } diff --git a/test/sdk/TopologyFixture.h b/test/sdk/TopologyFixture.h index 86df73a4..7a00e1d9 100644 --- a/test/sdk/TopologyFixture.h +++ b/test/sdk/TopologyFixture.h @@ -61,10 +61,7 @@ struct TopologyFixture : ::testing::Test LOG(info) << mDDSTopo; auto n(mDDSTopo.GetNumRequiredAgents()); mDDSSession.SubmitAgents(n); - mDDSSession.WaitForIdleAgents(n); mDDSSession.ActivateTopology(mDDSTopoFile); - std::this_thread::sleep_for(std::chrono::seconds(1)); // TODO implement WaitForActiveAgents - mDDSSession.RequestAgentInfo(); } auto TearDown() -> void override {