SDK: Implement WaitForExecutingAgents

* Require DDS 2.5.22
* Apply in meaningful places
* Adapt test fixture
This commit is contained in:
Dennis Klein 2019-07-24 14:25:50 +02:00
parent 388b1be056
commit dc55272317
No known key found for this signature in database
GPG Key ID: 08E62D23FA0ECBBC
5 changed files with 97 additions and 23 deletions

View File

@ -75,7 +75,7 @@ if(BUILD_NANOMSG_TRANSPORT)
endif()
if(BUILD_SDK)
set(required_dds_version 2.5.20)
set(required_dds_version 2.5.22)
else()
set(required_dds_version 2.4)
endif()

View File

@ -9,6 +9,7 @@
#include "DDSSession.h"
#include <fairmq/sdk/DDSEnvironment.h>
#include <fairmq/sdk/DDSTopology.h>
#include <fairmq/Tools.h>
#include <fairlogger/Logger.h>
@ -168,23 +169,34 @@ auto DDSSession::SubmitAgents(Quantity agents) -> void
fImpl->fSession.sendRequest<dds::tools_api::SSubmitRequest>(submitRequest);
blocker.Wait();
// Not perfect, but best we can do
WaitForIdleAgents(agents);
}
auto DDSSession::RequestAgentInfo() -> void
auto DDSSession::RequestAgentInfo() -> AgentInfo
{
dds::tools_api::SAgentInfoRequestData agentInfoInfo;
tools::Semaphore blocker;
AgentInfo info;
auto agentInfoRequest = dds::tools_api::SAgentInfoRequest::makeRequest(agentInfoInfo);
agentInfoRequest->setResponseCallback(
[&](const dds::tools_api::SAgentInfoResponseData& _response) {
LOG(debug) << "agent: " << _response.m_index << "/" << _response.m_activeAgentsCount;
LOG(debug) << "info: " << _response.m_agentInfo;
[this, &info](const dds::tools_api::SAgentInfoResponseData& _response) {
if (_response.m_index == 0) {
info.activeAgentsCount = _response.m_activeAgentsCount;
info.idleAgentsCount = _response.m_idleAgentsCount;
info.executingAgentsCount = _response.m_executingAgentsCount;
info.agents.reserve(_response.m_activeAgentsCount);
}
info.agents.emplace_back(*this, std::move(_response.m_agentInfo));
});
agentInfoRequest->setMessageCallback(
[](const dds::tools_api::SMessageResponseData& _message) { LOG(debug) << _message; });
agentInfoRequest->setDoneCallback([&]() { blocker.Signal(); });
fImpl->fSession.sendRequest<dds::tools_api::SAgentInfoRequest>(agentInfoRequest);
blocker.Wait();
return info;
}
auto DDSSession::RequestCommanderInfo() -> CommanderInfo
@ -197,7 +209,7 @@ auto DDSSession::RequestCommanderInfo() -> CommanderInfo
commanderInfoRequest->setResponseCallback(
[&info](const dds::tools_api::SCommanderInfoResponseData& _response) {
info.pid = _response.m_pid;
info.idleAgentsCount = _response.m_idleAgentsCount;
info.activeTopologyName = std::move(_response.m_activeTopologyName);
});
commanderInfoRequest->setMessageCallback(
[](const dds::tools_api::SMessageResponseData& _message) { LOG(debug) << _message; });
@ -208,22 +220,33 @@ auto DDSSession::RequestCommanderInfo() -> CommanderInfo
return info;
}
auto DDSSession::WaitForIdleAgents(Quantity minCount) -> void
auto DDSSession::WaitForExecutingAgents(Quantity minCount) -> void
{
auto info(RequestCommanderInfo());
auto info(RequestAgentInfo());
int interval(8);
while (info.idleAgentsCount < minCount) {
std::this_thread::sleep_for(std::chrono::milliseconds(8));
while (info.executingAgentsCount < minCount) {
std::this_thread::sleep_for(std::chrono::milliseconds(interval));
interval = std::min(256, interval * 2);
info = RequestCommanderInfo();
info = RequestAgentInfo();
}
}
auto DDSSession::ActivateTopology(const Path& topologyFile) -> void
auto DDSSession::WaitForIdleAgents(Quantity minCount) -> void
{
auto info(RequestAgentInfo());
int interval(8);
while (info.idleAgentsCount < minCount) {
std::this_thread::sleep_for(std::chrono::milliseconds(interval));
interval = std::min(256, interval * 2);
info = RequestAgentInfo();
}
}
auto DDSSession::ActivateTopology(DDSTopology topo) -> void
{
dds::tools_api::STopologyRequestData topologyInfo;
topologyInfo.m_updateType = dds::tools_api::STopologyRequestData::EUpdateType::ACTIVATE;
topologyInfo.m_topologyFile = topologyFile.string();
topologyInfo.m_topologyFile = topo.GetTopoFile().string();
tools::Semaphore blocker;
auto topologyRequest = dds::tools_api::STopologyRequest::makeRequest(topologyInfo);
@ -232,6 +255,13 @@ auto DDSSession::ActivateTopology(const Path& topologyFile) -> void
topologyRequest->setDoneCallback([&]() { blocker.Signal(); });
fImpl->fSession.sendRequest<dds::tools_api::STopologyRequest>(topologyRequest);
blocker.Wait();
WaitForExecutingAgents(topo.GetNumRequiredAgents());
}
auto DDSSession::ActivateTopology(const Path& topoFile) -> void
{
ActivateTopology(DDSTopology(topoFile, GetEnv()));
}
void DDSSession::StartDDSService() { fImpl->fDDSService.start(fImpl->fId); }
@ -256,6 +286,21 @@ auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&
return os << "$DDS_SESSION_ID: " << session.GetId();
}
auto DDSAgent::GetSession() const -> DDSSession
{
return fSession;
}
auto DDSAgent::GetInfoStr() const -> std::string
{
return fInfoStr;
}
auto operator<<(std::ostream& os, const DDSAgent& agent) -> std::ostream&
{
return os << agent.GetInfoStr();
}
} // namespace sdk
} // namespace mq
} // namespace fair

