diff --git a/examples/dds/Sampler.cxx b/examples/dds/Sampler.cxx index 7e0a8a02..bc4e39b2 100644 --- a/examples/dds/Sampler.cxx +++ b/examples/dds/Sampler.cxx @@ -46,6 +46,8 @@ bool Sampler::ConditionalRun() return false; } + this_thread::sleep_for(chrono::seconds(5)); + if (fIterations > 0) { ++fCounter; if (fCounter >= fIterations) { diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 3d2f20bb..e775bfeb 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -413,6 +413,7 @@ if(BUILD_FAIRMQ) target_link_libraries(fairmq-splitter FairMQ) add_executable(fairmq-shmmonitor shmem/Monitor.cxx shmem/Monitor.h shmem/runMonitor.cxx) + target_compile_definitions(fairmq-shmmonitor PUBLIC BOOST_ERROR_CODE_HEADER_ONLY) target_link_libraries(fairmq-shmmonitor PUBLIC Threads::Threads $<$:rt> diff --git a/fairmq/sdk/CMakeLists.txt b/fairmq/sdk/CMakeLists.txt index 0cc31cd3..01037cb8 100644 --- a/fairmq/sdk/CMakeLists.txt +++ b/fairmq/sdk/CMakeLists.txt @@ -17,6 +17,7 @@ set(SDK_PUBLIC_HEADER_FILES ../SDK.h DDSEnvironment.h DDSSession.h + DDSTopology.h Topology.h ) @@ -27,6 +28,7 @@ set(SDK_PRIVATE_HEADER_FILES set(SDK_SOURCE_FILES DDSEnvironment.cxx DDSSession.cxx + DDSTopology.cxx Topology.cxx ) diff --git a/fairmq/sdk/DDSEnvironment.cxx b/fairmq/sdk/DDSEnvironment.cxx index a1ac2131..2d383ebd 100644 --- a/fairmq/sdk/DDSEnvironment.cxx +++ b/fairmq/sdk/DDSEnvironment.cxx @@ -8,73 +8,119 @@ #include "DDSEnvironment.h" -#include -#include #include +#include + +#include + +#include +#include + +#include #include -#include #include namespace fair { namespace mq { namespace sdk { -// TODO https://github.com/FairRootGroup/DDS/issues/224 -auto LoadDDSEnv(const boost::filesystem::path& config_home) - -> void -{ - setenv("DDS_LOCATION", DDSInstallPrefix.c_str(), 1); - if (!config_home.empty()) { - setenv("HOME", config_home.c_str(), 1); - } - std::string path(std::getenv("PATH")); - path = DDSExecutableDir + std::string(":") + path; - setenv("PATH", path.c_str(), 1); - -#ifndef __APPLE__ - std::string ldVar("LD_LIBRARY_PATH"); - std::string ld(std::getenv(ldVar.c_str())); - ld = DDSLibraryDir + std::string(":") + ld; - setenv(ldVar.c_str(), ld.c_str(), 1); -#endif - - std::istringstream cmd; - cmd.str("DDS_CFG=`dds-user-defaults --ignore-default-sid -p`\n" - "if [ -z \"$DDS_CFG\" ]; then\n" - " mkdir -p \"$HOME/.DDS\"\n" - " dds-user-defaults --ignore-default-sid -d -c \"$HOME/.DDS/DDS.cfg\"\n" - "fi"); - std::system(cmd.str().c_str()); -} - struct DDSEnvironment::Impl { - explicit Impl(Path config_home) - : fCount() - , fConfigHome(std::move(config_home)) + explicit Impl(Path configHome) + : fLocation(DDSInstallPrefix) + , fConfigHome(std::move(configHome)) { - LoadDDSEnv(fConfigHome); - if (fConfigHome.empty()) { - fConfigHome = std::getenv("HOME"); + SetupLocation(); + SetupDynamicLoader(); + SetupPath(); + SetupConfigHome(); + } + + auto SetupLocation() -> void + { + std::string location(GetEnv("DDS_LOCATION")); + if (location != DDSInstallPrefix) { + if (location.empty()) { + setenv("DDS_LOCATION", DDSInstallPrefix.c_str(), 1); + } else { + LOG(debug) << "$DDS_LOCATION appears to point to a different installation than this" + << "program was linked against. Things might still work out, so not" + << "touching it."; + fLocation = location; + } } } + auto SetupConfigHome() -> void + { + if (fConfigHome.empty()) { + fConfigHome = GetEnv("HOME"); + } else { + setenv("HOME", fConfigHome.c_str(), 1); + } + + std::istringstream cmd; + cmd.str("DDS_CFG=`dds-user-defaults --ignore-default-sid -p`\n" + "if [ -z \"$DDS_CFG\" ]; then\n" + " mkdir -p \"$HOME/.DDS\"\n" + " dds-user-defaults --ignore-default-sid -d -c \"$HOME/.DDS/DDS.cfg\"\n" + "fi"); + std::system(cmd.str().c_str()); + } + + auto SetupPath() -> void + { + std::string path(GetEnv("PATH")); + Path ddsExecDir = (fLocation == DDSInstallPrefix) ? DDSExecutableDir : fLocation / Path("bin"); + path = ddsExecDir.string() + std::string(":") + path; + setenv("PATH", path.c_str(), 1); + } + + auto SetupDynamicLoader() -> void + { +#ifdef __APPLE__ + std::string ldVar("DYLD_LIBRARY_PATH"); +#else + std::string ldVar("LD_LIBRARY_PATH"); +#endif + std::string ld(GetEnv(ldVar)); + Path ddsLibDir = (fLocation == DDSInstallPrefix) ? DDSLibraryDir : fLocation / Path("lib"); + ld = ddsLibDir.string() + std::string(":") + ld; + setenv(ldVar.c_str(), ld.c_str(), 1); + } + + auto GetEnv(const std::string& key) -> std::string + { + auto value = std::getenv(key.c_str()); + if (value) { + return {value}; + } + return {}; + } + struct Tag {}; friend auto operator<<(std::ostream& os, Tag) -> std::ostream& { return os << "DDSEnvironment"; } tools::InstanceLimiter fCount; + Path fLocation; Path fConfigHome; }; -DDSEnvironment::DDSEnvironment(Path config_home) - : fImpl(std::make_shared(std::move(config_home))) +DDSEnvironment::DDSEnvironment() + : DDSEnvironment(Path()) {} +DDSEnvironment::DDSEnvironment(Path configHome) + : fImpl(std::make_shared(std::move(configHome))) +{} + +auto DDSEnvironment::GetLocation() const -> Path { return fImpl->fLocation; } + auto DDSEnvironment::GetConfigHome() const -> Path { return fImpl->fConfigHome; } auto operator<<(std::ostream& os, DDSEnvironment env) -> std::ostream& { - return os << "$DDS_LOCATION: " << DDSInstallPrefix << ", " + return os << "$DDS_LOCATION: " << env.GetLocation() << ", " << "$DDS_CONFIG_HOME: " << env.GetConfigHome() / DDSEnvironment::Path(".DDS"); } diff --git a/fairmq/sdk/DDSEnvironment.h b/fairmq/sdk/DDSEnvironment.h index 96faa7aa..2ba72eca 100644 --- a/fairmq/sdk/DDSEnvironment.h +++ b/fairmq/sdk/DDSEnvironment.h @@ -10,7 +10,6 @@ #define FAIR_MQ_SDK_DDSENVIRONMENT_H #include -#include #include #include @@ -27,9 +26,10 @@ class DDSEnvironment public: using Path = boost::filesystem::path; - /// @brief See fair::mq::sdk::LoadDDSEnv - explicit DDSEnvironment(Path config_home = ""); + DDSEnvironment(); + explicit DDSEnvironment(Path); + auto GetLocation() const -> Path; auto GetConfigHome() const -> Path; friend auto operator<<(std::ostream& os, DDSEnvironment env) -> std::ostream&; @@ -38,6 +38,8 @@ class DDSEnvironment std::shared_ptr fImpl; }; +using DDSEnv = DDSEnvironment; + } // namespace sdk } // namespace mq } // namespace fair diff --git a/fairmq/sdk/DDSInfo.h.in b/fairmq/sdk/DDSInfo.h.in index 357d9f7d..156d219c 100644 --- a/fairmq/sdk/DDSInfo.h.in +++ b/fairmq/sdk/DDSInfo.h.in @@ -11,6 +11,20 @@ #include +namespace dds { +namespace tools_api { + +class CSession; + +} // namespace tools_api + +namespace topology_api { + +class CTopology; + +} // namespace topology_api +} // namespace dds + namespace fair { namespace mq { namespace sdk { diff --git a/fairmq/sdk/DDSSession.cxx b/fairmq/sdk/DDSSession.cxx index e533d6b5..2e2f4154 100644 --- a/fairmq/sdk/DDSSession.cxx +++ b/fairmq/sdk/DDSSession.cxx @@ -7,16 +7,19 @@ ********************************************************************************/ #include "DDSSession.h" -#include "DDSEnvironment.h" + +#include +#include + +#include #include + #include + #include #include -#include -#include #include -#include #include namespace fair { @@ -52,48 +55,76 @@ auto operator>>(std::istream& is, DDSRMSPlugin& plugin) -> std::istream& struct DDSSession::Impl { - Impl(DDSEnvironment env, DDSRMSPlugin plugin) - : fCount() - , fEnv(std::move(env)) - , fDefaultPlugin(std::move(plugin)) - , fSession() + explicit Impl(DDSEnvironment env) + : fEnv(std::move(env)) + , fRMSPlugin(DDSRMSPlugin::localhost) + , fDDSService() + , fDDSCustomCmd(fDDSService) , fId(to_string(fSession.create())) + , fStopOnDestruction(false) { setenv("DDS_SESSION_ID", fId.c_str(), 1); + + fDDSService.subscribeOnError([](const dds::intercom_api::EErrorCode errorCode, const std::string& msg) { + std::cerr << "DDS error, error code: " << errorCode << ", error message: " << msg << std::endl; + }); } - Impl(DDSEnvironment env, DDSRMSPlugin plugin, Id existing_id) - : fCount() - , fEnv(std::move(env)) - , fDefaultPlugin(std::move(plugin)) - , fSession() - , fId(std::move(existing_id)) + explicit Impl(Id existing, DDSEnvironment env) + : fEnv(std::move(env)) + , fRMSPlugin(DDSRMSPlugin::localhost) + , fDDSService() + , fDDSCustomCmd(fDDSService) + , fId(std::move(existing)) + , fStopOnDestruction(false) { fSession.attach(fId); std::string envId(std::getenv("DDS_SESSION_ID")); if (envId != fId) { setenv("DDS_SESSION_ID", fId.c_str(), 1); } + + fDDSService.subscribeOnError([](const dds::intercom_api::EErrorCode errorCode, const std::string& msg) { + std::cerr << "DDS error, error code: " << errorCode << ", error message: " << msg << std::endl; + }); } + ~Impl() + { + if (fStopOnDestruction) { + fSession.shutdown(); + } + } + + Impl() = delete; + Impl(const Impl&) = delete; + Impl& operator=(const Impl&) = delete; + Impl(Impl&&) = delete; + Impl& operator=(Impl&&) = delete; + struct Tag {}; friend auto operator<<(std::ostream& os, Tag) -> std::ostream& { return os << "DDSSession"; } tools::InstanceLimiter fCount; - const DDSEnvironment fEnv; - const DDSRMSPlugin fDefaultPlugin; + DDSEnvironment fEnv; + DDSRMSPlugin fRMSPlugin; + Path fRMSConfig; dds::tools_api::CSession fSession; - const Id fId; + dds::intercom_api::CIntercomService fDDSService; + dds::intercom_api::CCustomCmd fDDSCustomCmd; + Id fId; + bool fStopOnDestruction; }; -DDSSession::DDSSession(DDSEnvironment env, DDSRMSPlugin default_plugin) -: fImpl(std::make_shared(std::move(env), std::move(default_plugin))) {} +DDSSession::DDSSession(DDSEnvironment env) + : fImpl(std::make_shared(env)) +{} -DDSSession::DDSSession(DDSEnvironment env, Id existing_id) -: fImpl(std::make_shared(std::move(env), DDSRMSPlugin::localhost, std::move(existing_id))) {} +DDSSession::DDSSession(Id existing, DDSEnvironment env) + : fImpl(std::make_shared(std::move(existing), env)) +{} -DDSSession::DDSSession(DDSEnvironment env, DDSRMSPlugin default_plugin, Id existing_id) -: fImpl(std::make_shared(std::move(env), std::move(default_plugin), std::move(existing_id))) {} +auto DDSSession::GetEnv() const -> DDSEnvironment { return fImpl->fEnv; } auto DDSSession::IsRunning() const -> bool { return fImpl->fSession.IsRunning(); } @@ -101,36 +132,30 @@ auto DDSSession::GetId() const -> Id { return fImpl->fId; } auto DDSSession::Stop() -> void { return fImpl->fSession.shutdown(); } -auto DDSSession::GetDefaultPlugin() const -> DDSRMSPlugin { return fImpl->fDefaultPlugin; } +auto DDSSession::GetRMSPlugin() const -> DDSRMSPlugin { return fImpl->fRMSPlugin; } + +auto DDSSession::SetRMSPlugin(DDSRMSPlugin plugin) -> void { fImpl->fRMSPlugin = plugin; } + +auto DDSSession::GetRMSConfig() const -> Path { return fImpl->fRMSConfig; } + +auto DDSSession::SetRMSConfig(Path configFile) const -> void +{ + fImpl->fRMSConfig = std::move(configFile); +} + +auto DDSSession::IsStoppedOnDestruction() const -> bool { return fImpl->fStopOnDestruction; } + +auto DDSSession::StopOnDestruction(bool stop) -> void { fImpl->fStopOnDestruction = stop; } auto DDSSession::SubmitAgents(Quantity agents) -> void -{ - SubmitAgents(agents, GetDefaultPlugin(), Path()); -} - -auto DDSSession::SubmitAgents(Quantity agents, DDSRMSPlugin plugin) -> void -{ - SubmitAgents(agents, plugin, Path()); -} - -auto DDSSession::SubmitAgents(Quantity agents, const Path& config) -> void -{ - SubmitAgents(agents, GetDefaultPlugin(), std::move(config)); -} - -auto DDSSession::SubmitAgents(Quantity agents, DDSRMSPlugin plugin, const Path& config) -> void { // Requesting to submit 0 agents is not meaningful assert(agents > 0); - // The config argument is required with all plugins except localhost - if (plugin != DDSRMSPlugin::localhost) { - assert(exists(config)); - } dds::tools_api::SSubmitRequestData submitInfo; - submitInfo.m_rms = tools::ToString(plugin); + submitInfo.m_rms = tools::ToString(GetRMSPlugin()); submitInfo.m_instances = agents; - submitInfo.m_config = config.string(); + submitInfo.m_config = GetRMSConfig().string(); tools::Semaphore blocker; auto submitRequest = dds::tools_api::SSubmitRequest::makeRequest(submitInfo); @@ -162,7 +187,28 @@ auto DDSSession::RequestAgentInfo() -> void blocker.Wait(); } -auto DDSSession::ActivateTopology(Path topologyFile) -> void +auto DDSSession::RequestCommanderInfo() -> void +{ + dds::tools_api::SCommanderInfoRequestData commanderInfoInfo; + tools::Semaphore blocker; + auto commanderInfoRequest = + dds::tools_api::SCommanderInfoRequest::makeRequest(commanderInfoInfo); + commanderInfoRequest->setResponseCallback( + [&](const dds::tools_api::SCommanderInfoResponseData& _response) { + LOG(debug) << "pid: " << _response.m_pid; + LOG(debug) << "idle agents: " << _response.m_idleAgentsCount; + // LOG(debug) << "active topology: " + // << ((_response.m_hasActiveTopology) ? _response.m_activeTopologyName + // : ""); + }); + commanderInfoRequest->setMessageCallback( + [](const dds::tools_api::SMessageResponseData& _message) { LOG(debug) << _message; }); + commanderInfoRequest->setDoneCallback([&]() { blocker.Signal(); }); + fImpl->fSession.sendRequest(commanderInfoRequest); + blocker.Wait(); +} + +auto DDSSession::ActivateTopology(const Path& topologyFile) -> void { dds::tools_api::STopologyRequestData topologyInfo; topologyInfo.m_updateType = dds::tools_api::STopologyRequestData::EUpdateType::ACTIVATE; @@ -177,7 +223,16 @@ auto DDSSession::ActivateTopology(Path topologyFile) -> void blocker.Wait(); } -auto operator<<(std::ostream& os, DDSSession session) -> std::ostream& +void DDSSession::StartDDSService() { fImpl->fDDSService.start(fImpl->fId); } + +void DDSSession::SubscribeToCommands(std::function cb) +{ + fImpl->fDDSCustomCmd.subscribe(cb); +} + +void DDSSession::SendCommand(const std::string& cmd) { fImpl->fDDSCustomCmd.send(cmd, ""); } + +auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream& { return os << "$DDS_SESSION_ID: " << session.GetId(); } diff --git a/fairmq/sdk/DDSSession.h b/fairmq/sdk/DDSSession.h index 25fa6657..f3e8202e 100644 --- a/fairmq/sdk/DDSSession.h +++ b/fairmq/sdk/DDSSession.h @@ -9,14 +9,18 @@ #ifndef FAIR_MQ_SDK_DDSSESSION_H #define FAIR_MQ_SDK_DDSSESSION_H -#include -#include +#include #include + +#include + +#include #include #include #include #include #include +#include namespace fair { namespace mq { @@ -47,23 +51,29 @@ class DDSSession using Quantity = std::uint32_t; using Path = boost::filesystem::path; - DDSSession() = delete; - explicit DDSSession(DDSEnvironment env, DDSRMSPlugin default_plugin = DDSRMSPlugin::localhost); - explicit DDSSession(DDSEnvironment env, Id existing_id); - explicit DDSSession(DDSEnvironment env, DDSRMSPlugin default_plugin, Id existing_id); + explicit DDSSession(DDSEnvironment env = DDSEnvironment()); + explicit DDSSession(Id existing, DDSEnvironment env = DDSEnvironment()); + auto GetEnv() const -> DDSEnvironment; auto GetId() const -> Id; - auto GetDefaultPlugin() const -> DDSRMSPlugin; + auto GetRMSPlugin() const -> DDSRMSPlugin; + auto SetRMSPlugin(DDSRMSPlugin) -> void; + auto GetRMSConfig() const -> Path; + auto SetRMSConfig(Path) const -> void; + auto IsStoppedOnDestruction() const -> bool; + auto StopOnDestruction(bool stop = true) -> void; auto IsRunning() const -> bool; auto SubmitAgents(Quantity agents) -> void; - auto SubmitAgents(Quantity agents, DDSRMSPlugin plugin) -> void; - auto SubmitAgents(Quantity agents, DDSRMSPlugin plugin, const Path& config) -> void; - auto SubmitAgents(Quantity agents, const Path& config) -> void; auto RequestAgentInfo() -> void; - auto ActivateTopology(Path topologyFile) -> void; + auto RequestCommanderInfo() -> void; + auto ActivateTopology(const Path& topologyFile) -> void; auto Stop() -> void; - friend auto operator<<(std::ostream& os, DDSSession session) -> std::ostream&; + void StartDDSService(); + void SubscribeToCommands(std::function); + void SendCommand(const std::string&); + + friend auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&; private: struct Impl; std::shared_ptr fImpl; diff --git a/fairmq/sdk/DDSTopology.cxx b/fairmq/sdk/DDSTopology.cxx new file mode 100644 index 00000000..3ca2737d --- /dev/null +++ b/fairmq/sdk/DDSTopology.cxx @@ -0,0 +1,96 @@ +/******************************************************************************** + * Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "DDSTopology.h" + +#include +#include + +#include + +#include + +#include +#include +#include +#include + +namespace fair { +namespace mq { +namespace sdk { + +struct DDSTopology::Impl +{ + explicit Impl(Path topoFile, DDSEnvironment env) + : fEnv(std::move(env)) + , fTopoFile(std::move(topoFile)) + , fTopo(nullptr) + { + LOG(warn) << topoFile; + } + + DDSEnvironment fEnv; + Path fTopoFile; + std::unique_ptr fTopo; +}; + +DDSTopology::DDSTopology(Path topoFile, DDSEnvironment env) + : fImpl(std::make_shared(std::move(topoFile), std::move(env))) +{} + +auto DDSTopology::GetEnv() const -> DDSEnvironment { return fImpl->fEnv; } + +auto DDSTopology::GetTopoFile() const -> Path +{ + auto file(fImpl->fTopoFile); + if (file.string().empty()) { + throw std::runtime_error("DDS topology xml spec file unknown"); + } + return file; +} + +void DDSTopology::CreateTopology(Path topoFile) +{ + fImpl->fTopo = tools::make_unique(fImpl->fTopoFile.c_str()); +} + +int DDSTopology::GetNumRequiredAgents() +{ + return fImpl->fTopo->getRequiredNofAgents(); +} + +std::vector DDSTopology::GetDeviceList() +{ + std::vector taskIDs; + taskIDs.reserve(fImpl->fTopo->getRequiredNofAgents()); + + // TODO make sure returned tasks are actually devices + dds::topology_api::STopoRuntimeTask::FilterIteratorPair_t taskIt = fImpl->fTopo->getRuntimeTaskIterator([](const dds::topology_api::STopoRuntimeTask::FilterIterator_t::value_type& value) -> bool { + return true; + }); + + for (auto& it = taskIt.first; it != taskIt.second; ++it) { + LOG(debug) << "Found task " << it->first << " : " << it->second.m_task->getPath(); + taskIDs.push_back(it->first); + } + + return taskIDs; +} + +// auto DDSTopology::GetName() const -> std::string { return fImpl->fTopo.getName(); } + +auto operator<<(std::ostream& os, const DDSTopology& t) -> std::ostream& +try { + return os << "DDS topology: " /*<< t.GetName() <<*/ " (loaded from " << t.GetTopoFile() << ")"; +} catch (std::runtime_error&) { + return os << "DDS topology: " /*<< t.GetName()*/; +} + +} // namespace sdk +} // namespace mq +} // namespace fair diff --git a/fairmq/sdk/DDSTopology.h b/fairmq/sdk/DDSTopology.h index 814fddc9..9f7cec60 100644 --- a/fairmq/sdk/DDSTopology.h +++ b/fairmq/sdk/DDSTopology.h @@ -9,18 +9,11 @@ #ifndef FAIR_MQ_SDK_DDSTOPOLOGY_H #define FAIR_MQ_SDK_DDSTOPOLOGY_H -#include +#include +#include #include #include -namespace dds { -namespace topology_api { - -class CTopology; - -} // namespace topology_api -} // namespace dds - namespace fair { namespace mq { namespace sdk { @@ -29,23 +22,44 @@ namespace sdk { * @class DDSTopology DDSTopology.h * @brief Represents a DDS topology */ -class DDSSession +class DDSTopology { public: - using CSessionPtr = std::shared_ptr; + using Path = boost::filesystem::path; - explicit DDSSession(); - explicit DDSSession(std::string existing_session_id); + DDSTopology() = delete; + + /// @brief Construct from file + /// @param topoFile DDS topology xml file + /// @param env DDS environment + explicit DDSTopology(Path topoFile, DDSEnvironment env = DDSEnvironment()); + + /// @brief Get associated DDS environment + auto GetEnv() const -> DDSEnvironment; + + /// @brief Get path to DDS topology xml, if it is known + /// @throw std::runtime_error + auto GetTopoFile() const -> Path; + + void CreateTopology(Path); + + /// @brief Get number of required agents for this topology + int GetNumRequiredAgents(); + + /// @brief Get list of devices + std::vector GetDeviceList(); + + /// @brief Get the name of the topology + // auto GetName() const -> std::string; + + friend auto operator<<(std::ostream&, const DDSTopology&) -> std::ostream&; - auto GetId() const -> const std::string&; - auto IsRunning() const -> bool; private: - CSessionPtr fSession; - const std::string fId; + struct Impl; + std::shared_ptr fImpl; }; -auto LoadDDSEnv(const std::string& config_home = "", const std::string& prefix = DDSInstallPrefix) - -> void; +using DDSTopo = DDSTopology; } // namespace sdk } // namespace mq diff --git a/fairmq/sdk/Topology.cxx b/fairmq/sdk/Topology.cxx index b4a2a56b..a1129232 100644 --- a/fairmq/sdk/Topology.cxx +++ b/fairmq/sdk/Topology.cxx @@ -8,25 +8,166 @@ #include "Topology.h" -#include +#include + +#include +#include + #include +#include +#include +#include +#include namespace fair { namespace mq { + +auto operator<<(std::ostream& os, AsyncOpResult v) -> std::ostream& +{ + switch (v) { + case AsyncOpResult::Aborted: + return os << "Aborted"; + case AsyncOpResult::Timeout: + return os << "Timeout"; + case AsyncOpResult::Error: + return os << "Error"; + case AsyncOpResult::Ok: + default: + return os << "Ok"; + } +} + namespace sdk { -struct Topology::Impl +const std::unordered_map> Topology::fkExpectedState = { - Impl(dds::topology_api::CTopology topo) - : fDDSTopology(std::move(topo)) - {} - - dds::topology_api::CTopology fDDSTopology; + { Transition::InitDevice, DeviceState::InitializingDevice }, + { Transition::CompleteInit, DeviceState::Initialized }, + { Transition::Bind, DeviceState::Bound }, + { Transition::Connect, DeviceState::DeviceReady }, + { Transition::InitTask, DeviceState::InitializingTask }, + { Transition::Run, DeviceState::Running }, + { Transition::Stop, DeviceState::Ready }, + { Transition::ResetTask, DeviceState::DeviceReady }, + { Transition::ResetDevice, DeviceState::Idle }, + { Transition::End, DeviceState::Exiting } }; -Topology::Topology(dds::topology_api::CTopology topo) - : fImpl(std::make_shared(std::move(topo))) -{} +Topology::Topology(DDSTopology topo, DDSSession session) + : fDDSSession(std::move(session)) + , fDDSTopo(std::move(topo)) + , fTopologyState() + , fStateChangeOngoing(false) + , fExecutionThread() + , fShutdown(false) +{ + fDDSTopo.CreateTopology(fDDSTopo.GetTopoFile()); + + std::vector deviceList = fDDSTopo.GetDeviceList(); + for (const auto& d : deviceList) { + fTopologyState.emplace(d, DeviceStatus{ false, DeviceState::Ok }); + } + fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& condition, uint64_t senderId) { + LOG(info) << "Received from " << senderId << ": " << msg; + std::vector parts; + boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,")); + if (parts[0] == "state-change") { + boost::trim(parts[2]); + AddNewStateEntry(senderId, parts[2]); + } else if (parts[0] == "state-changes-subscription") { + if (parts[2] != "OK") { + LOG(error) << "state-changes-subscription failed with return code: " << parts[2]; + } + } else if (parts[0] == "state-changes-unsubscription") { + if (parts[2] != "OK") { + LOG(error) << "state-changes-unsubscription failed with return code: " << parts[2]; + } + } + }); + fDDSSession.StartDDSService(); + fDDSSession.SendCommand("subscribe-to-state-changes"); + + fExecutionThread = std::thread(&Topology::WaitForState, this); +} + +auto Topology::ChangeState(fair::mq::Transition transition, ChangeStateCallback cb, const std::chrono::milliseconds& timeout) -> void +{ + { + std::lock_guard guard(fMtx); + if (fStateChangeOngoing) { + LOG(error) << "State change already in progress, concurrent requested not yet supported"; + return; + } + fStateChangeOngoing = true; + fChangeStateCallback = cb; + fStateChangeTimeout = timeout; + + fDDSSession.SendCommand(GetTransitionName(transition)); + + fTargetState = fkExpectedState.at(transition); + } + fExecutionCV.notify_one(); +} + +void Topology::WaitForState() +{ + while (!fShutdown) { + if (fStateChangeOngoing) { + auto condition = [&] { return fShutdown || std::all_of(fTopologyState.cbegin(), + fTopologyState.cend(), + [&](TopologyState::value_type i) { + return i.second.state == fTargetState; + }); + }; + + std::unique_lock lock(fMtx); + + if (fStateChangeTimeout > std::chrono::milliseconds(0)) { + if (!fCV.wait_for(lock, fStateChangeTimeout, condition)) { + LOG(debug) << "timeout"; + // TODO: catch this from another thread... + throw std::runtime_error("timeout"); + } + } else { + fCV.wait(lock, condition); + } + + if (fShutdown) { + break; + } + + fStateChangeOngoing = false; + fChangeStateCallback(ChangeStateResult{AsyncOpResult::Ok, fTopologyState}); + } else { + std::unique_lock lock(fExecutionMtx); + fExecutionCV.wait(lock); + } + } +}; + +void Topology::AddNewStateEntry(uint64_t senderId, const std::string& state) +{ + { + std::unique_lock lock(fMtx); + fTopologyState[senderId] = DeviceStatus{ true, fair::mq::GetState(state) }; + } + fCV.notify_one(); +} + +Topology::~Topology() +{ + { + std::lock_guard guard(fMtx); + fShutdown = true; + } + fExecutionCV.notify_one(); + fExecutionThread.join(); +} + +auto operator<<(std::ostream& os, Topology::ChangeStateResult v) -> std::ostream& +{ + return os << v.rc; +} } // namespace sdk } // namespace mq diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index 5cfdd344..675f8f3f 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -9,25 +9,45 @@ #ifndef FAIR_MQ_SDK_TOPOLOGY_H #define FAIR_MQ_SDK_TOPOLOGY_H +#include +#include +#include #include +#include + #include +#include #include +#include #include #include #include -namespace dds { -namespace topology_api { - -class CTopology; - -} // namespace topology_api -} // namespace dds - namespace fair { namespace mq { + +enum class AsyncOpResult { + Ok, + Timeout, + Error, + Aborted +}; +auto operator<<(std::ostream& os, AsyncOpResult v) -> std::ostream&; + namespace sdk { +using DeviceState = fair::mq::State; +using DeviceTransition = fair::mq::Transition; + +struct DeviceStatus +{ + bool initialized; + DeviceState state; +}; + +using TopologyState = std::unordered_map; +using TopologyTransition = fair::mq::Transition; + /** * @class Topology Topology.h * @brief Represents a FairMQ topology @@ -35,16 +55,46 @@ namespace sdk { class Topology { public: + /// @brief (Re)Construct a FairMQ topology from an existing DDS topology + /// @param topo Initialized DDS CTopology + explicit Topology(DDSTopology topo, DDSSession session = DDSSession()); + ~Topology(); - /// @brief Construct a FairMQ topology from an existing DDS session via the dds::topology_api - /// @param topo An initialized CTopology object - explicit Topology(dds::topology_api::CTopology topo); + struct ChangeStateResult { + AsyncOpResult rc; + TopologyState state; + friend auto operator<<(std::ostream& os, ChangeStateResult v) -> std::ostream&; + }; + using ChangeStateCallback = std::function; + + /// @brief Initiate state transition on all FairMQ devices in this topology + /// @param t FairMQ device state machine transition + /// @param cb Completion callback + auto ChangeState(TopologyTransition t, ChangeStateCallback cb, const std::chrono::milliseconds& timeout = std::chrono::milliseconds(0)) -> void; + + static const std::unordered_map> fkExpectedState; private: - struct Impl; - std::shared_ptr fImpl; + DDSSession fDDSSession; + DDSTopology fDDSTopo; + TopologyState fTopologyState; + bool fStateChangeOngoing; + DeviceState fTargetState; + std::mutex fMtx; + std::mutex fExecutionMtx; + std::condition_variable fCV; + std::condition_variable fExecutionCV; + std::thread fExecutionThread; + ChangeStateCallback fChangeStateCallback; + std::chrono::milliseconds fStateChangeTimeout; + bool fShutdown; + + void WaitForState(); + void AddNewStateEntry(uint64_t senderId, const std::string& state); }; +using Topo = Topology; + } // namespace sdk } // namespace mq } // namespace fair diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index db65b394..ab18edc4 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -287,7 +287,7 @@ if(BUILD_SDK) add_testsuite(SDK SOURCES ${CMAKE_CURRENT_BINARY_DIR}/runner.cxx - # sdk/_dds.cxx + sdk/_dds.cxx sdk/_topology.cxx sdk/TopologyFixture.h diff --git a/test/sdk/TopologyFixture.h b/test/sdk/TopologyFixture.h index d1f7ebf5..1c0724c6 100644 --- a/test/sdk/TopologyFixture.h +++ b/test/sdk/TopologyFixture.h @@ -13,12 +13,10 @@ #include #include -#include #include #include #include #include -#include #include namespace fair { @@ -45,14 +43,14 @@ struct LoggerConfig struct TopologyFixture : ::testing::Test { TopologyFixture() - : mLoggerConfig() - , mDDSTopologyFile(std::string(SDK_TESTSUITE_SOURCE_DIR) + "/test_topo.xml") + : mDDSTopoFile(tools::ToString(SDK_TESTSUITE_SOURCE_DIR, "/test_topo.xml")) , mDDSEnv(CMAKE_CURRENT_BINARY_DIR) , mDDSSession(mDDSEnv) - , mDDSTopology(mDDSTopologyFile) - {} - // -// + , mDDSTopo(mDDSTopoFile, mDDSEnv) + { + mDDSSession.StopOnDestruction(); + } + // auto ActivateDDSTopology(const std::string& topology_file) -> void { // LOG(debug) << "ActivateDDSTopology(\"" << topology_file << "\")"; // } @@ -60,20 +58,22 @@ struct TopologyFixture : ::testing::Test auto SetUp() -> void override { LOG(info) << mDDSEnv; LOG(info) << mDDSSession; + mDDSSession.RequestCommanderInfo(); mDDSSession.SubmitAgents(2); + mDDSSession.RequestCommanderInfo(); std::this_thread::sleep_for(std::chrono::seconds(1)); // TODO implement WaitForIdleAgents - mDDSSession.ActivateTopology(mDDSTopologyFile); + mDDSSession.ActivateTopology(mDDSTopoFile); + mDDSSession.RequestCommanderInfo(); } auto TearDown() -> void override { - mDDSSession.Stop(); } LoggerConfig mLoggerConfig; - std::string mDDSTopologyFile; + std::string mDDSTopoFile; sdk::DDSEnvironment mDDSEnv; sdk::DDSSession mDDSSession; - dds::topology_api::CTopology mDDSTopology; + sdk::DDSTopology mDDSTopo; }; } /* namespace test */ diff --git a/test/sdk/_dds.cxx b/test/sdk/_dds.cxx index e15e70c8..b22ac0b2 100644 --- a/test/sdk/_dds.cxx +++ b/test/sdk/_dds.cxx @@ -15,7 +15,7 @@ namespace { -auto session_test() -> void +auto setup() -> void { fair::Logger::SetConsoleSeverity("debug"); fair::Logger::DefineVerbosity("user1", @@ -23,35 +23,24 @@ auto session_test() -> void fair::VerbositySpec::Info::severity)); fair::Logger::SetVerbosity("user1"); fair::Logger::SetConsoleColor(); +} + +TEST(DDS, Environment) +{ + setup(); fair::mq::sdk::DDSEnvironment env(CMAKE_CURRENT_BINARY_DIR); LOG(debug) << env; - { - fair::mq::sdk::DDSSession session(env); - LOG(debug) << session; - session.SubmitAgents(5); - session.SubmitAgents(5); - } - { - fair::mq::sdk::DDSSession session(env); - LOG(debug) << session; - session.SubmitAgents(5); - } - { - fair::mq::sdk::DDSSession session(env); - LOG(debug) << session; - session.SubmitAgents(5); - } } TEST(DDS, Session) { - session_test(); -} + setup(); -TEST(DDS, Session2) -{ - session_test(); + fair::mq::sdk::DDSEnvironment env(CMAKE_CURRENT_BINARY_DIR); + fair::mq::sdk::DDSSession session(env); + session.StopOnDestruction(); + LOG(debug) << session; } } // namespace diff --git a/test/sdk/_topology.cxx b/test/sdk/_topology.cxx index 7449688b..0ff1a3b1 100644 --- a/test/sdk/_topology.cxx +++ b/test/sdk/_topology.cxx @@ -8,7 +8,9 @@ #include "TopologyFixture.h" +#include #include +#include namespace { @@ -16,7 +18,21 @@ using Topology = fair::mq::test::TopologyFixture; TEST_F(Topology, Construction) { - fair::mq::sdk::Topology topo(mDDSTopology); + fair::mq::sdk::Topology topo(mDDSTopo, mDDSSession); +} + +TEST_F(Topology, ChangeState) +{ + using fair::mq::sdk::Topology; + using fair::mq::sdk::TopologyTransition; + + Topology topo(mDDSTopo, mDDSSession); + fair::mq::tools::Semaphore blocker; + topo.ChangeState(TopologyTransition::Stop, [&](Topology::ChangeStateResult result) { + LOG(info) << result; + blocker.Signal(); + }); + blocker.Wait(); } } // namespace