diff --git a/CMakeLists.txt b/CMakeLists.txt index 6e063e7e..341f1271 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -75,7 +75,7 @@ if(BUILD_NANOMSG_TRANSPORT) endif() if(BUILD_SDK) - set(required_dds_version 2.5.22) + set(required_dds_version 2.5.36) else() set(required_dds_version 2.4) endif() diff --git a/fairmq/SDK.h b/fairmq/SDK.h index c2e626d2..64ee7d00 100644 --- a/fairmq/SDK.h +++ b/fairmq/SDK.h @@ -10,6 +10,7 @@ #define FAIR_MQ_SDK_H // IWYU pragma: begin_exports +#include #include #include #include diff --git a/fairmq/sdk/CMakeLists.txt b/fairmq/sdk/CMakeLists.txt index 01037cb8..cbb06b65 100644 --- a/fairmq/sdk/CMakeLists.txt +++ b/fairmq/sdk/CMakeLists.txt @@ -15,6 +15,7 @@ set(target SDK) set(SDK_PUBLIC_HEADER_FILES ../SDK.h + DDSAgent.h DDSEnvironment.h DDSSession.h DDSTopology.h diff --git a/fairmq/sdk/DDSAgent.h b/fairmq/sdk/DDSAgent.h new file mode 100644 index 00000000..07fbb703 --- /dev/null +++ b/fairmq/sdk/DDSAgent.h @@ -0,0 +1,94 @@ +/******************************************************************************** + * Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_SDK_DDSSAGENT_H +#define FAIR_MQ_SDK_DDSSAGENT_H + +#include + +#include +#include +#include +#include + +namespace fair { +namespace mq { +namespace sdk { + +/** + * @class DDSAgent + * @brief Represents a DDS agent + */ +class DDSAgent +{ + public: + using Id = uint64_t; + using Pid = uint32_t; + + explicit DDSAgent(DDSSession session, + Id id, + Pid pid, + std::string state, + std::string path, + std::string host, + bool lobbyLeader, + std::chrono::milliseconds startupTime, + Id taskId, + std::string username) + : fSession(std::move(session)) + , fId(id) + , fPid(pid) + , fState(std::move(state)) + , fDDSPath(std::move(path)) + , fHost(std::move(host)) + , fLobbyLeader(lobbyLeader) + , fStartupTime(startupTime) + , fTaskId(taskId) + , fUsername(std::move(username)) + {} + + DDSSession GetSession() const { return fSession; } + Id GetId() const { return fId; } + Pid GetPid() const { return fPid; } + std::string GetState() const { return fState; } + std::string GetHost() const { return fHost; } + bool IsLobbyLeader() const { return fLobbyLeader; } + std::chrono::milliseconds GetStartupTime() const { return fStartupTime; } + std::string GetUsername() const { return fUsername; } + + friend auto operator<<(std::ostream& os, const DDSAgent& agent) -> std::ostream& + { + return os << "DDSAgent id: " << agent.fId + << ", pid: " << agent.fPid + << ", state: " << agent.fState + << ", path: " << agent.fDDSPath + << ", host: " << agent.fHost + << ", lobbyLeader: " << agent.fLobbyLeader + << ", startupTime: " << agent.fStartupTime.count() + << ", taskId: " << agent.fTaskId + << ", username: " << agent.fUsername; + } + + private: + DDSSession fSession; + Id fId; + Pid fPid; + std::string fState; + std::string fDDSPath; + std::string fHost; + bool fLobbyLeader; + std::chrono::milliseconds fStartupTime; + Id fTaskId; + std::string fUsername; +}; + +} // namespace sdk +} // namespace mq +} // namespace fair + +#endif /* FAIR_MQ_SDK_DDSSAGENT_H */ diff --git a/fairmq/sdk/DDSSession.cxx b/fairmq/sdk/DDSSession.cxx index ee2c7eee..e6ed18bc 100644 --- a/fairmq/sdk/DDSSession.cxx +++ b/fairmq/sdk/DDSSession.cxx @@ -8,6 +8,7 @@ #include "DDSSession.h" +#include #include #include #include @@ -180,76 +181,110 @@ auto DDSSession::SubmitAgents(Quantity agents) -> void // Requesting to submit 0 agents is not meaningful assert(agents > 0); - dds::tools_api::SSubmitRequestData submitInfo; + using namespace dds::tools_api; + + SSubmitRequestData submitInfo; submitInfo.m_rms = tools::ToString(GetRMSPlugin()); submitInfo.m_instances = agents; submitInfo.m_config = GetRMSConfig().string(); tools::Semaphore blocker; - auto submitRequest = dds::tools_api::SSubmitRequest::makeRequest(submitInfo); - submitRequest->setMessageCallback( - [](const dds::tools_api::SMessageResponseData& message) { LOG(debug) << message; }); + auto submitRequest = SSubmitRequest::makeRequest(submitInfo); + submitRequest->setMessageCallback([](const SMessageResponseData& message){ + LOG(debug) << message.m_msg; + }); submitRequest->setDoneCallback([&]() { LOG(debug) << agents << " Agents submitted"; blocker.Signal(); }); - fImpl->fSession->sendRequest(submitRequest); + fImpl->fSession->sendRequest(submitRequest); blocker.Wait(); - // perfect WaitForIdleAgents(agents); } -auto DDSSession::RequestAgentInfo() -> AgentInfo +auto DDSSession::RequestAgentCount() -> AgentCount { - dds::tools_api::SAgentInfoRequestData agentInfoInfo; - tools::Semaphore blocker; - AgentInfo info; - auto agentInfoRequest = dds::tools_api::SAgentInfoRequest::makeRequest(agentInfoInfo); - agentInfoRequest->setResponseCallback( - [this, &info](const dds::tools_api::SAgentInfoResponseData& _response) { - if (_response.m_index == 0) { - info.activeAgentsCount = _response.m_activeAgentsCount; - info.idleAgentsCount = _response.m_idleAgentsCount; - info.executingAgentsCount = _response.m_executingAgentsCount; - info.agents.reserve(_response.m_activeAgentsCount); - } - info.agents.emplace_back(*this, _response.m_agentInfo); - }); - agentInfoRequest->setMessageCallback( - [](const dds::tools_api::SMessageResponseData& _message) { LOG(debug) << _message; }); - agentInfoRequest->setDoneCallback([&]() { blocker.Signal(); }); - fImpl->fSession->sendRequest(agentInfoRequest); - blocker.Wait(); + using namespace dds::tools_api; - return info; + SAgentCountRequest::response_t res; + fImpl->fSession->syncSendRequest(SAgentCountRequest::request_t(), res); + + AgentCount count; + count.active = res.m_activeAgentsCount; + count.idle = res.m_idleAgentsCount; + count.executing = res.m_executingAgentsCount; + + return count; +} + +auto DDSSession::RequestAgentInfo() -> std::vector +{ + using namespace dds::tools_api; + + SAgentInfoRequest::responseVector_t res; + fImpl->fSession->syncSendRequest(SAgentInfoRequest::request_t(), res); + + std::vector agentInfo; + agentInfo.reserve(res.size()); + for (const auto& a : res) { + agentInfo.emplace_back( + *this, + a.m_agentID, + a.m_agentPid, + a.m_agentState, + a.m_DDSPath, + a.m_host, + a.m_lobbyLeader, + a.m_startUpTime, + a.m_taskID, + a.m_username + ); + } + + return agentInfo; +} + +auto DDSSession::RequestTaskInfo() -> std::vector +{ + using namespace dds::tools_api; + + SAgentInfoRequest::responseVector_t res; + fImpl->fSession->syncSendRequest(SAgentInfoRequest::request_t(), res); + + std::vector taskInfo; + taskInfo.reserve(res.size()); + for (auto& a : res) { + taskInfo.emplace_back(a.m_taskID); + } + + return taskInfo; } auto DDSSession::RequestCommanderInfo() -> CommanderInfo { - dds::tools_api::SCommanderInfoRequestData commanderInfoInfo; + using namespace dds::tools_api; + + SCommanderInfoRequestData commanderInfo; tools::Semaphore blocker; std::string error; - auto commanderInfoRequest = - dds::tools_api::SCommanderInfoRequest::makeRequest(commanderInfoInfo); + auto commanderInfoRequest = SCommanderInfoRequest::makeRequest(commanderInfo); CommanderInfo info; - commanderInfoRequest->setResponseCallback( - [&info](const dds::tools_api::SCommanderInfoResponseData& _response) { - info.pid = _response.m_pid; - info.activeTopologyName = _response.m_activeTopologyName; - }); - commanderInfoRequest->setMessageCallback( - [&](const dds::tools_api::SMessageResponseData& _message) { - if (_message.m_severity == dds::intercom_api::EMsgSeverity::error) { - error = _message.m_msg; - blocker.Signal(); - } else { - LOG(debug) << _message; - } - }); + commanderInfoRequest->setResponseCallback([&info](const SCommanderInfoResponseData& _response) { + info.pid = _response.m_pid; + info.activeTopologyName = _response.m_activeTopologyName; + }); + commanderInfoRequest->setMessageCallback([&](const SMessageResponseData& _message) { + if (_message.m_severity == dds::intercom_api::EMsgSeverity::error) { + error = _message.m_msg; + blocker.Signal(); + } else { + LOG(debug) << _message.m_msg; + } + }); commanderInfoRequest->setDoneCallback([&]() { blocker.Signal(); }); - fImpl->fSession->sendRequest(commanderInfoRequest); + fImpl->fSession->sendRequest(commanderInfoRequest); blocker.Wait(); if (!error.empty()) { @@ -261,38 +296,41 @@ auto DDSSession::RequestCommanderInfo() -> CommanderInfo auto DDSSession::WaitForExecutingAgents(Quantity minCount) -> void { - auto info(RequestAgentInfo()); + auto count(RequestAgentCount()); int interval(8); - while (info.executingAgentsCount < minCount) { + while (count.executing < minCount) { std::this_thread::sleep_for(std::chrono::milliseconds(interval)); interval = std::min(256, interval * 2); - info = RequestAgentInfo(); + count = RequestAgentCount(); } } auto DDSSession::WaitForIdleAgents(Quantity minCount) -> void { - auto info(RequestAgentInfo()); + auto count(RequestAgentCount()); int interval(8); - while (info.idleAgentsCount < minCount) { + while (count.idle < minCount) { std::this_thread::sleep_for(std::chrono::milliseconds(interval)); interval = std::min(256, interval * 2); - info = RequestAgentInfo(); + count = RequestAgentCount(); } } auto DDSSession::ActivateTopology(DDSTopology topo) -> void { - dds::tools_api::STopologyRequestData topologyInfo; - topologyInfo.m_updateType = dds::tools_api::STopologyRequestData::EUpdateType::ACTIVATE; + using namespace dds::tools_api; + + STopologyRequestData topologyInfo; + topologyInfo.m_updateType = STopologyRequestData::EUpdateType::ACTIVATE; topologyInfo.m_topologyFile = topo.GetTopoFile().string(); tools::Semaphore blocker; - auto topologyRequest = dds::tools_api::STopologyRequest::makeRequest(topologyInfo); - topologyRequest->setMessageCallback( - [](const dds::tools_api::SMessageResponseData& _message) { LOG(debug) << _message; }); + auto topologyRequest = STopologyRequest::makeRequest(topologyInfo); + topologyRequest->setMessageCallback([](const SMessageResponseData& _message) { + LOG(debug) << _message.m_msg; + }); topologyRequest->setDoneCallback([&]() { blocker.Signal(); }); - fImpl->fSession->sendRequest(topologyRequest); + fImpl->fSession->sendRequest(topologyRequest); blocker.Wait(); WaitForExecutingAgents(topo.GetNumRequiredAgents()); @@ -337,21 +375,6 @@ auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream& return os << "$DDS_SESSION_ID: " << session.GetId(); } -auto DDSAgent::GetSession() const -> DDSSession -{ - return fSession; -} - -auto DDSAgent::GetInfoStr() const -> std::string -{ - return fInfoStr; -} - -auto operator<<(std::ostream& os, const DDSAgent& agent) -> std::ostream& -{ - return os << agent.GetInfoStr(); -} - } // namespace sdk } // namespace mq } // namespace fair diff --git a/fairmq/sdk/DDSSession.h b/fairmq/sdk/DDSSession.h index e14c7e95..9c4eeb84 100644 --- a/fairmq/sdk/DDSSession.h +++ b/fairmq/sdk/DDSSession.h @@ -21,6 +21,7 @@ #include #include #include +#include namespace fair { namespace mq { @@ -47,6 +48,20 @@ class DDSTask { public: using Id = std::uint64_t; + + explicit DDSTask(Id id) + : fId(id) + {} + + Id GetId() const { return fId; } + + friend auto operator<<(std::ostream& os, const DDSTask& task) -> std::ostream& + { + return os << "DDSTask id: " << task.fId; + } + + private: + Id fId; }; class DDSChannel @@ -84,13 +99,14 @@ class DDSSession auto StopOnDestruction(bool stop = true) -> void; auto IsRunning() const -> bool; auto SubmitAgents(Quantity agents) -> void; - struct AgentInfo { - Quantity idleAgentsCount = 0; - Quantity activeAgentsCount = 0; - Quantity executingAgentsCount = 0; - std::vector agents; + struct AgentCount { + Quantity idle = 0; + Quantity active = 0; + Quantity executing = 0; }; - auto RequestAgentInfo() -> AgentInfo; + auto RequestAgentCount() -> AgentCount; + auto RequestAgentInfo() -> std::vector; + auto RequestTaskInfo() -> std::vector; struct CommanderInfo { int pid = -1; std::string activeTopologyName; @@ -117,27 +133,6 @@ class DDSSession std::shared_ptr fImpl; }; -/** - * @class DDSAgent DDSSession.h - * @brief Represents a DDS agent - */ -class DDSAgent -{ - public: - explicit DDSAgent(DDSSession session, std::string infostr) - : fInfoStr(std::move(infostr)) - , fSession(std::move(session)) - {} - - auto GetSession() const -> DDSSession; - auto GetInfoStr() const -> std::string; - - friend auto operator<<(std::ostream& os, const DDSAgent& plugin) -> std::ostream&; - - private: - std::string fInfoStr; - DDSSession fSession; -}; } // namespace sdk } // namespace mq } // namespace fair diff --git a/fairmq/sdk/Topology.cxx b/fairmq/sdk/Topology.cxx index 86f85466..1a625b18 100644 --- a/fairmq/sdk/Topology.cxx +++ b/fairmq/sdk/Topology.cxx @@ -231,18 +231,18 @@ void Topology::WaitForState() fExecutionCV.wait(lock); } } - LOG(debug) << "WaitForState shutting down"; + LOG(debug) << "Topology::WaitForState shutting down"; }; -void Topology::AddNewStateEntry(DDSTask::Id senderId, const std::string& state) +void Topology::AddNewStateEntry(DDSTask::Id taskId, const std::string& state) { std::size_t pos = state.find("->"); std::string endState = state.substr(pos + 2); - // LOG(debug) << "Adding new state entry: " << senderId << ", " << state << ", end state: " << endState; + // LOG(debug) << "Adding new state entry: " << taskId << ", " << state << ", end state: " << endState; { try { std::unique_lock lock(fMtx); - fState[senderId] = DeviceStatus{ true, fair::mq::GetState(endState) }; + fState[taskId] = DeviceStatus{ true, fair::mq::GetState(endState) }; } catch (const std::exception& e) { LOG(error) << "Exception in AddNewStateEntry: " << e.what(); } diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index f455a6a7..2faafec2 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -152,7 +152,7 @@ class Topology std::string fStateChangeError; void WaitForState(); - void AddNewStateEntry(DDSTask::Id senderId, const std::string& state); + void AddNewStateEntry(DDSTask::Id taskId, const std::string& state); }; using Topo = Topology; diff --git a/test/sdk/Fixtures.h b/test/sdk/Fixtures.h index 5b0e4388..70b1a2d4 100644 --- a/test/sdk/Fixtures.h +++ b/test/sdk/Fixtures.h @@ -46,7 +46,7 @@ struct TopologyFixture : ::testing::Test : mDDSTopoFile(tools::ToString(SDK_TESTSUITE_SOURCE_DIR, "/test_topo.xml")) , mDDSEnv(CMAKE_CURRENT_BINARY_DIR) , mDDSSession(mDDSEnv) - , mDDSTopo(mDDSTopoFile, mDDSEnv) + , mDDSTopo(sdk::DDSTopology::Path(mDDSTopoFile), mDDSEnv) { mDDSSession.StopOnDestruction(); } @@ -58,6 +58,14 @@ struct TopologyFixture : ::testing::Test auto n(mDDSTopo.GetNumRequiredAgents()); mDDSSession.SubmitAgents(n); mDDSSession.ActivateTopology(mDDSTopo); + std::vector agents = mDDSSession.RequestAgentInfo(); + for (const auto& a : agents) { + LOG(debug) << a; + } + std::vector tasks = mDDSSession.RequestTaskInfo(); + for (const auto& t : tasks) { + LOG(debug) << t; + } } auto TearDown() -> void override { diff --git a/test/sdk/_dds.cxx b/test/sdk/_dds.cxx index 583c3a9e..47b56659 100644 --- a/test/sdk/_dds.cxx +++ b/test/sdk/_dds.cxx @@ -16,8 +16,8 @@ namespace { TEST(DDSEnvironment, Construction) { fair::mq::test::LoggerConfig cfg; - fair::mq::sdk::DDSEnvironment env(CMAKE_CURRENT_BINARY_DIR); + LOG(debug) << env; }