mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
SDK: Implement WaitForIdleAgents and CommanderInfoRequest
This commit is contained in:
parent
bc98ab1eed
commit
a93840b240
|
@ -187,25 +187,36 @@ auto DDSSession::RequestAgentInfo() -> void
|
||||||
blocker.Wait();
|
blocker.Wait();
|
||||||
}
|
}
|
||||||
|
|
||||||
auto DDSSession::RequestCommanderInfo() -> void
|
auto DDSSession::RequestCommanderInfo() -> CommanderInfo
|
||||||
{
|
{
|
||||||
dds::tools_api::SCommanderInfoRequestData commanderInfoInfo;
|
dds::tools_api::SCommanderInfoRequestData commanderInfoInfo;
|
||||||
tools::Semaphore blocker;
|
tools::Semaphore blocker;
|
||||||
auto commanderInfoRequest =
|
auto commanderInfoRequest =
|
||||||
dds::tools_api::SCommanderInfoRequest::makeRequest(commanderInfoInfo);
|
dds::tools_api::SCommanderInfoRequest::makeRequest(commanderInfoInfo);
|
||||||
|
CommanderInfo info;
|
||||||
commanderInfoRequest->setResponseCallback(
|
commanderInfoRequest->setResponseCallback(
|
||||||
[&](const dds::tools_api::SCommanderInfoResponseData& _response) {
|
[&info](const dds::tools_api::SCommanderInfoResponseData& _response) {
|
||||||
LOG(debug) << "pid: " << _response.m_pid;
|
info.pid = _response.m_pid;
|
||||||
LOG(debug) << "idle agents: " << _response.m_idleAgentsCount;
|
info.idleAgentsCount = _response.m_idleAgentsCount;
|
||||||
// LOG(debug) << "active topology: "
|
|
||||||
// << ((_response.m_hasActiveTopology) ? _response.m_activeTopologyName
|
|
||||||
// : "<none>");
|
|
||||||
});
|
});
|
||||||
commanderInfoRequest->setMessageCallback(
|
commanderInfoRequest->setMessageCallback(
|
||||||
[](const dds::tools_api::SMessageResponseData& _message) { LOG(debug) << _message; });
|
[](const dds::tools_api::SMessageResponseData& _message) { LOG(debug) << _message; });
|
||||||
commanderInfoRequest->setDoneCallback([&]() { blocker.Signal(); });
|
commanderInfoRequest->setDoneCallback([&]() { blocker.Signal(); });
|
||||||
fImpl->fSession.sendRequest<dds::tools_api::SCommanderInfoRequest>(commanderInfoRequest);
|
fImpl->fSession.sendRequest<dds::tools_api::SCommanderInfoRequest>(commanderInfoRequest);
|
||||||
blocker.Wait();
|
blocker.Wait();
|
||||||
|
|
||||||
|
return info;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto DDSSession::WaitForIdleAgents(Quantity minCount) -> void
|
||||||
|
{
|
||||||
|
auto info(RequestCommanderInfo());
|
||||||
|
int interval(8);
|
||||||
|
while (info.idleAgentsCount < minCount) {
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(8));
|
||||||
|
interval = std::min(256, interval * 2);
|
||||||
|
info = RequestCommanderInfo();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto DDSSession::ActivateTopology(const Path& topologyFile) -> void
|
auto DDSSession::ActivateTopology(const Path& topologyFile) -> void
|
||||||
|
|
|
@ -65,7 +65,12 @@ class DDSSession
|
||||||
auto IsRunning() const -> bool;
|
auto IsRunning() const -> bool;
|
||||||
auto SubmitAgents(Quantity agents) -> void;
|
auto SubmitAgents(Quantity agents) -> void;
|
||||||
auto RequestAgentInfo() -> void;
|
auto RequestAgentInfo() -> void;
|
||||||
auto RequestCommanderInfo() -> void;
|
struct CommanderInfo {
|
||||||
|
int pid;
|
||||||
|
Quantity idleAgentsCount;
|
||||||
|
};
|
||||||
|
auto RequestCommanderInfo() -> CommanderInfo;
|
||||||
|
auto WaitForIdleAgents(Quantity) -> void;
|
||||||
auto ActivateTopology(const Path& topologyFile) -> void;
|
auto ActivateTopology(const Path& topologyFile) -> void;
|
||||||
auto Stop() -> void;
|
auto Stop() -> void;
|
||||||
|
|
||||||
|
|
|
@ -58,12 +58,13 @@ struct TopologyFixture : ::testing::Test
|
||||||
auto SetUp() -> void override {
|
auto SetUp() -> void override {
|
||||||
LOG(info) << mDDSEnv;
|
LOG(info) << mDDSEnv;
|
||||||
LOG(info) << mDDSSession;
|
LOG(info) << mDDSSession;
|
||||||
mDDSSession.RequestCommanderInfo();
|
LOG(info) << mDDSTopo;
|
||||||
mDDSSession.SubmitAgents(2);
|
auto n(mDDSTopo.GetNumRequiredAgents());
|
||||||
mDDSSession.RequestCommanderInfo();
|
mDDSSession.SubmitAgents(n);
|
||||||
std::this_thread::sleep_for(std::chrono::seconds(1)); // TODO implement WaitForIdleAgents
|
mDDSSession.WaitForIdleAgents(n);
|
||||||
mDDSSession.ActivateTopology(mDDSTopoFile);
|
mDDSSession.ActivateTopology(mDDSTopoFile);
|
||||||
mDDSSession.RequestCommanderInfo();
|
std::this_thread::sleep_for(std::chrono::seconds(1)); // TODO implement WaitForActiveAgents
|
||||||
|
mDDSSession.RequestAgentInfo();
|
||||||
}
|
}
|
||||||
|
|
||||||
auto TearDown() -> void override {
|
auto TearDown() -> void override {
|
||||||
|
|
Loading…
Reference in New Issue
Block a user