View File

@ -40,6 +40,9 @@ enum class DDSRMSPlugin
auto operator<<(std::ostream& os, DDSRMSPlugin plugin) -> std::ostream&;
auto operator>>(std::istream& is, DDSRMSPlugin& plugin) -> std::istream&;
class DDSTopology;
class DDSAgent;
/**
* @class DDSSession DDSSession.h <fairmq/sdk/DDSSession.h>
* @brief Represents a DDS session
@ -64,14 +67,22 @@ class DDSSession
auto StopOnDestruction(bool stop = true) -> void;
auto IsRunning() const -> bool;
auto SubmitAgents(Quantity agents) -> void;
auto RequestAgentInfo() -> void;
struct AgentInfo {
Quantity idleAgentsCount;
Quantity activeAgentsCount;
Quantity executingAgentsCount;
std::vector<DDSAgent> agents;
};
auto RequestAgentInfo() -> AgentInfo;
struct CommanderInfo {
int pid;
Quantity idleAgentsCount;
std::string activeTopologyName;
};
auto RequestCommanderInfo() -> CommanderInfo;
auto WaitForIdleAgents(Quantity) -> void;
auto ActivateTopology(const Path& topologyFile) -> void;
auto WaitForExecutingAgents(Quantity) -> void;
auto ActivateTopology(const Path& topoFile) -> void;
auto ActivateTopology(DDSTopology) -> void;
auto Stop() -> void;
void StartDDSService();
@ -85,6 +96,27 @@ class DDSSession
std::shared_ptr<Impl> fImpl;
};
/**
* @class DDSAgent DDSSession.h <fairmq/sdk/DDSSession.h>
* @brief Represents a DDS agent
*/
class DDSAgent
{
public:
explicit DDSAgent(DDSSession session, std::string infostr)
: fInfoStr(std::move(infostr))
, fSession(std::move(session))
{}
auto GetSession() const -> DDSSession;
auto GetInfoStr() const -> std::string;
friend auto operator<<(std::ostream& os, const DDSAgent& plugin) -> std::ostream&;
private:
std::string fInfoStr;
DDSSession fSession;
};
} // namespace sdk
} // namespace mq
} // namespace fair

View File

@ -196,10 +196,10 @@ void Topology::AddNewStateEntry(uint64_t senderId, const std::string& state)
LOG(error) << "Exception in AddNewStateEntry: " << e.what();
}
LOG(info) << "fTopologyState after update: ";
for (auto& e : fTopologyState) {
LOG(info) << e.first << ": " << e.second.state;
}
// LOG(info) << "fTopologyState after update: ";
// for (auto& e : fTopologyState) {
// LOG(info) << e.first << ": " << e.second.state;
// }
}
fCV.notify_one();
}

View File

@ -61,10 +61,7 @@ struct TopologyFixture : ::testing::Test
LOG(info) << mDDSTopo;
auto n(mDDSTopo.GetNumRequiredAgents());
mDDSSession.SubmitAgents(n);
mDDSSession.WaitForIdleAgents(n);
mDDSSession.ActivateTopology(mDDSTopoFile);
std::this_thread::sleep_for(std::chrono::seconds(1)); // TODO implement WaitForActiveAgents
mDDSSession.RequestAgentInfo();
}
auto TearDown() -> void override {