From 2358d7b03aeaab30fe0c74017c83f1dd264e0561 Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Wed, 6 Feb 2019 05:23:27 +0100 Subject: [PATCH] Implement some PMIx C++ bindings pmix::init() pmix::finalize() pmix::publish() pmix::initialized() pmix::get_version() and supporting data structures. --- fairmq/plugins/PMIx/CMakeLists.txt | 6 +- fairmq/plugins/PMIx/PMIx.cxx | 60 -------- fairmq/plugins/PMIx/PMIx.hpp | 152 +++++++++++++++++++ fairmq/plugins/PMIx/PMIxPlugin.cxx | 124 +++++++++++++++ fairmq/plugins/PMIx/{PMIx.h => PMIxPlugin.h} | 44 +++++- 5 files changed, 318 insertions(+), 68 deletions(-) delete mode 100644 fairmq/plugins/PMIx/PMIx.cxx create mode 100644 fairmq/plugins/PMIx/PMIx.hpp create mode 100644 fairmq/plugins/PMIx/PMIxPlugin.cxx rename fairmq/plugins/PMIx/{PMIx.h => PMIxPlugin.h} (64%) diff --git a/fairmq/plugins/PMIx/CMakeLists.txt b/fairmq/plugins/PMIx/CMakeLists.txt index 28b834b2..edbddf6e 100644 --- a/fairmq/plugins/PMIx/CMakeLists.txt +++ b/fairmq/plugins/PMIx/CMakeLists.txt @@ -7,7 +7,11 @@ ################################################################################ set(plugin FairMQPlugin_pmix) -add_library(${plugin} SHARED ${CMAKE_CURRENT_SOURCE_DIR}/PMIx.cxx ${CMAKE_CURRENT_SOURCE_DIR}/PMIx.h) +add_library(${plugin} SHARED + ${CMAKE_CURRENT_SOURCE_DIR}/PMIxPlugin.cxx + ${CMAKE_CURRENT_SOURCE_DIR}/PMIxPlugin.h + ${CMAKE_CURRENT_SOURCE_DIR}/PMIx.hpp +) target_link_libraries(${plugin} FairMQ PMIx::libpmix) target_include_directories(${plugin} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}) set_target_properties(${plugin} PROPERTIES diff --git a/fairmq/plugins/PMIx/PMIx.cxx b/fairmq/plugins/PMIx/PMIx.cxx deleted file mode 100644 index 2463772e..00000000 --- a/fairmq/plugins/PMIx/PMIx.cxx +++ /dev/null @@ -1,60 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#include "PMIx.h" -#include -#include -#include - -namespace fair -{ -namespace mq -{ -namespace plugins -{ - -PMIx::PMIx(const std::string& name, - const Plugin::Version version, - const std::string& maintainer, - const std::string& homepage, - PluginServices* pluginServices) - : Plugin(name, version, maintainer, homepage, pluginServices) - , fPid(getpid()) -{ - auto rc = PMIx_Init(&fPMIxProc, NULL, 0); - if (rc != PMIX_SUCCESS) { - throw std::runtime_error(tools::ToString("Client ns ", fPMIxProc.nspace, - " rank ", fPMIxProc.rank, - " pid ", fPid, - ": PMIx_Init failed: ", rc)); - } - - LOG(info) << "Client ns " << fPMIxProc.nspace << " rank " << fPMIxProc.rank << " pid " << fPid - << ": Running"; -} - -PMIx::~PMIx() -{ - LOG(info) << "Client ns " << fPMIxProc.nspace << " rank " << fPMIxProc.rank << " pid " << fPid - << ": Finalizing"; - - auto rc = PMIx_Finalize(NULL, 0); - if (rc != PMIX_SUCCESS) { - throw std::runtime_error(tools::ToString("Client ns ", fPMIxProc.nspace, - " rank ", fPMIxProc.rank, - " pid ", fPid, - ": PMIx_Finalize failed: ", rc)); - } - - LOG(info) << "Client ns " << fPMIxProc.nspace << " rank " << fPMIxProc.rank << " pid " << fPid - << ": PMIx_Finalize successfully completed"; -} - -} /* namespace plugins */ -} /* namespace mq */ -} /* namespace fair */ diff --git a/fairmq/plugins/PMIx/PMIx.hpp b/fairmq/plugins/PMIx/PMIx.hpp new file mode 100644 index 00000000..795d0846 --- /dev/null +++ b/fairmq/plugins/PMIx/PMIx.hpp @@ -0,0 +1,152 @@ +/******************************************************************************** + * Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef PMIX_HPP +#define PMIX_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// C++ PMIx v2.1 API +namespace pmix +{ + +struct runtime_error : std::runtime_error +{ + using std::runtime_error::runtime_error; +}; + +using status = pmix_status_t; + +using nspace = pmix_nspace_t; + +using key = pmix_key_t; + +using data_type = pmix_data_type_t; + +struct rank +{ + enum named : pmix_rank_t + { + undef = PMIX_RANK_UNDEF, + wildcard = PMIX_RANK_WILDCARD, + local_node = PMIX_RANK_LOCAL_NODE + }; + rank(pmix_rank_t r) + : m_value(r) + {} + operator pmix_rank_t() { return m_value; } + + private: + pmix_rank_t m_value; +}; + +struct proc : pmix_proc_t +{ + proc() { PMIX_PROC_CONSTRUCT(static_cast(this)); } + ~proc() { PMIX_PROC_DESTRUCT(static_cast(this)); } + proc(pmix::nspace ns, pmix::rank r) + { + PMIX_PROC_LOAD(static_cast(this), ns, static_cast(r)); + } + + friend std::ostream& operator<<(std::ostream& os, const proc& p) + { + return os << "nspace=" << p.nspace << ",rank=" << p.rank; + } +}; + +struct value : pmix_value_t +{ + value() { PMIX_VALUE_CONSTRUCT(static_cast(this)); } + ~value() { /*PMIX_VALUE_DESTRUCT(static_cast(this));*/ } + + // template + // value(const T* val, data_type dt) + // { + // PMIX_VALUE_LOAD(static_cast(this), const_cast(val), dt); + // } + template + value(T) + { + throw runtime_error("Given value type not supported or not yet implemented."); + } + value(const char* val) + { + PMIX_VALUE_LOAD(static_cast(this), const_cast(val), PMIX_STRING); + } + value(const std::string& val) + { + PMIX_VALUE_LOAD( + static_cast(this), const_cast(val.c_str()), PMIX_STRING); + } +}; + +struct info : pmix_info_t +{ + info() { PMIX_INFO_CONSTRUCT(static_cast(this)); } + ~info() { PMIX_INFO_DESTRUCT(static_cast(this)); } + + template + info(const std::string& k, Args&&... args) + { + (void)strncpy(key, k.c_str(), PMIX_MAX_KEYLEN); + flags = 0; + value = pmix::value(std::forward(args)...); + } +}; + +auto init(const std::vector& info = {}) -> proc +{ + proc res; + status rc; + + rc = PMIx_Init(&res, const_cast(info.data()), info.size()); + if (rc != PMIX_SUCCESS) { + throw runtime_error("pmix::init() failed: rc=" + rc); + } + + return res; +} + +auto initialized() -> bool { return !!PMIx_Initialized(); } + +auto get_version() -> const char* { return PMIx_Get_version(); } + +auto finalize(const std::vector& info = {}) -> void +{ + status rc; + + rc = PMIx_Finalize(info.data(), info.size()); + if (rc != PMIX_SUCCESS) { + throw runtime_error("pmix::finalize() failed: rc=" + rc); + } +} + +auto publish(const std::vector& info) -> void +{ + status rc; + + rc = PMIx_Publish(info.data(), info.size()); + if (rc != PMIX_SUCCESS) { + throw runtime_error("pmix::publish() failed: rc=" + rc); + } +} + +} /* namespace pmix */ + +#endif /* PMIX_HPP */ diff --git a/fairmq/plugins/PMIx/PMIxPlugin.cxx b/fairmq/plugins/PMIx/PMIxPlugin.cxx new file mode 100644 index 00000000..7393a534 --- /dev/null +++ b/fairmq/plugins/PMIx/PMIxPlugin.cxx @@ -0,0 +1,124 @@ +/******************************************************************************** + * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "PMIxPlugin.h" + +#include +#include +#include + +namespace fair +{ +namespace mq +{ +namespace plugins +{ + +PMIxPlugin::PMIxPlugin(const std::string& name, + const Plugin::Version version, + const std::string& maintainer, + const std::string& homepage, + PluginServices* pluginServices) + : Plugin(name, version, maintainer, homepage, pluginServices) + , fPid(getpid()) +{ + SubscribeToDeviceStateChange([&](DeviceState newState) { + switch (newState) { + case DeviceState::InitializingDevice: + if (!pmix::initialized()) { + fProc = pmix::init(); + LOG(debug) << PMIxClient() << " pmix::init() OK: " << fProc + << ",version=" << pmix::get_version(); + } + + FillChannelContainers(); + + PublishBoundChannels(); + + // pmix_proc_t proc; + // rc = PMIx_Fence(&proc, 1, NULL, 0) + // fence + + // lookup + + // fence + break; + case DeviceState::Exiting: + UnsubscribeFromDeviceStateChange(); + break; + default: + break; + } + }); +} + +PMIxPlugin::~PMIxPlugin() +{ + LOG(debug) << PMIxClient() << " Finalizing PMIx session... (On success, logs seen by the RTE will stop here.)"; + + while (pmix::initialized()) { + try { + pmix::finalize(); + LOG(debug) << PMIxClient() << " pmix::finalize() OK"; + } catch (const pmix::runtime_error& e) { + LOG(debug) << PMIxClient() << " pmix::finalize() failed: " << e.what(); + } + } +} + +auto PMIxPlugin::FillChannelContainers() -> void +{ + try { + std::unordered_map channelInfo(GetChannelInfo()); + + // fill binding and connecting chans + for (const auto& c : channelInfo) { + std::string methodKey{"chans." + c.first + "." + std::to_string(c.second - 1) + + ".method"}; + if (GetProperty(methodKey) == "bind") { + fBindingChannels.insert(std::make_pair(c.first, std::vector())); + for (int i = 0; i < c.second; ++i) { + fBindingChannels.at(c.first).push_back(GetProperty( + std::string{"chans." + c.first + "." + std::to_string(i) + ".address"})); + } + } else if (GetProperty(methodKey) == "connect") { + fConnectingChannels.insert(std::make_pair(c.first, ConnectingChannel())); + LOG(debug) << "preparing to connect: " << c.first << " with " << c.second + << " sub-channels."; + for (int i = 0; i < c.second; ++i) { + fConnectingChannels.at(c.first).fSubChannelAddresses.push_back(std::string()); + } + } else { + LOG(error) << "Cannot update address configuration. Channel method (bind/connect) " + "not specified."; + return; + } + } + } catch (const std::exception& e) { + LOG(error) << "Error filling channel containers: " << e.what(); + } +} + +auto PMIxPlugin::PublishBoundChannels() -> void +{ + std::vector infos; + infos.reserve(fBindingChannels.size()); + + for (const auto& channel : fBindingChannels) { + std::string joined = boost::algorithm::join(channel.second, ","); + infos.emplace_back(channel.first, joined); + } + + pmix::publish(infos); + LOG(debug) << PMIxClient() << " pmix::publish() OK: published " + << fBindingChannels.size() << " binding channels."; +} + +} /* namespace plugins */ +} /* namespace mq */ +} /* namespace fair */ diff --git a/fairmq/plugins/PMIx/PMIx.h b/fairmq/plugins/PMIx/PMIxPlugin.h similarity index 64% rename from fairmq/plugins/PMIx/PMIx.h rename to fairmq/plugins/PMIx/PMIxPlugin.h index cb1ab208..3e731182 100644 --- a/fairmq/plugins/PMIx/PMIx.h +++ b/fairmq/plugins/PMIx/PMIxPlugin.h @@ -9,12 +9,20 @@ #ifndef FAIR_MQ_PLUGINS_PMIX #define FAIR_MQ_PLUGINS_PMIX +#include "PMIx.hpp" + #include #include +#include -#include +#include +#include +#include +#include #include #include +#include +#include namespace fair { @@ -23,19 +31,41 @@ namespace mq namespace plugins { -class PMIx : public Plugin +struct ConnectingChannel +{ + ConnectingChannel() + : fSubChannelAddresses() + , fValues() + {} + + std::vector fSubChannelAddresses; + std::unordered_map fValues; +}; + +class PMIxPlugin : public Plugin { public: - PMIx(const std::string& name, + PMIxPlugin(const std::string& name, const Plugin::Version version, const std::string& maintainer, const std::string& homepage, PluginServices* pluginServices); - ~PMIx(); + ~PMIxPlugin(); + auto PMIxClient() const -> std::string + { + std::stringstream ss; + ss << "PMIx client(pid=" << fPid << ")"; + return ss.str(); + } private: - pmix_proc_t fPMIxProc; + pmix::proc fProc; pid_t fPid; + std::unordered_map> fBindingChannels; + std::unordered_map fConnectingChannels; + + auto FillChannelContainers() -> void; + auto PublishBoundChannels() -> void; }; Plugin::ProgOptions PMIxProgramOptions() @@ -47,7 +77,7 @@ Plugin::ProgOptions PMIxProgramOptions() } REGISTER_FAIRMQ_PLUGIN( - PMIx, // Class name + PMIxPlugin, // Class name pmix, // Plugin name (string, lower case chars only) (Plugin::Version{FAIRMQ_VERSION_MAJOR, FAIRMQ_VERSION_MINOR, @@ -61,4 +91,4 @@ REGISTER_FAIRMQ_PLUGIN( } /* namespace mq */ } /* namespace fair */ -#endif /* FAIR_MQ_PLUGINS_DDS */ +#endif /* FAIR_MQ_PLUGINS_PMIX */