diff --git a/fairmq/sdk/DDSSession.cxx b/fairmq/sdk/DDSSession.cxx index c2773ff5..a0553894 100644 --- a/fairmq/sdk/DDSSession.cxx +++ b/fairmq/sdk/DDSSession.cxx @@ -187,25 +187,36 @@ auto DDSSession::RequestAgentInfo() -> void blocker.Wait(); } -auto DDSSession::RequestCommanderInfo() -> void +auto DDSSession::RequestCommanderInfo() -> CommanderInfo { dds::tools_api::SCommanderInfoRequestData commanderInfoInfo; tools::Semaphore blocker; auto commanderInfoRequest = dds::tools_api::SCommanderInfoRequest::makeRequest(commanderInfoInfo); + CommanderInfo info; commanderInfoRequest->setResponseCallback( - [&](const dds::tools_api::SCommanderInfoResponseData& _response) { - LOG(debug) << "pid: " << _response.m_pid; - LOG(debug) << "idle agents: " << _response.m_idleAgentsCount; - // LOG(debug) << "active topology: " - // << ((_response.m_hasActiveTopology) ? _response.m_activeTopologyName - // : ""); + [&info](const dds::tools_api::SCommanderInfoResponseData& _response) { + info.pid = _response.m_pid; + info.idleAgentsCount = _response.m_idleAgentsCount; }); commanderInfoRequest->setMessageCallback( [](const dds::tools_api::SMessageResponseData& _message) { LOG(debug) << _message; }); commanderInfoRequest->setDoneCallback([&]() { blocker.Signal(); }); fImpl->fSession.sendRequest(commanderInfoRequest); blocker.Wait(); + + return info; +} + +auto DDSSession::WaitForIdleAgents(Quantity minCount) -> void +{ + auto info(RequestCommanderInfo()); + int interval(8); + while (info.idleAgentsCount < minCount) { + std::this_thread::sleep_for(std::chrono::milliseconds(8)); + interval = std::min(256, interval * 2); + info = RequestCommanderInfo(); + } } auto DDSSession::ActivateTopology(const Path& topologyFile) -> void diff --git a/fairmq/sdk/DDSSession.h b/fairmq/sdk/DDSSession.h index 6618d396..ffac3efb 100644 --- a/fairmq/sdk/DDSSession.h +++ b/fairmq/sdk/DDSSession.h @@ -65,7 +65,12 @@ class DDSSession auto IsRunning() const -> bool; auto SubmitAgents(Quantity agents) -> void; auto RequestAgentInfo() -> void; - auto RequestCommanderInfo() -> void; + struct CommanderInfo { + int pid; + Quantity idleAgentsCount; + }; + auto RequestCommanderInfo() -> CommanderInfo; + auto WaitForIdleAgents(Quantity) -> void; auto ActivateTopology(const Path& topologyFile) -> void; auto Stop() -> void; diff --git a/test/sdk/TopologyFixture.h b/test/sdk/TopologyFixture.h index 1c0724c6..86df73a4 100644 --- a/test/sdk/TopologyFixture.h +++ b/test/sdk/TopologyFixture.h @@ -58,12 +58,13 @@ struct TopologyFixture : ::testing::Test auto SetUp() -> void override { LOG(info) << mDDSEnv; LOG(info) << mDDSSession; - mDDSSession.RequestCommanderInfo(); - mDDSSession.SubmitAgents(2); - mDDSSession.RequestCommanderInfo(); - std::this_thread::sleep_for(std::chrono::seconds(1)); // TODO implement WaitForIdleAgents + LOG(info) << mDDSTopo; + auto n(mDDSTopo.GetNumRequiredAgents()); + mDDSSession.SubmitAgents(n); + mDDSSession.WaitForIdleAgents(n); mDDSSession.ActivateTopology(mDDSTopoFile); - mDDSSession.RequestCommanderInfo(); + std::this_thread::sleep_for(std::chrono::seconds(1)); // TODO implement WaitForActiveAgents + mDDSSession.RequestAgentInfo(); } auto TearDown() -> void override {