mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-12 16:21:13 +00:00
feat(plugins)!: Remove PMIx
plugin
This commit is contained in:
parent
2e98a4e2cb
commit
0aecfff133
|
@ -27,8 +27,6 @@ fairmq_build_option(BUILD_FAIRMQ "Build FairMQ library and devices."
|
|||
DEFAULT ON)
|
||||
fairmq_build_option(BUILD_TESTING "Build tests."
|
||||
DEFAULT OFF REQUIRES "BUILD_FAIRMQ")
|
||||
fairmq_build_option(BUILD_PMIX_PLUGIN "Build PMIx plugin."
|
||||
DEFAULT OFF REQUIRES "BUILD_FAIRMQ")
|
||||
fairmq_build_option(BUILD_EXAMPLES "Build FairMQ examples."
|
||||
DEFAULT ON REQUIRES "BUILD_FAIRMQ")
|
||||
fairmq_build_option(BUILD_TIDY_TOOL "Build the fairmq-tidy tool."
|
||||
|
@ -84,9 +82,6 @@ endif()
|
|||
if(BUILD_TESTING)
|
||||
list(APPEND PROJECT_PACKAGE_COMPONENTS tests)
|
||||
endif()
|
||||
if(BUILD_PMIX_PLUGIN)
|
||||
list(APPEND PROJECT_PACKAGE_COMPONENTS pmix_plugin)
|
||||
endif()
|
||||
if(BUILD_EXAMPLES)
|
||||
list(APPEND PROJECT_PACKAGE_COMPONENTS examples)
|
||||
endif()
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
################################################################################
|
||||
# Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||
# Copyright (C) 2021-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||
# #
|
||||
# This software is distributed under the terms of the #
|
||||
# GNU Lesser General Public Licence (LGPL) version 3, #
|
||||
|
@ -42,9 +42,6 @@ endif()
|
|||
ctest_start(Continuous)
|
||||
|
||||
list(APPEND options "-DDISABLE_COLOR=ON" "-DBUILD_EXAMPLES=ON" "-DBUILD_TESTING=ON")
|
||||
if(HAS_PMIX)
|
||||
list(APPEND options "-DBUILD_PMIX_PLUGIN=ON")
|
||||
endif()
|
||||
if(RUN_STATIC_ANALYSIS)
|
||||
list(APPEND options "-DRUN_STATIC_ANALYSIS=ON")
|
||||
endif()
|
||||
|
|
14
Jenkinsfile
vendored
14
Jenkinsfile
vendored
|
@ -85,14 +85,14 @@ pipeline{
|
|||
steps{
|
||||
script {
|
||||
def builds = jobMatrix('build', [
|
||||
[os: 'ubuntu', ver: '20.04', arch: 'x86_64', compiler: 'gcc-9', extra: '-DHAS_PMIX=ON'],
|
||||
[os: 'ubuntu', ver: '20.04', arch: 'x86_64', compiler: 'gcc-9'],
|
||||
[os: 'ubuntu', ver: '22.04', arch: 'x86_64', compiler: 'gcc-11'],
|
||||
[os: 'fedora', ver: '33', arch: 'x86_64', compiler: 'gcc-10', extra: '-DHAS_PMIX=ON'],
|
||||
[os: 'fedora', ver: '34', arch: 'x86_64', compiler: 'gcc-11', extra: '-DHAS_PMIX=ON'],
|
||||
[os: 'fedora', ver: '35', arch: 'x86_64', compiler: 'gcc-11', extra: '-DHAS_PMIX=ON'],
|
||||
[os: 'fedora', ver: '36', arch: 'x86_64', compiler: 'gcc-12', extra: '-DHAS_PMIX=ON'],
|
||||
[os: 'fedora', ver: '37', arch: 'x86_64', compiler: 'gcc-12', extra: '-DHAS_PMIX=ON'],
|
||||
[os: 'fedora', ver: '38', arch: 'x86_64', compiler: 'gcc-13', extra: '-DHAS_PMIX=ON'],
|
||||
[os: 'fedora', ver: '33', arch: 'x86_64', compiler: 'gcc-10'],
|
||||
[os: 'fedora', ver: '34', arch: 'x86_64', compiler: 'gcc-11'],
|
||||
[os: 'fedora', ver: '35', arch: 'x86_64', compiler: 'gcc-11'],
|
||||
[os: 'fedora', ver: '36', arch: 'x86_64', compiler: 'gcc-12'],
|
||||
[os: 'fedora', ver: '37', arch: 'x86_64', compiler: 'gcc-12'],
|
||||
[os: 'fedora', ver: '38', arch: 'x86_64', compiler: 'gcc-13'],
|
||||
[os: 'macos', ver: '12', arch: 'x86_64', compiler: 'apple-clang-13', extra: '-DHAS_ASIO=ON'],
|
||||
[os: 'macos', ver: '12', arch: 'arm64', compiler: 'apple-clang-13', extra: '-DHAS_ASIO=ON'],
|
||||
])
|
||||
|
|
|
@ -82,7 +82,6 @@ list(PREPEND CMAKE_PREFIX_PATH /path/to/fairmq_install)
|
|||
* [FairCMakeModules](https://github.com/FairRootGroup/FairCMakeModules) (optionally bundled)
|
||||
* [FairLogger](https://github.com/FairRootGroup/FairLogger)
|
||||
* [GTest](https://github.com/google/googletest) (optionally bundled)
|
||||
* [PMIx](https://pmix.org/)
|
||||
* [ZeroMQ](http://zeromq.org/)
|
||||
|
||||
Which dependencies are required depends on which components are built.
|
||||
|
@ -96,7 +95,6 @@ On command line:
|
|||
* `-DDISABLE_COLOR=ON` disables coloured console output.
|
||||
* `-DBUILD_TESTING=OFF` disables building of tests.
|
||||
* `-DBUILD_EXAMPLES=OFF` disables building of examples.
|
||||
* `-DBUILD_PMIX_PLUGIN=ON` enables building of the PMIx plugin.
|
||||
* `-DBUILD_DOCS=ON` enables building of API docs.
|
||||
* You can hint non-system installations for dependent packages, see the #installation-from-source section above
|
||||
|
||||
|
@ -154,4 +152,4 @@ After the `find_package(FairMQ)` call the following CMake variables are defined:
|
|||
1. [Usage](docs/Plugins.md#71-usage)
|
||||
2. [Development](docs/Plugins.md#72-development)
|
||||
3. [Provided Plugins](docs/Plugins.md#73-provided-plugins)
|
||||
2. [PMIx](docs/Plugins.md#731-pmix)
|
||||
1. [PMIx](docs/Plugins.md#731-pmix)
|
||||
|
|
|
@ -18,10 +18,6 @@ if(BUILD_FAIRMQ)
|
|||
set(Threads_PREFIX "<system>")
|
||||
endif()
|
||||
|
||||
if(BUILD_PMIX_PLUGIN)
|
||||
find_package2(PRIVATE PMIx REQUIRED VERSION 2.1.4)
|
||||
endif()
|
||||
|
||||
if(BUILD_FAIRMQ OR BUILD_TIDY_TOOL)
|
||||
find_package2(PUBLIC FairLogger REQUIRED VERSION 1.6.0)
|
||||
find_package2(PUBLIC Boost REQUIRED VERSION 1.66
|
||||
|
|
|
@ -27,12 +27,6 @@ macro(fairmq_summary_components)
|
|||
set(tests_summary "${BRed} NO${CR} (default, enable with ${BMagenta}-DBUILD_TESTING=ON${CR})")
|
||||
endif()
|
||||
message(STATUS " ${BWhite}tests${CR} ${tests_summary}")
|
||||
if(BUILD_PMIX_PLUGIN)
|
||||
set(pmix_summary "${BGreen}YES${CR} EXPERIMENTAL (disable with ${BMagenta}-DBUILD_PMIX_PLUGIN=OFF${CR})")
|
||||
else()
|
||||
set(pmix_summary "${BRed} NO${CR} EXPERIMENTAL (default, enable with ${BMagenta}-DBUILD_PMIX_PLUGIN=ON${CR})")
|
||||
endif()
|
||||
message(STATUS " ${BWhite}pmix_plugin${CR} ${pmix_summary}")
|
||||
if(BUILD_EXAMPLES)
|
||||
set(examples_summary "${BGreen}YES${CR} (default, disable with ${BMagenta}-DBUILD_EXAMPLES=OFF${CR})")
|
||||
else()
|
||||
|
|
|
@ -1,74 +0,0 @@
|
|||
################################################################################
|
||||
# Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||
# #
|
||||
# This software is distributed under the terms of the #
|
||||
# GNU Lesser General Public Licence (LGPL) version 3, #
|
||||
# copied verbatim in the file "LICENSE" #
|
||||
################################################################################
|
||||
|
||||
|
||||
# Find module for PMIx (https://pmix.org/).
#
# Hints:
#   PMIX_ROOT (CMake variable or environment variable) - installation prefix
#
# Result variables:
#   PMIx_FOUND, PMIx_INCLUDE_DIR, PMIx_LIBRARY_DIR, PMIx_LIBRARY_SHARED,
#   PMIx_VERSION, PMIx_VERSION_MAJOR, PMIx_VERSION_MINOR, PMIx_VERSION_PATCH
#
# Imported target:
#   PMIx::libpmix
#
# The "lib/${CMAKE_LIBRARY_ARCHITECTURE}/pmix" part in all
# the PATH_SUFFIXES is here to be able to find Debian's
# libpmix-dev package. It installs everything below
# /usr/lib/${CMAKE_LIBRARY_ARCHITECTURE}/pmix

find_path(PMIx_INCLUDE_DIR
  NAMES pmix.h
  HINTS ${PMIX_ROOT} $ENV{PMIX_ROOT}
  PATH_SUFFIXES include lib/${CMAKE_LIBRARY_ARCHITECTURE}/pmix/include
)

find_path(PMIx_LIBRARY_DIR
  NAMES libpmix.dylib libpmix.so
  HINTS ${PMIX_ROOT} $ENV{PMIX_ROOT}
  PATH_SUFFIXES lib lib64 lib/${CMAKE_LIBRARY_ARCHITECTURE}/pmix/lib
)

find_library(PMIx_LIBRARY_SHARED
  NAMES libpmix.dylib libpmix.so
  HINTS ${PMIX_ROOT} $ENV{PMIX_ROOT}
  PATH_SUFFIXES lib lib64 lib/${CMAKE_LIBRARY_ARCHITECTURE}/pmix/lib
)

find_file(PMIx_VERSION_FILE
  NAMES pmix_version.h
  HINTS ${PMIX_ROOT} $ENV{PMIX_ROOT}
  PATH_SUFFIXES include lib/${CMAKE_LIBRARY_ARCHITECTURE}/pmix/include
)

# Only parse the version header if it was located: file(READ) on a
# "-NOTFOUND" path is a hard configure error, which would break even an
# optional (non-REQUIRED) lookup of this package.
if(PMIx_VERSION_FILE)
  file(READ "${PMIx_VERSION_FILE}" __version_raw)

  # "[0-9]+" so that multi-digit components (e.g. 4.2.10) are captured in
  # full; the optional trailing "L" covers definitions written like "4L".
  string(REGEX MATCH "#define PMIX_VERSION_MAJOR ([0-9]+)L?"
    __version_major_raw "${__version_raw}"
  )
  set(PMIx_VERSION_MAJOR "${CMAKE_MATCH_1}")

  string(REGEX MATCH "#define PMIX_VERSION_MINOR ([0-9]+)L?"
    __version_minor_raw "${__version_raw}"
  )
  set(PMIx_VERSION_MINOR "${CMAKE_MATCH_1}")

  string(REGEX MATCH "#define PMIX_VERSION_RELEASE ([0-9]+)L?"
    __version_patch_raw "${__version_raw}"
  )
  set(PMIx_VERSION_PATCH "${CMAKE_MATCH_1}")

  set(PMIx_VERSION "${PMIx_VERSION_MAJOR}.${PMIx_VERSION_MINOR}.${PMIx_VERSION_PATCH}")
endif()

include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(PMIx
  REQUIRED_VARS
    PMIx_INCLUDE_DIR
    PMIx_LIBRARY_DIR
    PMIx_LIBRARY_SHARED

  VERSION_VAR PMIx_VERSION
)

if(NOT TARGET PMIx::libpmix AND PMIx_FOUND)
  add_library(PMIx::libpmix SHARED IMPORTED)
  # Quoted so that paths containing spaces or semicolons stay one argument.
  set_target_properties(PMIx::libpmix PROPERTIES
    IMPORTED_LOCATION "${PMIx_LIBRARY_SHARED}"
    INTERFACE_INCLUDE_DIRECTORIES "${PMIx_INCLUDE_DIR}"
  )
endif()
|
|
@ -56,8 +56,6 @@ A more complete example which may serve as a start including example CMake code
|
|||
|
||||
### 7.3.1 PMIx
|
||||
|
||||
The [PMIx](https://pmix.org/) plugin enables launching a FairMQ topology with any PMIx capable launcher, e.g. the [Open Run-Time Environment (ORTE) of OpenMPI](https://www.open-mpi.org/doc/v4.0/man1/mpirun.1.php) or the [Slurm workload manager](https://slurm.schedmd.com/srun.html). This plugin is not (yet) very mature and serves as a proof of concept at the moment.
|
||||
|
||||
TODO example usage
|
||||
The [PMIx](https://pmix.org/) plugin enables launching a FairMQ topology with any PMIx capable launcher, e.g. the [Open Run-Time Environment (ORTE) of OpenMPI](https://www.open-mpi.org/doc/v4.0/man1/mpirun.1.php) or the [Slurm workload manager](https://slurm.schedmd.com/srun.html). This experimental plugin was last released in v1.4.56 and has been removed as of v1.5. There are currently no plans to resume its development.
|
||||
|
||||
← [Back](../README.md)
|
||||
|
|
|
@ -318,10 +318,3 @@ if(BUILD_FAIRMQ)
|
|||
)
|
||||
endforeach()
|
||||
endif()
|
||||
|
||||
####################
|
||||
# external plugins #
|
||||
####################
|
||||
if(BUILD_PMIX_PLUGIN)
|
||||
add_subdirectory(plugins/PMIx)
|
||||
endif()
|
||||
|
|
|
@ -1,33 +0,0 @@
|
|||
################################################################################
|
||||
# Copyright (C) 2019-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||
# #
|
||||
# This software is distributed under the terms of the #
|
||||
# GNU Lesser General Public Licence (LGPL) version 3, #
|
||||
# copied verbatim in the file "LICENSE" #
|
||||
################################################################################
|
||||
|
||||
# PMIx plugin: shared library loaded by FairMQ devices.
set(plugin FairMQPlugin_pmix)
add_library(${plugin} SHARED
  ${CMAKE_CURRENT_SOURCE_DIR}/PMIxPlugin.cxx
  ${CMAKE_CURRENT_SOURCE_DIR}/PMIxPlugin.h
  ${CMAKE_CURRENT_SOURCE_DIR}/PMIxCommands.h
  ${CMAKE_CURRENT_SOURCE_DIR}/PMIx.hpp
)
target_compile_features(${plugin} PUBLIC cxx_std_17)
target_link_libraries(${plugin} PUBLIC FairMQ PMIx::libpmix)
target_include_directories(${plugin} PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
set_target_properties(${plugin} PROPERTIES
  CXX_VISIBILITY_PRESET hidden
  OUTPUT_NAME "${PROJECT_NAME_LOWER}-plugin-pmix"
)

# Command-line UI that talks to the plugin through PMIx events.
set(exe fairmq-pmix-command-ui)
add_executable(${exe} ${CMAKE_CURRENT_SOURCE_DIR}/runPMIxCommandUI.cxx)
# Explicit PRIVATE: the keyword-less signature has legacy semantics and must
# not be mixed with keyword calls on the same target later on.
target_link_libraries(${exe} PRIVATE FairMQ PMIx::libpmix)
target_include_directories(${exe} PRIVATE ${CMAKE_CURRENT_BINARY_DIR})

install(TARGETS ${plugin} ${exe}
  EXPORT ${PROJECT_EXPORT_SET}
  LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
  RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
|
|
@ -1,317 +0,0 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#ifndef PMIX_HPP
|
||||
#define PMIX_HPP
|
||||
|
||||
#include <array>
|
||||
#include <cstring>
|
||||
#include <functional>
|
||||
#include <limits>
|
||||
#include <memory>
|
||||
#include <ostream>
|
||||
#include <pmix.h>
|
||||
#include <sstream>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
// C++ PMIx v2.2 API
|
||||
namespace pmix
|
||||
{
|
||||
|
||||
/// Exception type thrown by the pmix:: wrappers in this header.
struct runtime_error : public std::runtime_error
{
    // Re-use every std::runtime_error constructor unchanged.
    using std::runtime_error::runtime_error;
};
|
||||
|
||||
using status = pmix_status_t;
|
||||
|
||||
using nspace = pmix_nspace_t;
|
||||
|
||||
using key = pmix_key_t;
|
||||
|
||||
using data_type = pmix_data_type_t;
|
||||
|
||||
struct rank
|
||||
{
|
||||
enum named : pmix_rank_t
|
||||
{
|
||||
undef = PMIX_RANK_UNDEF,
|
||||
wildcard = PMIX_RANK_WILDCARD,
|
||||
local_node = PMIX_RANK_LOCAL_NODE
|
||||
};
|
||||
|
||||
explicit rank(pmix_rank_t r)
|
||||
: m_value(r)
|
||||
{}
|
||||
|
||||
operator pmix_rank_t() { return m_value; }
|
||||
|
||||
private:
|
||||
pmix_rank_t m_value;
|
||||
};
|
||||
|
||||
struct proc : pmix_proc_t
|
||||
{
|
||||
proc() { PMIX_PROC_CONSTRUCT(static_cast<pmix_proc_t*>(this)); }
|
||||
~proc() { PMIX_PROC_DESTRUCT(static_cast<pmix_proc_t*>(this)); }
|
||||
|
||||
proc(pmix::nspace ns, pmix::rank r)
|
||||
{
|
||||
PMIX_PROC_LOAD(static_cast<pmix_proc_t*>(this), ns, static_cast<pmix_rank_t>(r));
|
||||
}
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& os, const proc& p)
|
||||
{
|
||||
return os << p.nspace << "_" << p.rank;
|
||||
}
|
||||
};
|
||||
|
||||
struct value : pmix_value_t
|
||||
{
|
||||
value() { PMIX_VALUE_CONSTRUCT(static_cast<pmix_value_t*>(this)); }
|
||||
~value() { PMIX_VALUE_DESTRUCT(static_cast<pmix_value_t*>(this)); }
|
||||
|
||||
value(const value& rhs)
|
||||
{
|
||||
status rc;
|
||||
auto lhs(static_cast<pmix_value_t*>(this));
|
||||
PMIX_VALUE_XFER(rc, lhs, static_cast<pmix_value_t*>(const_cast<value*>(&rhs)));
|
||||
|
||||
if (rc != PMIX_SUCCESS) {
|
||||
throw runtime_error("pmix::value copy ctor failed: rc=" + std::to_string(rc));
|
||||
}
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
explicit value(T)
|
||||
{
|
||||
throw runtime_error("Given value type not supported or not yet implemented.");
|
||||
}
|
||||
|
||||
explicit value(const char* val)
|
||||
{
|
||||
PMIX_VALUE_LOAD(static_cast<pmix_value_t*>(this), const_cast<char*>(val), PMIX_STRING);
|
||||
}
|
||||
|
||||
explicit value(const std::string& val)
|
||||
{
|
||||
PMIX_VALUE_LOAD(
|
||||
static_cast<pmix_value_t*>(this), const_cast<char*>(val.c_str()), PMIX_STRING);
|
||||
}
|
||||
|
||||
explicit value(int val)
|
||||
{
|
||||
PMIX_VALUE_LOAD(static_cast<pmix_value_t*>(this), &val, PMIX_INT);
|
||||
}
|
||||
|
||||
explicit value(pmix_data_array_t* val)
|
||||
{
|
||||
PMIX_VALUE_LOAD(static_cast<pmix_value_t*>(this), val, PMIX_DATA_ARRAY);
|
||||
}
|
||||
};
|
||||
|
||||
struct info : pmix_info_t
|
||||
{
|
||||
info() { PMIX_INFO_CONSTRUCT(static_cast<pmix_info_t*>(this)); }
|
||||
~info() { PMIX_INFO_DESTRUCT(static_cast<pmix_info_t*>(this)); }
|
||||
|
||||
template<typename... Args>
|
||||
info(const std::string& k, Args&&... args)
|
||||
{
|
||||
(void)strncpy(key, k.c_str(), PMIX_MAX_KEYLEN);
|
||||
flags = 0;
|
||||
|
||||
pmix::value rhs(std::forward<Args>(args)...);
|
||||
auto lhs(&value);
|
||||
status rc;
|
||||
PMIX_VALUE_XFER(rc, lhs, static_cast<pmix_value_t*>(&rhs));
|
||||
|
||||
if (rc != PMIX_SUCCESS) {
|
||||
throw runtime_error("pmix::info ctor failed: rc=" + std::to_string(rc));
|
||||
}
|
||||
}
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& os, const info& i)
|
||||
{
|
||||
return os << "key=" << i.key << ",value='" << i.value.data.string << "'";
|
||||
}
|
||||
|
||||
info(const info& rhs)
|
||||
{
|
||||
PMIX_INFO_XFER(static_cast<pmix_info_t*>(this),
|
||||
static_cast<pmix_info_t*>(const_cast<info*>(&rhs)));
|
||||
}
|
||||
};
|
||||
|
||||
struct pdata : pmix_pdata_t
|
||||
{
|
||||
pdata() { PMIX_PDATA_CONSTRUCT(static_cast<pmix_pdata_t*>(this)); }
|
||||
~pdata() { PMIX_PDATA_DESTRUCT(static_cast<pmix_pdata_t*>(this)); }
|
||||
|
||||
pdata(const pdata& rhs)
|
||||
{
|
||||
PMIX_PDATA_XFER(static_cast<pmix_pdata_t*>(this),
|
||||
static_cast<pmix_pdata_t*>(const_cast<pdata*>(&rhs)));
|
||||
}
|
||||
|
||||
auto set_key(const std::string& new_key) -> void
|
||||
{
|
||||
(void)strncpy(key, new_key.c_str(), PMIX_MAX_KEYLEN);
|
||||
}
|
||||
};
|
||||
|
||||
/// Calls PMIx_Init for the calling process.
/// @param info optional directives passed through to PMIx_Init
/// @return the process identifier filled in by PMIx_Init
/// @throws pmix::runtime_error if PMIx_Init does not return PMIX_SUCCESS
// inline: this is a definition in a header; without it, including the header
// from more than one translation unit yields duplicate symbols.
inline auto init(const std::vector<info>& info = {}) -> proc
{
    proc res;

    const status rc = PMIx_Init(&res, const_cast<pmix::info*>(info.data()), info.size());
    if (rc != PMIX_SUCCESS) {
        throw runtime_error("pmix::init() failed: rc=" + std::to_string(rc));
    }

    return res;
}
|
||||
|
||||
/// @return true if PMIx_Initialized() reports a non-zero value
// inline: header-defined function, avoids duplicate symbols across translation units.
inline auto initialized() -> bool { return !!PMIx_Initialized(); }
|
||||
|
||||
/// @return the string reported by PMIx_Get_version()
// inline: header-defined function, avoids duplicate symbols across translation units.
inline auto get_version() -> std::string { return {PMIx_Get_version()}; }
|
||||
|
||||
auto finalize(const std::vector<info>& info = {}) -> void
|
||||
{
|
||||
status rc;
|
||||
|
||||
rc = PMIx_Finalize(info.data(), info.size());
|
||||
if (rc != PMIX_SUCCESS) {
|
||||
throw runtime_error("pmix::finalize() failed: rc=" + std::to_string(rc));
|
||||
}
|
||||
}
|
||||
|
||||
auto publish(const std::vector<info>& info) -> void
|
||||
{
|
||||
status rc;
|
||||
|
||||
rc = PMIx_Publish(info.data(), info.size());
|
||||
if (rc != PMIX_SUCCESS) {
|
||||
throw runtime_error("pmix::publish() failed: rc=" + std::to_string(rc));
|
||||
}
|
||||
}
|
||||
|
||||
auto fence(const std::vector<proc>& procs = {}, const std::vector<info>& info = {}) -> void
|
||||
{
|
||||
status rc;
|
||||
|
||||
rc = PMIx_Fence(procs.data(), procs.size(), info.data(), info.size());
|
||||
if (rc != PMIX_SUCCESS) {
|
||||
throw runtime_error("pmix::fence() failed: rc=" + std::to_string(rc));
|
||||
}
|
||||
}
|
||||
|
||||
auto lookup(std::vector<pdata>& pdata, const std::vector<info>& info = {}) -> void
|
||||
{
|
||||
status rc;
|
||||
|
||||
rc = PMIx_Lookup(pdata.data(), pdata.size(), info.data(), info.size());
|
||||
if (rc != PMIX_SUCCESS) {
|
||||
throw runtime_error("pmix::lookup() failed: rc=" + std::to_string(rc));
|
||||
}
|
||||
}
|
||||
|
||||
std::string get_info(const std::string& name, pmix::proc& process)
|
||||
{
|
||||
pmix_value_t* v;
|
||||
|
||||
pmix::status rc = PMIx_Get(&process, name.c_str(), nullptr, 0, &v);
|
||||
if (rc == PMIX_SUCCESS) {
|
||||
std::stringstream ss;
|
||||
|
||||
switch (v->type) {
|
||||
case PMIX_SIZE: ss << static_cast<size_t>(v->data.size) << " (size_t)"; break;
|
||||
case PMIX_INT: ss << static_cast<int>(v->data.integer) << " (int)"; break;
|
||||
case PMIX_INT8: ss << static_cast<int8_t>(v->data.int8) << " (int8_t)"; break;
|
||||
case PMIX_INT16: ss << static_cast<int16_t>(v->data.int16) << " (int16_t)"; break;
|
||||
case PMIX_INT32: ss << static_cast<int32_t>(v->data.int32) << " (int32_t)"; break;
|
||||
case PMIX_INT64: ss << static_cast<int64_t>(v->data.int64) << " (int64_t)"; break;
|
||||
case PMIX_UINT: ss << static_cast<unsigned int>(v->data.uint) << " (unsigned int)"; break;
|
||||
case PMIX_UINT8: ss << static_cast<uint8_t>(v->data.uint8) << " (uint8_t)"; break;
|
||||
case PMIX_UINT16: ss << static_cast<uint16_t>(v->data.uint16) << " (uint16_t)"; break;
|
||||
case PMIX_UINT32: ss << static_cast<uint32_t>(v->data.uint32) << " (uint32_t)"; break;
|
||||
case PMIX_UINT64: ss << static_cast<uint64_t>(v->data.uint64) << " (uint64_t)"; break;
|
||||
case PMIX_FLOAT: ss << static_cast<float>(v->data.fval) << " (float)"; break;
|
||||
case PMIX_DOUBLE: ss << static_cast<double>(v->data.dval) << " (double)"; break;
|
||||
case PMIX_PID: ss << static_cast<pid_t>(v->data.pid) << " (pid_t)"; break;
|
||||
case PMIX_STRING: ss << static_cast<char*>(v->data.string) << " (string)"; break;
|
||||
case PMIX_PROC_RANK: ss << static_cast<uint32_t>(v->data.rank) << " (pmix_rank_t)"; break;
|
||||
case PMIX_PROC: ss << "proc.nspace: " << static_cast<pmix_proc_t*>(v->data.proc)->nspace
|
||||
<< ", proc.rank: " << static_cast<pmix_proc_t*>(v->data.proc)->rank << " (pmix_proc_t*)"; break;
|
||||
default:
|
||||
ss << "unknown type: " << v->type;
|
||||
break;
|
||||
}
|
||||
|
||||
return ss.str();
|
||||
} else if (rc == PMIX_ERR_NOT_FOUND) {
|
||||
// LOG(error) << "PMIx_Get failed: PMIX_ERR_NOT_FOUND";
|
||||
return "";
|
||||
} else {
|
||||
// LOG(error) << "PMIx_Get failed: " << rc;
|
||||
return "<undefined>";
|
||||
}
|
||||
}
|
||||
|
||||
std::string get_value_str(const pmix_value_t& v)
|
||||
{
|
||||
switch (v.type) {
|
||||
case PMIX_BOOL: return std::to_string(static_cast<bool>(v.data.flag));
|
||||
case PMIX_SIZE: return std::to_string(static_cast<size_t>(v.data.size));
|
||||
case PMIX_INT: return std::to_string(static_cast<int>(v.data.integer));
|
||||
case PMIX_INT8: return std::to_string(static_cast<int8_t>(v.data.int8));
|
||||
case PMIX_INT16: return std::to_string(static_cast<int16_t>(v.data.int16));
|
||||
case PMIX_INT32: return std::to_string(static_cast<int32_t>(v.data.int32));
|
||||
case PMIX_INT64: return std::to_string(static_cast<int64_t>(v.data.int64));
|
||||
case PMIX_UINT: return std::to_string(static_cast<unsigned int>(v.data.uint));
|
||||
case PMIX_UINT8: return std::to_string(static_cast<uint8_t>(v.data.uint8));
|
||||
case PMIX_UINT16: return std::to_string(static_cast<uint16_t>(v.data.uint16));
|
||||
case PMIX_UINT32: return std::to_string(static_cast<uint32_t>(v.data.uint32));
|
||||
case PMIX_UINT64: return std::to_string(static_cast<uint64_t>(v.data.uint64));
|
||||
case PMIX_FLOAT: return std::to_string(static_cast<float>(v.data.fval));
|
||||
case PMIX_DOUBLE: return std::to_string(static_cast<double>(v.data.dval));
|
||||
case PMIX_PID: return std::to_string(static_cast<pid_t>(v.data.pid));
|
||||
case PMIX_STRING: return static_cast<char*>(v.data.string);
|
||||
case PMIX_PROC_RANK: return std::to_string(static_cast<uint32_t>(v.data.rank));
|
||||
case PMIX_POINTER: { std::stringstream ss; ss << static_cast<void*>(v.data.ptr); return ss.str(); }
|
||||
case PMIX_DATA_ARRAY: {
|
||||
if (v.data.darray->type == PMIX_PROC) {
|
||||
std::stringstream ss;
|
||||
ss << "[";
|
||||
for (size_t i = 0; i < v.data.darray->size; ++i) {
|
||||
ss << static_cast<pmix_proc_t*>(static_cast<pmix_data_array_t*>(v.data.darray)->array)[0].nspace;
|
||||
ss << "_";
|
||||
ss << static_cast<pmix_proc_t*>(static_cast<pmix_data_array_t*>(v.data.darray)->array)[0].rank;
|
||||
|
||||
if (i < v.data.darray->size - 1) {
|
||||
ss << ",";
|
||||
}
|
||||
}
|
||||
ss << "]";
|
||||
return ss.str();
|
||||
} else {
|
||||
return "UNKNOWN TYPE IN DATA ARRAY";
|
||||
}
|
||||
}
|
||||
default: return "UNKNOWN TYPE";
|
||||
}
|
||||
}
|
||||
|
||||
} /* namespace pmix */
|
||||
|
||||
#endif /* PMIX_HPP */
|
|
@ -1,291 +0,0 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#ifndef PMIXCOMMANDS_H
|
||||
#define PMIXCOMMANDS_H
|
||||
|
||||
#include "PMIx.hpp"
|
||||
|
||||
#include <fairlogger/Logger.h>
|
||||
#include <fairmq/tools/Semaphore.h>
|
||||
#include <memory> // make_unique
|
||||
#include <string>
|
||||
|
||||
namespace pmix
|
||||
{
|
||||
|
||||
/// Human-readable names of PMIx data types, indexed by type code.
/// NOTE(review): the order is presumed to follow the PMIX_* data type
/// constants of the targeted PMIx version - confirm against pmix_common.h.
// inline const: this is a definition in a header; a mutable non-inline global
// would be defined once per including translation unit.
inline const std::array<std::string, 47> typeNames =
{
    {
        "PMIX_UNDEF",
        "PMIX_BOOL",
        "PMIX_BYTE",
        "PMIX_STRING",
        "PMIX_SIZE",
        "PMIX_PID",
        "PMIX_INT",
        "PMIX_INT8",
        "PMIX_INT16",
        "PMIX_INT32",
        "PMIX_INT64",
        "PMIX_UINT",
        "PMIX_UINT8",
        "PMIX_UINT16",
        "PMIX_UINT32",
        "PMIX_UINT64",
        "PMIX_FLOAT",
        "PMIX_DOUBLE",
        "PMIX_TIMEVAL",
        "PMIX_TIME",
        "PMIX_STATUS",
        "PMIX_VALUE",
        "PMIX_PROC",
        "PMIX_APP",
        "PMIX_INFO",
        "PMIX_PDATA",
        "PMIX_BUFFER",
        "PMIX_BYTE_OBJECT",
        "PMIX_KVAL",
        "PMIX_MODEX",
        "PMIX_PERSIST",
        "PMIX_POINTER",
        "PMIX_SCOPE",
        "PMIX_DATA_RANGE",
        "PMIX_COMMAND",
        "PMIX_INFO_DIRECTIVES",
        "PMIX_DATA_TYPE",
        "PMIX_PROC_STATE",
        "PMIX_PROC_INFO",
        "PMIX_DATA_ARRAY",
        "PMIX_PROC_RANK",
        "PMIX_QUERY",
        "PMIX_COMPRESSED_STRING",
        "PMIX_ALLOC_DIRECTIVE",
        "PMIX_INFO_ARRAY",
        "PMIX_IOF_CHANNEL",
        "PMIX_ENVAR"
    }
};
|
||||
|
||||
enum class Command : int
|
||||
{
|
||||
general = PMIX_EXTERNAL_ERR_BASE,
|
||||
error = PMIX_EXTERNAL_ERR_BASE - 1
|
||||
};
|
||||
|
||||
|
||||
class Commands
|
||||
{
|
||||
public:
|
||||
Commands(const proc& process)
|
||||
: fProcess(process)
|
||||
, fSubscribed(false)
|
||||
{
|
||||
}
|
||||
|
||||
~Commands()
|
||||
{
|
||||
Unsubscribe();
|
||||
}
|
||||
|
||||
void Subscribe(std::function<void(const std::string& msg, const proc& sender)> callback)
|
||||
{
|
||||
using namespace std::placeholders;
|
||||
|
||||
LOG(debug) << "PMIxCommands: Subscribing...";
|
||||
|
||||
fCallback = callback;
|
||||
std::array<pmix::status, 1> codes;
|
||||
codes[0] = static_cast<int>(pmix::Command::general);
|
||||
|
||||
PMIX_INFO_LOAD(&(fInfos[0]), PMIX_EVENT_RETURN_OBJECT, this, PMIX_POINTER);
|
||||
|
||||
PMIx_Register_event_handler(codes.data(), codes.size(),
|
||||
fInfos.data(), fInfos.size(),
|
||||
&Commands::Handler,
|
||||
&Commands::EventHandlerRegistration,
|
||||
this);
|
||||
fBlocker.Wait();
|
||||
LOG(debug) << "PMIxCommands: Subscribing complete!";
|
||||
}
|
||||
|
||||
void Unsubscribe()
|
||||
{
|
||||
if (fSubscribed) {
|
||||
LOG(debug) << "PMIxCommands: Unsubscribing...";
|
||||
PMIx_Deregister_event_handler(fHandlerRef, &Commands::EventHandlerDeregistration, this);
|
||||
fBlocker.Wait();
|
||||
LOG(debug) << "PMIxCommands: Unsubscribing complete!";
|
||||
} else {
|
||||
LOG(debug) << "Unsubscribe() is called while no subscription is active";
|
||||
}
|
||||
}
|
||||
|
||||
struct Holder
|
||||
{
|
||||
Holder() : fData(nullptr) {}
|
||||
~Holder() { PMIX_DATA_ARRAY_FREE(fData); }
|
||||
|
||||
std::vector<pmix::info> fInfos;
|
||||
pmix_data_array_t* fData;
|
||||
};
|
||||
|
||||
void Send(const std::string& msg)
|
||||
{
|
||||
std::vector<pmix::info>* infos = new std::vector<pmix::info>();
|
||||
infos->emplace_back("fairmq.cmd", msg);
|
||||
PMIx_Notify_event(static_cast<int>(pmix::Command::general),
|
||||
&fProcess,
|
||||
PMIX_RANGE_NAMESPACE,
|
||||
infos->data(), infos->size(),
|
||||
&Commands::OpCompleteCallback<std::vector<pmix::info>>,
|
||||
infos);
|
||||
}
|
||||
|
||||
void Send(const std::string& msg, rank rank)
|
||||
{
|
||||
pmix::proc destination(fProcess);
|
||||
destination.rank = rank;
|
||||
Send(msg, {destination});
|
||||
}
|
||||
|
||||
void Send(const std::string& msg, const std::vector<proc>& destination)
|
||||
{
|
||||
std::unique_ptr<Holder> holder = std::make_unique<Holder>();
|
||||
|
||||
PMIX_DATA_ARRAY_CREATE(holder->fData, destination.size(), PMIX_PROC);
|
||||
memcpy(holder->fData->array, destination.data(), destination.size() * sizeof(pmix_proc_t));
|
||||
// LOG(warn) << "OLOG: " << msg << " > " << static_cast<pmix_proc_t*>(holder->fData->array)[0].nspace << ": " << static_cast<pmix_proc_t*>(holder->fData->array)[0].rank;
|
||||
holder->fInfos.emplace_back(PMIX_EVENT_CUSTOM_RANGE, holder->fData);
|
||||
// LOG(warn) << msg << " // packed range: " << static_cast<pmix_proc_t*>(static_cast<pmix_data_array_t*>(holder->fInfos.at(0).value.data.darray)->array)[0].nspace << "_" << static_cast<pmix_proc_t*>(static_cast<pmix_data_array_t*>(holder->fInfos.at(0).value.data.darray)->array)[0].rank;
|
||||
// LOG(warn) << msg << " // packed range.type: " << pmix::typeNames.at(holder->fInfos.at(0).value.type);
|
||||
// LOG(warn) << msg << " // packed range.array.type: " << pmix::typeNames.at(static_cast<pmix_data_array_t*>(holder->fInfos.at(0).value.data.darray)->type);
|
||||
// LOG(warn) << msg << " // packed range.array.size: " << static_cast<pmix_data_array_t*>(holder->fInfos.at(0).value.data.darray)->size;
|
||||
// LOG(warn) << holder->fInfos.size();
|
||||
holder->fInfos.emplace_back("fairmq.cmd", msg);
|
||||
// LOG(warn) << msg << " // packed msg: " << holder->fInfos.at(1).value.data.string;
|
||||
// LOG(warn) << msg << " // packed msg.type: " << pmix::typeNames.at(holder->fInfos.at(1).value.type);
|
||||
// LOG(warn) << holder->fInfos.size();
|
||||
|
||||
PMIx_Notify_event(static_cast<int>(pmix::Command::general),
|
||||
&fProcess,
|
||||
PMIX_RANGE_CUSTOM,
|
||||
holder->fInfos.data(), holder->fInfos.size(),
|
||||
&Commands::OpCompleteCallback<Holder>,
|
||||
holder.get());
|
||||
holder.release();
|
||||
}
|
||||
|
||||
private:
|
||||
static void EventHandlerRegistration(pmix_status_t s, size_t handlerRef, void* obj)
|
||||
{
|
||||
if (s == PMIX_SUCCESS) {
|
||||
LOG(debug) << "Successfully registered event handler, reference = " << static_cast<unsigned long>(handlerRef);
|
||||
static_cast<Commands*>(obj)->fHandlerRef = handlerRef;
|
||||
static_cast<Commands*>(obj)->fSubscribed = true;
|
||||
} else {
|
||||
LOG(error) << "Could not register PMIx event handler, status = " << s;
|
||||
}
|
||||
static_cast<Commands*>(obj)->fBlocker.Signal();
|
||||
}
|
||||
|
||||
static void EventHandlerDeregistration(pmix_status_t s, void* obj)
|
||||
{
|
||||
if (s == PMIX_SUCCESS) {
|
||||
LOG(debug) << "Successfully deregistered event handler, reference = " << static_cast<Commands*>(obj)->fHandlerRef;
|
||||
static_cast<Commands*>(obj)->fSubscribed = false;
|
||||
} else {
|
||||
LOG(error) << "Could not deregister PMIx event handler, reference = " << static_cast<Commands*>(obj)->fHandlerRef << ", status = " << s;
|
||||
}
|
||||
static_cast<Commands*>(obj)->fBlocker.Signal();
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
static void OpCompleteCallback(pmix_status_t s, void* data)
|
||||
{
|
||||
if (s == PMIX_SUCCESS) {
|
||||
// LOG(info) << "Operation completed successfully";
|
||||
} else {
|
||||
LOG(error) << "Could not complete operation, status = " << s;
|
||||
}
|
||||
if (data) {
|
||||
// LOG(warn) << "Destroying event data...";
|
||||
delete static_cast<T*>(data);
|
||||
}
|
||||
}
|
||||
|
||||
static void Handler(size_t handlerId,
|
||||
pmix_status_t s,
|
||||
const pmix_proc_t* src,
|
||||
pmix_info_t info[], size_t ninfo,
|
||||
pmix_info_t[] /* results */, size_t nresults,
|
||||
pmix_event_notification_cbfunc_fn_t cbfunc,
|
||||
void* cbdata)
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << "Event handler called with "
|
||||
<< "status: " << s << ", "
|
||||
<< "source: " << src->nspace << "_" << src->rank << ", "
|
||||
<< "ninfo: " << ninfo << ", "
|
||||
<< "nresults: " << nresults << ", "
|
||||
<< "handlerId: " << handlerId;
|
||||
|
||||
std::string msg;
|
||||
|
||||
Commands* obj = nullptr;
|
||||
|
||||
if (ninfo > 0) {
|
||||
ss << ":\n";
|
||||
for (size_t i = 0; i < ninfo; ++i) {
|
||||
ss << " [" << i << "]: key: '" << info[i].key
|
||||
<< "', value: '" << pmix::get_value_str(info[i].value)
|
||||
<< "', value.type: '" << pmix::typeNames.at(info[i].value.type)
|
||||
<< "', flags: " << info[i].flags;
|
||||
|
||||
if (std::strcmp(info[i].key, "fairmq.cmd") == 0) {
|
||||
msg = pmix::get_value_str(info[i].value);
|
||||
}
|
||||
|
||||
if (std::strcmp(info[i].key, PMIX_EVENT_RETURN_OBJECT) == 0) {
|
||||
obj = static_cast<Commands*>(info[i].value.data.ptr);
|
||||
}
|
||||
|
||||
if (i < ninfo - 1) {
|
||||
ss << "\n";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (obj != nullptr) {
|
||||
if (static_cast<Commands*>(obj)->fProcess.rank != src->rank) {
|
||||
// LOG(warn) << ss.str();
|
||||
static_cast<Commands*>(obj)->fCallback(msg, proc(const_cast<char*>(src->nspace), rank(src->rank)));
|
||||
} else {
|
||||
// LOG(trace) << "suppressing message from itself";
|
||||
}
|
||||
} else {
|
||||
LOG(error) << "ERROR";
|
||||
}
|
||||
|
||||
if (cbfunc != nullptr) {
|
||||
cbfunc(PMIX_SUCCESS, nullptr, 0, nullptr, nullptr, cbdata);
|
||||
}
|
||||
}
|
||||
|
||||
const proc& fProcess;
|
||||
size_t fHandlerRef;
|
||||
std::function<void(const std::string& msg, const proc& sender)> fCallback;
|
||||
std::array<pmix_info_t, 1> fInfos;
|
||||
bool fSubscribed;
|
||||
fair::mq::tools::SharedSemaphore fBlocker;
|
||||
};
|
||||
|
||||
} /* namespace pmix */
|
||||
|
||||
#endif /* PMIXCOMMANDS_H */
|
|
@ -1,205 +0,0 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2019-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include "PMIxPlugin.h"
|
||||
|
||||
#include <fairmq/tools/Strings.h>
|
||||
|
||||
#include <sstream>
|
||||
#include <stdexcept>
|
||||
#include <cstdint> // UINT32_MAX
|
||||
|
||||
using namespace std;
|
||||
|
||||
namespace fair::mq::plugins
|
||||
{
|
||||
|
||||
/// Initializes PMIx, takes device control, publishes the device id
/// ("<nspace>_<rank>") as the "id" property, synchronizes with the other
/// processes via a fence and subscribes to device state changes.
PMIxPlugin::PMIxPlugin(const string& name,
                       const Plugin::Version version,
                       const string& maintainer,
                       const string& homepage,
                       PluginServices* pluginServices)
    : Plugin(name, version, maintainer, homepage, pluginServices)
    , fProcess(Init())
    , fPid(getpid())
    , fPMIxClient(tools::ToString("PMIx client(pid=", fPid, ") "))
    , fDeviceId(string(fProcess.nspace) + "_" + to_string(fProcess.rank))
    , fExitingAckedByLastExternalController(false)
    , fCurrentState(DeviceState::Idle)
    , fLastState(DeviceState::Idle)
{
    TakeDeviceControl();
    LOG(debug) << PMIxClient() << "pmix::init() OK: " << fProcess << ", version=" << pmix::get_version();
    SetProperty<string>("id", fDeviceId);

    Fence("pmix::init");

    SubscribeToDeviceStateChange([this](DeviceState newState) {
        // react to the states that require PMIx interaction
        if (newState == DeviceState::Bound) {
            Publish();
        } else if (newState == DeviceState::Connecting) {
            Lookup();
        } else if (newState == DeviceState::Exiting) {
            ReleaseDeviceControl();
            UnsubscribeFromDeviceStateChange();
        }

        // record the transition under the subscriber mutex
        lock_guard<mutex> lock{fStateChangeSubscriberMutex};
        fLastState = fCurrentState;
        fCurrentState = newState;
    });
}
|
||||
|
||||
/// Releases device control and finalizes PMIx.
///
/// pmix::finalize() is repeated for as long as PMIx reports itself as
/// initialized. A failing finalize is logged and ends the loop: retrying a
/// call that just failed could otherwise spin forever inside the destructor.
PMIxPlugin::~PMIxPlugin()
{
    LOG(debug) << "Destroying PMIxPlugin";
    ReleaseDeviceControl();

    while (pmix::initialized()) {
        try {
            pmix::finalize();
            LOG(debug) << PMIxClient() << "pmix::finalize() OK";
        } catch (const pmix::runtime_error& e) {
            LOG(debug) << PMIxClient() << "pmix::finalize() failed: " << e.what();
            break; // give up instead of looping endlessly on a persistent failure
        }
    }
}
|
||||
|
||||
/// Initializes PMIx and returns the process handle produced by pmix::init().
///
/// @throws std::runtime_error if PMIx is already initialized
auto PMIxPlugin::Init() -> pmix::proc
{
    if (pmix::initialized()) {
        throw runtime_error("trying to initialize PMIx while it is already initialized");
    }

    return pmix::init();
}
|
||||
|
||||
auto PMIxPlugin::Publish() -> void
|
||||
{
|
||||
auto channels(GetChannelInfo());
|
||||
vector<pmix::info> info;
|
||||
|
||||
for (const auto& c : channels) {
|
||||
string methodKey("chans." + c.first + "." + to_string(c.second - 1) + ".method");
|
||||
if (GetProperty<string>(methodKey) == "bind") {
|
||||
for (int i = 0; i < c.second; ++i) {
|
||||
string addressKey("chans." + c.first + "." + to_string(i) + ".address");
|
||||
info.emplace_back(addressKey, GetProperty<string>(addressKey));
|
||||
LOG(debug) << PMIxClient() << info.back();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (info.size() > 0) {
|
||||
pmix::publish(info);
|
||||
LOG(debug) << PMIxClient() << "pmix::publish() OK: published " << info.size()
|
||||
<< " binding channels.";
|
||||
}
|
||||
}
|
||||
|
||||
auto PMIxPlugin::Fence() -> void
|
||||
{
|
||||
pmix::proc all(fProcess);
|
||||
all.rank = pmix::rank::wildcard;
|
||||
|
||||
pmix::fence({all});
|
||||
}
|
||||
|
||||
auto PMIxPlugin::Fence(const std::string& label) -> void
|
||||
{
|
||||
Fence();
|
||||
LOG(debug) << PMIxClient() << "pmix::fence() [" << label << "] OK";
|
||||
}
|
||||
|
||||
auto PMIxPlugin::Lookup() -> void
|
||||
{
|
||||
auto channels(GetChannelInfo());
|
||||
for (const auto& c : channels) {
|
||||
string methodKey("chans." + c.first + "." + to_string(c.second - 1) + ".method");
|
||||
if (GetProperty<string>(methodKey) == "connect") {
|
||||
for (int i = 0; i < c.second; ++i) {
|
||||
vector<pmix::pdata> pdata;
|
||||
string addressKey("chans." + c.first + "." + to_string(i) + ".address");
|
||||
pdata.emplace_back();
|
||||
pdata.back().set_key(addressKey);
|
||||
vector<pmix::info> info;
|
||||
info.emplace_back(PMIX_WAIT, static_cast<int>(pdata.size()));
|
||||
|
||||
if (pdata.size() > 0) {
|
||||
pmix::lookup(pdata, info);
|
||||
LOG(debug) << PMIxClient() << "pmix::lookup() OK";
|
||||
}
|
||||
|
||||
for (const auto& p : pdata) {
|
||||
if (p.value.type == PMIX_UNDEF) {
|
||||
LOG(debug) << PMIxClient() << "pmix::lookup() not found: key=" << p.key;
|
||||
} else if (p.value.type == PMIX_STRING) {
|
||||
LOG(debug) << PMIxClient() << "pmix::lookup() found:"
|
||||
<< " key=" << p.key << ",value=" << p.value.data.string;
|
||||
SetProperty<string>(p.key, p.value.data.string);
|
||||
} else {
|
||||
LOG(debug) << PMIxClient() << "pmix::lookup() wrong type returned: "
|
||||
<< "key=" << p.key << ",type=" << p.value.type;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
auto PMIxPlugin::WaitForExitingAck() -> void
|
||||
{
|
||||
unique_lock<mutex> lock(fStateChangeSubscriberMutex);
|
||||
fExitingAcked.wait_for(lock, chrono::milliseconds(1000), [this]() {
|
||||
return fExitingAckedByLastExternalController;
|
||||
});
|
||||
}
|
||||
|
||||
} // namespace fair::mq::plugins
|
|
@ -1,87 +0,0 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2019-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#ifndef FAIR_MQ_PLUGINS_PMIX
|
||||
#define FAIR_MQ_PLUGINS_PMIX
|
||||
|
||||
#include "PMIx.hpp"
|
||||
#include "PMIxCommands.h"
|
||||
|
||||
#include <fairmq/Plugin.h>
|
||||
#include <fairmq/Version.h>
|
||||
#include <fairlogger/Logger.h>
|
||||
|
||||
#include <string>
|
||||
#include <sstream>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <sys/types.h>
|
||||
#include <unistd.h>
|
||||
#include <vector>
|
||||
|
||||
namespace fair::mq::plugins
|
||||
{
|
||||
|
||||
class PMIxPlugin : public Plugin
|
||||
{
|
||||
public:
|
||||
PMIxPlugin(const std::string& name,
|
||||
const Plugin::Version version,
|
||||
const std::string& maintainer,
|
||||
const std::string& homepage,
|
||||
PluginServices* pluginServices);
|
||||
~PMIxPlugin();
|
||||
|
||||
auto PMIxClient() const -> std::string { return fPMIxClient; };
|
||||
|
||||
private:
|
||||
pmix::proc fProcess;
|
||||
pid_t fPid;
|
||||
std::string fPMIxClient;
|
||||
std::string fDeviceId;
|
||||
|
||||
std::set<uint32_t> fStateChangeSubscribers;
|
||||
// uint32_t fLastExternalController;
|
||||
bool fExitingAckedByLastExternalController;
|
||||
std::condition_variable fExitingAcked;
|
||||
std::mutex fStateChangeSubscriberMutex;
|
||||
|
||||
DeviceState fCurrentState;
|
||||
DeviceState fLastState;
|
||||
|
||||
auto Init() -> pmix::proc;
|
||||
auto Publish() -> void;
|
||||
auto Fence() -> void;
|
||||
auto Fence(const std::string& label) -> void;
|
||||
auto Lookup() -> void;
|
||||
|
||||
auto WaitForExitingAck() -> void;
|
||||
};
|
||||
|
||||
/// Builds the command line options contributed by the PMIx plugin:
/// a single integer option "pmix-dummy" defaulting to 0.
Plugin::ProgOptions PMIxProgramOptions()
{
    namespace po = boost::program_options;

    po::options_description pluginOptions("PMIx Plugin");
    pluginOptions.add_options()("pmix-dummy", po::value<int>()->default_value(0), "Dummy.");

    return pluginOptions;
}
|
||||
|
||||
// Registers PMIxPlugin with FairMQ under the plugin name "pmix".
REGISTER_FAIRMQ_PLUGIN(
    PMIxPlugin,                                   // class name
    pmix,                                         // plugin name (lower case characters only)
    (Plugin::Version{FAIRMQ_VERSION_MAJOR, FAIRMQ_VERSION_MINOR, FAIRMQ_VERSION_PATCH}), // version
    "FairRootGroup <fairroot@gsi.de>",            // maintainer
    "https://github.com/FairRootGroup/FairMQ",    // homepage
    PMIxProgramOptions                            // provider of the plugin's program options
)
|
||||
|
||||
} // namespace fair::mq::plugins
|
||||
|
||||
#endif /* FAIR_MQ_PLUGINS_PMIX */
|
|
@ -1,131 +0,0 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include <fairmq/States.h>
|
||||
|
||||
#include <fairlogger/Logger.h>
|
||||
|
||||
#include "PMIx.hpp"
|
||||
#include "PMIxCommands.h"
|
||||
|
||||
#include <boost/program_options.hpp>
|
||||
|
||||
#include <condition_variable>
|
||||
#include <algorithm>
|
||||
#include <cstdlib>
|
||||
#include <iostream>
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
#include <termios.h> // raw mode console input
|
||||
#include <thread>
|
||||
#include <utility>
|
||||
#include <unistd.h>
|
||||
#include <vector>
|
||||
|
||||
using namespace std;
|
||||
namespace bpo = boost::program_options;
|
||||
|
||||
const std::map<fair::mq::Transition, fair::mq::State> expected =
|
||||
{
|
||||
{ fair::mq::Transition::InitDevice, fair::mq::State::InitializingDevice },
|
||||
{ fair::mq::Transition::CompleteInit, fair::mq::State::Initialized },
|
||||
{ fair::mq::Transition::Bind, fair::mq::State::Bound },
|
||||
{ fair::mq::Transition::Connect, fair::mq::State::DeviceReady },
|
||||
{ fair::mq::Transition::InitTask, fair::mq::State::Ready },
|
||||
{ fair::mq::Transition::Run, fair::mq::State::Running },
|
||||
{ fair::mq::Transition::Stop, fair::mq::State::Ready },
|
||||
{ fair::mq::Transition::ResetTask, fair::mq::State::DeviceReady },
|
||||
{ fair::mq::Transition::ResetDevice, fair::mq::State::Idle },
|
||||
{ fair::mq::Transition::End, fair::mq::State::Exiting }
|
||||
};
|
||||
|
||||
struct MiniTopo
|
||||
{
|
||||
explicit MiniTopo(unsigned int n)
|
||||
: fState(n, fair::mq::State::Ok)
|
||||
{}
|
||||
|
||||
void WaitFor(const fair::mq::State state)
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(fMtx);
|
||||
|
||||
fCV.wait(lk, [&](){
|
||||
unsigned int count = std::count_if(fState.cbegin(), fState.cend(), [=](const auto& s) {
|
||||
return s == state;
|
||||
});
|
||||
|
||||
bool result = count == fState.size();
|
||||
cout << "expecting " << state << " for " << fState.size() << " devices, found " << count << ", condition: " << result << endl;
|
||||
return result;
|
||||
});
|
||||
}
|
||||
|
||||
void Update(uint32_t rank, const fair::mq::State state)
|
||||
{
|
||||
try {
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(fMtx);
|
||||
fState.at(rank - 1) = state;
|
||||
}
|
||||
fCV.notify_one();
|
||||
} catch (const std::exception& e) {
|
||||
LOG(error) << "Exception in Update: " << e.what();
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
vector<fair::mq::State> fState;
|
||||
std::mutex fMtx;
|
||||
std::condition_variable fCV;
|
||||
};
|
||||
|
||||
int main(int argc, char* argv[])
|
||||
{
|
||||
try {
|
||||
unsigned int numDevices;
|
||||
|
||||
bpo::options_description options("Common options");
|
||||
options.add_options()
|
||||
("number-devices,n", bpo::value<unsigned int>(&numDevices)->default_value(0), "Number of devices (will be removed in the future)")
|
||||
("help,h", "Produce help message");
|
||||
|
||||
bpo::variables_map vm;
|
||||
bpo::store(bpo::command_line_parser(argc, argv).options(options).run(), vm);
|
||||
|
||||
if (vm.count("help")) {
|
||||
cout << "FairMQ DDS Command UI" << endl << options << endl;
|
||||
cout << "Commands: [c] check state, [o] dump config, [h] help, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device, [k] complete init, [b] bind, [x] connect" << endl;
|
||||
return EXIT_SUCCESS;
|
||||
}
|
||||
|
||||
bpo::notify(vm);
|
||||
|
||||
fair::Logger::SetConsoleSeverity(fair::Severity::debug);
|
||||
fair::Logger::SetConsoleColor(true);
|
||||
fair::Logger::SetVerbosity(fair::Verbosity::low);
|
||||
|
||||
pmix::proc process;
|
||||
|
||||
if (!pmix::initialized()) {
|
||||
process = pmix::init();
|
||||
LOG(warn) << "pmix::init() OK: " << process << ", version=" << pmix::get_version();
|
||||
}
|
||||
|
||||
pmix::proc all(process);
|
||||
all.rank = pmix::rank::wildcard;
|
||||
pmix::fence({all});
|
||||
LOG(warn) << "pmix::fence() [pmix::init] OK";
|
||||
|
||||
MiniTopo topo(numDevices);
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Error: " << e.what();
|
||||
return EXIT_FAILURE;
|
||||
}
|
||||
LOG(warn) << "exiting";
|
||||
return EXIT_SUCCESS;
|
||||
}
|
|
@ -6,7 +6,7 @@ LABEL org.opencontainers.image.description "FairMQ development environment"
|
|||
RUN dnf -y update
|
||||
# https://git.gsi.de/SDE/packages/builder
|
||||
RUN dnf -y install https://alfa-ci.gsi.de/packages/rpm/fedora-$VERSION-x86_64/fairsoft-release-dev.rpm
|
||||
RUN dnf -y install clang cli11-devel pmix-devel ninja-build 'dnf-command(builddep)' libasan liblsan libtsan libubsan clang-tools-extra
|
||||
RUN dnf -y install clang cli11-devel ninja-build 'dnf-command(builddep)' libasan liblsan libtsan libubsan clang-tools-extra
|
||||
RUN dnf -y builddep fairmq
|
||||
RUN dnf -y clean all
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@ LABEL org.opencontainers.image.description "FairMQ development environment"
|
|||
RUN echo 'debconf debconf/frontend select Noninteractive' | debconf-set-selections
|
||||
RUN apt-get update
|
||||
RUN apt-get -y upgrade
|
||||
RUN apt-get -y install ca-certificates patch cmake git libboost-dev libboost-log-dev libboost-system-dev libboost-regex-dev libboost-filesystem-dev libboost-container-dev libboost-thread-dev libboost-date-time-dev libboost-program-options-dev g++ libfmt-dev ninja-build wget libczmq-dev libxml2-utils libfabric-dev libfabric-bin libpmix-dev pkg-config
|
||||
RUN apt-get -y install ca-certificates patch cmake git libboost-dev libboost-log-dev libboost-system-dev libboost-regex-dev libboost-filesystem-dev libboost-container-dev libboost-thread-dev libboost-date-time-dev libboost-program-options-dev g++ libfmt-dev ninja-build wget libczmq-dev libxml2-utils libfabric-dev libfabric-bin pkg-config
|
||||
RUN apt-get -y clean
|
||||
|
||||
RUN cd /tmp
|
||||
|
|
Loading…
Reference in New Issue
Block a user