PMIx: Add commands to plugin and command ui

This commit is contained in:
Alexey Rybalchenko 2020-01-06 19:48:03 +01:00 committed by Dennis Klein
parent fd2bac3e22
commit c290c16896
8 changed files with 864 additions and 82 deletions

View File

@ -18,7 +18,7 @@ get_git_version()
project(FairMQ VERSION ${PROJECT_VERSION} LANGUAGES CXX) project(FairMQ VERSION ${PROJECT_VERSION} LANGUAGES CXX)
message(STATUS "${BWhite}${PROJECT_NAME}${CR} ${PROJECT_GIT_VERSION} from ${PROJECT_DATE}") message(STATUS "${BWhite}${PROJECT_NAME}${CR} ${PROJECT_GIT_VERSION} from ${PROJECT_DATE}")
if(BUILD_OFI_TRANSPORT OR BUILD_SDK) if(BUILD_OFI_TRANSPORT OR BUILD_SDK OR BUILD_PMIX_PLUGIN)
set(PROJECT_MIN_CXX_STANDARD 14) set(PROJECT_MIN_CXX_STANDARD 14)
else() else()
set(PROJECT_MIN_CXX_STANDARD 11) set(PROJECT_MIN_CXX_STANDARD 11)
@ -44,7 +44,7 @@ fairmq_build_option(BUILD_SDK_COMMANDS "Build the FairMQ SDK commands."
fairmq_build_option(BUILD_DDS_PLUGIN "Build DDS plugin." fairmq_build_option(BUILD_DDS_PLUGIN "Build DDS plugin."
DEFAULT OFF REQUIRES "BUILD_FAIRMQ;BUILD_SDK_COMMANDS") DEFAULT OFF REQUIRES "BUILD_FAIRMQ;BUILD_SDK_COMMANDS")
fairmq_build_option(BUILD_PMIX_PLUGIN "Build PMIx plugin." fairmq_build_option(BUILD_PMIX_PLUGIN "Build PMIx plugin."
DEFAULT OFF REQUIRES "BUILD_FAIRMQ") DEFAULT OFF REQUIRES "BUILD_FAIRMQ;BUILD_SDK_COMMANDS")
fairmq_build_option(BUILD_EXAMPLES "Build FairMQ examples." fairmq_build_option(BUILD_EXAMPLES "Build FairMQ examples."
DEFAULT ON REQUIRES "BUILD_FAIRMQ") DEFAULT ON REQUIRES "BUILD_FAIRMQ")
fairmq_build_option(BUILD_SDK "Build the FairMQ controller SDK." fairmq_build_option(BUILD_SDK "Build the FairMQ controller SDK."
@ -397,9 +397,9 @@ else()
endif() endif()
message(STATUS " ${BWhite}dds_plugin${CR} ${dds_summary}") message(STATUS " ${BWhite}dds_plugin${CR} ${dds_summary}")
if(BUILD_PMIX_PLUGIN) if(BUILD_PMIX_PLUGIN)
set(pmix_summary "${BGreen}YES${CR} (disable with ${BMagenta}-DBUILD_PMIX_PLUGIN=OFF${CR})") set(pmix_summary "${BGreen}YES${CR} EXPERIMENTAL (requires C++14) (disable with ${BMagenta}-DBUILD_PMIX_PLUGIN=OFF${CR})")
else() else()
set(pmix_summary "${BRed} NO${CR} (default, enable with ${BMagenta}-DBUILD_PMIX_PLUGIN=ON${CR})") set(pmix_summary "${BRed} NO${CR} EXPERIMENTAL (requires C++14) (default, enable with ${BMagenta}-DBUILD_PMIX_PLUGIN=ON${CR})")
endif() endif()
message(STATUS " ${BWhite}pmix_plugin${CR} ${pmix_summary}") message(STATUS " ${BWhite}pmix_plugin${CR} ${pmix_summary}")
if(BUILD_EXAMPLES) if(BUILD_EXAMPLES)

View File

@ -6,16 +6,6 @@
# copied verbatim in the file "LICENSE" # # copied verbatim in the file "LICENSE" #
################################################################################ ################################################################################
####################
# external plugins #
####################
if(BUILD_DDS_PLUGIN)
add_subdirectory(plugins/DDS)
endif()
if(BUILD_PMIX_PLUGIN)
add_subdirectory(plugins/PMIx)
endif()
if(BUILD_FAIRMQ OR BUILD_SDK) if(BUILD_FAIRMQ OR BUILD_SDK)
########### ###########
# Version # # Version #
@ -473,3 +463,13 @@ endif()
if(BUILD_SDK) if(BUILD_SDK)
add_subdirectory(sdk) add_subdirectory(sdk)
endif() endif()
####################
# external plugins #
####################
if(BUILD_DDS_PLUGIN)
add_subdirectory(plugins/DDS)
endif()
if(BUILD_PMIX_PLUGIN)
add_subdirectory(plugins/PMIx)
endif()

View File

@ -10,9 +10,10 @@ set(plugin FairMQPlugin_pmix)
add_library(${plugin} SHARED add_library(${plugin} SHARED
${CMAKE_CURRENT_SOURCE_DIR}/PMIxPlugin.cxx ${CMAKE_CURRENT_SOURCE_DIR}/PMIxPlugin.cxx
${CMAKE_CURRENT_SOURCE_DIR}/PMIxPlugin.h ${CMAKE_CURRENT_SOURCE_DIR}/PMIxPlugin.h
${CMAKE_CURRENT_SOURCE_DIR}/PMIxCommands.h
${CMAKE_CURRENT_SOURCE_DIR}/PMIx.hpp ${CMAKE_CURRENT_SOURCE_DIR}/PMIx.hpp
) )
target_link_libraries(${plugin} FairMQ PMIx::libpmix) target_link_libraries(${plugin} PUBLIC FairMQ PMIx::libpmix PRIVATE Commands)
target_include_directories(${plugin} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}) target_include_directories(${plugin} PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
set_target_properties(${plugin} PROPERTIES set_target_properties(${plugin} PROPERTIES
CXX_VISIBILITY_PRESET hidden CXX_VISIBILITY_PRESET hidden
@ -20,7 +21,12 @@ set_target_properties(${plugin} PROPERTIES
SOVERSION "${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}" SOVERSION "${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}"
) )
install(TARGETS ${plugin} set(exe fairmq-pmix-command-ui)
add_executable(${exe} ${CMAKE_CURRENT_SOURCE_DIR}/runPMIxCommandUI.cxx)
target_link_libraries(${exe} FairMQ Commands StateMachine PMIx::libpmix)
target_include_directories(${exe} PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
install(TARGETS ${plugin} ${exe}
EXPORT ${PROJECT_EXPORT_SET} EXPORT ${PROJECT_EXPORT_SET}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR} LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR} RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}

View File

@ -9,17 +9,18 @@
#ifndef PMIX_HPP #ifndef PMIX_HPP
#define PMIX_HPP #define PMIX_HPP
#include <array>
#include <cstring> #include <cstring>
#include <functional>
#include <limits> #include <limits>
#include <memory> #include <memory>
#include <ostream> #include <ostream>
#include <pmix.h> #include <pmix.h>
#include <sstream>
#include <stdexcept> #include <stdexcept>
#include <string.h>
#include <type_traits> #include <type_traits>
#include <utility> #include <utility>
#include <vector> #include <vector>
#include <FairMQLogger.h>
// C++ PMIx v2.2 API // C++ PMIx v2.2 API
namespace pmix namespace pmix
@ -69,7 +70,7 @@ struct proc : pmix_proc_t
friend std::ostream& operator<<(std::ostream& os, const proc& p) friend std::ostream& operator<<(std::ostream& os, const proc& p)
{ {
return os << "nspace=" << p.nspace << ",rank=" << p.rank; return os << p.nspace << "_" << p.rank;
} }
}; };
@ -80,7 +81,6 @@ struct value : pmix_value_t
value(const value& rhs) value(const value& rhs)
{ {
LOG(warn) << "copy ctor";
status rc; status rc;
auto lhs(static_cast<pmix_value_t*>(this)); auto lhs(static_cast<pmix_value_t*>(this));
PMIX_VALUE_XFER(rc, lhs, static_cast<pmix_value_t*>(const_cast<value*>(&rhs))); PMIX_VALUE_XFER(rc, lhs, static_cast<pmix_value_t*>(const_cast<value*>(&rhs)));
@ -111,6 +111,11 @@ struct value : pmix_value_t
{ {
PMIX_VALUE_LOAD(static_cast<pmix_value_t*>(this), &val, PMIX_INT); PMIX_VALUE_LOAD(static_cast<pmix_value_t*>(this), &val, PMIX_INT);
} }
explicit value(pmix_data_array_t* val)
{
PMIX_VALUE_LOAD(static_cast<pmix_value_t*>(this), val, PMIX_DATA_ARRAY);
}
}; };
struct info : pmix_info_t struct info : pmix_info_t
@ -134,9 +139,15 @@ struct info : pmix_info_t
} }
} }
friend std::ostream& operator<<(std::ostream& os, const info& p) friend std::ostream& operator<<(std::ostream& os, const info& i)
{ {
return os << "key=" << p.key << ",value='" << p.value.data.string << "'"; return os << "key=" << i.key << ",value='" << i.value.data.string << "'";
}
info(const info& rhs)
{
PMIX_INFO_XFER(static_cast<pmix_info_t*>(this),
static_cast<pmix_info_t*>(const_cast<info*>(&rhs)));
} }
}; };
@ -144,6 +155,7 @@ struct pdata : pmix_pdata_t
{ {
pdata() { PMIX_PDATA_CONSTRUCT(static_cast<pmix_pdata_t*>(this)); } pdata() { PMIX_PDATA_CONSTRUCT(static_cast<pmix_pdata_t*>(this)); }
~pdata() { PMIX_PDATA_DESTRUCT(static_cast<pmix_pdata_t*>(this)); } ~pdata() { PMIX_PDATA_DESTRUCT(static_cast<pmix_pdata_t*>(this)); }
pdata(const pdata& rhs) pdata(const pdata& rhs)
{ {
PMIX_PDATA_XFER(static_cast<pmix_pdata_t*>(this), PMIX_PDATA_XFER(static_cast<pmix_pdata_t*>(this),
@ -171,7 +183,7 @@ auto init(const std::vector<info>& info = {}) -> proc
auto initialized() -> bool { return !!PMIx_Initialized(); } auto initialized() -> bool { return !!PMIx_Initialized(); }
auto get_version() -> const char* { return PMIx_Get_version(); } auto get_version() -> std::string { return {PMIx_Get_version()}; }
auto finalize(const std::vector<info>& info = {}) -> void auto finalize(const std::vector<info>& info = {}) -> void
{ {
@ -213,6 +225,92 @@ auto lookup(std::vector<pdata>& pdata, const std::vector<info>& info = {}) -> vo
} }
} }
std::string get_info(const std::string& name, pmix::proc& process)
{
pmix_value_t* v;
pmix::status rc = PMIx_Get(&process, name.c_str(), nullptr, 0, &v);
if (rc == PMIX_SUCCESS) {
std::stringstream ss;
switch (v->type) {
case PMIX_SIZE: ss << static_cast<size_t>(v->data.size) << " (size_t)"; break;
case PMIX_INT: ss << static_cast<int>(v->data.integer) << " (int)"; break;
case PMIX_INT8: ss << static_cast<int8_t>(v->data.int8) << " (int8_t)"; break;
case PMIX_INT16: ss << static_cast<int16_t>(v->data.int16) << " (int16_t)"; break;
case PMIX_INT32: ss << static_cast<int32_t>(v->data.int32) << " (int32_t)"; break;
case PMIX_INT64: ss << static_cast<int64_t>(v->data.int64) << " (int64_t)"; break;
case PMIX_UINT: ss << static_cast<unsigned int>(v->data.uint) << " (unsigned int)"; break;
case PMIX_UINT8: ss << static_cast<uint8_t>(v->data.uint8) << " (uint8_t)"; break;
case PMIX_UINT16: ss << static_cast<uint16_t>(v->data.uint16) << " (uint16_t)"; break;
case PMIX_UINT32: ss << static_cast<uint32_t>(v->data.uint32) << " (uint32_t)"; break;
case PMIX_UINT64: ss << static_cast<uint64_t>(v->data.uint64) << " (uint64_t)"; break;
case PMIX_FLOAT: ss << static_cast<float>(v->data.fval) << " (float)"; break;
case PMIX_DOUBLE: ss << static_cast<double>(v->data.dval) << " (double)"; break;
case PMIX_PID: ss << static_cast<pid_t>(v->data.pid) << " (pid_t)"; break;
case PMIX_STRING: ss << static_cast<char*>(v->data.string) << " (string)"; break;
case PMIX_PROC_RANK: ss << static_cast<uint32_t>(v->data.rank) << " (pmix_rank_t)"; break;
case PMIX_PROC: ss << "proc.nspace: " << static_cast<pmix_proc_t*>(v->data.proc)->nspace
<< ", proc.rank: " << static_cast<pmix_proc_t*>(v->data.proc)->rank << " (pmix_proc_t*)"; break;
default:
ss << "unknown type: " << v->type;
break;
}
return ss.str();
} else if (rc == PMIX_ERR_NOT_FOUND) {
// LOG(error) << "PMIx_Get failed: PMIX_ERR_NOT_FOUND";
return "";
} else {
// LOG(error) << "PMIx_Get failed: " << rc;
return "<undefined>";
}
}
std::string get_value_str(const pmix_value_t& v)
{
switch (v.type) {
case PMIX_BOOL: return std::to_string(static_cast<bool>(v.data.flag));
case PMIX_SIZE: return std::to_string(static_cast<size_t>(v.data.size));
case PMIX_INT: return std::to_string(static_cast<int>(v.data.integer));
case PMIX_INT8: return std::to_string(static_cast<int8_t>(v.data.int8));
case PMIX_INT16: return std::to_string(static_cast<int16_t>(v.data.int16));
case PMIX_INT32: return std::to_string(static_cast<int32_t>(v.data.int32));
case PMIX_INT64: return std::to_string(static_cast<int64_t>(v.data.int64));
case PMIX_UINT: return std::to_string(static_cast<unsigned int>(v.data.uint));
case PMIX_UINT8: return std::to_string(static_cast<uint8_t>(v.data.uint8));
case PMIX_UINT16: return std::to_string(static_cast<uint16_t>(v.data.uint16));
case PMIX_UINT32: return std::to_string(static_cast<uint32_t>(v.data.uint32));
case PMIX_UINT64: return std::to_string(static_cast<uint64_t>(v.data.uint64));
case PMIX_FLOAT: return std::to_string(static_cast<float>(v.data.fval));
case PMIX_DOUBLE: return std::to_string(static_cast<double>(v.data.dval));
case PMIX_PID: return std::to_string(static_cast<pid_t>(v.data.pid));
case PMIX_STRING: return static_cast<char*>(v.data.string);
case PMIX_PROC_RANK: return std::to_string(static_cast<uint32_t>(v.data.rank));
case PMIX_POINTER: { std::stringstream ss; ss << static_cast<void*>(v.data.ptr); return ss.str(); }
case PMIX_DATA_ARRAY: {
if (v.data.darray->type == PMIX_PROC) {
std::stringstream ss;
ss << "[";
for (size_t i = 0; i < v.data.darray->size; ++i) {
ss << static_cast<pmix_proc_t*>(static_cast<pmix_data_array_t*>(v.data.darray)->array)[0].nspace;
ss << "_";
ss << static_cast<pmix_proc_t*>(static_cast<pmix_data_array_t*>(v.data.darray)->array)[0].rank;
if (i < v.data.darray->size - 1) {
ss << ",";
}
}
ss << "]";
return ss.str();
} else {
return "UNKNOWN TYPE IN DATA ARRAY";
}
}
default: return "UNKNOWN TYPE";
}
}
} /* namespace pmix */ } /* namespace pmix */
#endif /* PMIX_HPP */ #endif /* PMIX_HPP */

View File

@ -0,0 +1,291 @@
/********************************************************************************
* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef PMIXCOMMANDS_H
#define PMIXCOMMANDS_H
#include "PMIx.hpp"
#include <FairMQLogger.h>
#include <fairmq/tools/Semaphore.h>
#include <fairmq/tools/CppSTL.h>
#include <string>
namespace pmix
{
std::array<std::string, 47> typeNames =
{
{
"PMIX_UNDEF",
"PMIX_BOOL",
"PMIX_BYTE",
"PMIX_STRING",
"PMIX_SIZE",
"PMIX_PID",
"PMIX_INT",
"PMIX_INT8",
"PMIX_INT16",
"PMIX_INT32",
"PMIX_INT64",
"PMIX_UINT",
"PMIX_UINT8",
"PMIX_UINT16",
"PMIX_UINT32",
"PMIX_UINT64",
"PMIX_FLOAT",
"PMIX_DOUBLE",
"PMIX_TIMEVAL",
"PMIX_TIME",
"PMIX_STATUS",
"PMIX_VALUE",
"PMIX_PROC",
"PMIX_APP",
"PMIX_INFO",
"PMIX_PDATA",
"PMIX_BUFFER",
"PMIX_BYTE_OBJECT",
"PMIX_KVAL",
"PMIX_MODEX",
"PMIX_PERSIST",
"PMIX_POINTER",
"PMIX_SCOPE",
"PMIX_DATA_RANGE",
"PMIX_COMMAND",
"PMIX_INFO_DIRECTIVES",
"PMIX_DATA_TYPE",
"PMIX_PROC_STATE",
"PMIX_PROC_INFO",
"PMIX_DATA_ARRAY",
"PMIX_PROC_RANK",
"PMIX_QUERY",
"PMIX_COMPRESSED_STRING",
"PMIX_ALLOC_DIRECTIVE",
"PMIX_INFO_ARRAY",
"PMIX_IOF_CHANNEL",
"PMIX_ENVAR"
}
};
enum class Command : int
{
general = PMIX_EXTERNAL_ERR_BASE,
error = PMIX_EXTERNAL_ERR_BASE - 1
};
class Commands
{
public:
Commands(const proc& process)
: fProcess(process)
, fSubscribed(false)
{
}
~Commands()
{
Unsubscribe();
}
void Subscribe(std::function<void(const std::string& msg, const proc& sender)> callback)
{
using namespace std::placeholders;
LOG(debug) << "PMIxCommands: Subscribing...";
fCallback = callback;
std::array<pmix::status, 1> codes;
codes[0] = static_cast<int>(pmix::Command::general);
PMIX_INFO_LOAD(&(fInfos[0]), PMIX_EVENT_RETURN_OBJECT, this, PMIX_POINTER);
PMIx_Register_event_handler(codes.data(), codes.size(),
fInfos.data(), fInfos.size(),
&Commands::Handler,
&Commands::EventHandlerRegistration,
this);
fBlocker.Wait();
LOG(debug) << "PMIxCommands: Subscribing complete!";
}
void Unsubscribe()
{
if (fSubscribed) {
LOG(debug) << "PMIxCommands: Unsubscribing...";
PMIx_Deregister_event_handler(fHandlerRef, &Commands::EventHandlerDeregistration, this);
fBlocker.Wait();
LOG(debug) << "PMIxCommands: Unsubscribing complete!";
} else {
LOG(debug) << "Unsubscribe() is called while no subscription is active";
}
}
struct Holder
{
Holder() : fData(nullptr) {}
~Holder() { PMIX_DATA_ARRAY_FREE(fData); }
std::vector<pmix::info> fInfos;
pmix_data_array_t* fData;
};
void Send(const std::string& msg)
{
std::vector<pmix::info>* infos = new std::vector<pmix::info>();
infos->emplace_back("fairmq.cmd", msg);
PMIx_Notify_event(static_cast<int>(pmix::Command::general),
&fProcess,
PMIX_RANGE_NAMESPACE,
infos->data(), infos->size(),
&Commands::OpCompleteCallback<std::vector<pmix::info>>,
infos);
}
void Send(const std::string& msg, rank rank)
{
pmix::proc destination(fProcess);
destination.rank = rank;
Send(msg, {destination});
}
void Send(const std::string& msg, const std::vector<proc>& destination)
{
std::unique_ptr<Holder> holder = fair::mq::tools::make_unique<Holder>();
PMIX_DATA_ARRAY_CREATE(holder->fData, destination.size(), PMIX_PROC);
memcpy(holder->fData->array, destination.data(), destination.size() * sizeof(pmix_proc_t));
// LOG(warn) << "OLOG: " << msg << " > " << static_cast<pmix_proc_t*>(holder->fData->array)[0].nspace << ": " << static_cast<pmix_proc_t*>(holder->fData->array)[0].rank;
holder->fInfos.emplace_back(PMIX_EVENT_CUSTOM_RANGE, holder->fData);
// LOG(warn) << msg << " // packed range: " << static_cast<pmix_proc_t*>(static_cast<pmix_data_array_t*>(holder->fInfos.at(0).value.data.darray)->array)[0].nspace << "_" << static_cast<pmix_proc_t*>(static_cast<pmix_data_array_t*>(holder->fInfos.at(0).value.data.darray)->array)[0].rank;
// LOG(warn) << msg << " // packed range.type: " << pmix::typeNames.at(holder->fInfos.at(0).value.type);
// LOG(warn) << msg << " // packed range.array.type: " << pmix::typeNames.at(static_cast<pmix_data_array_t*>(holder->fInfos.at(0).value.data.darray)->type);
// LOG(warn) << msg << " // packed range.array.size: " << static_cast<pmix_data_array_t*>(holder->fInfos.at(0).value.data.darray)->size;
// LOG(warn) << holder->fInfos.size();
holder->fInfos.emplace_back("fairmq.cmd", msg);
// LOG(warn) << msg << " // packed msg: " << holder->fInfos.at(1).value.data.string;
// LOG(warn) << msg << " // packed msg.type: " << pmix::typeNames.at(holder->fInfos.at(1).value.type);
// LOG(warn) << holder->fInfos.size();
PMIx_Notify_event(static_cast<int>(pmix::Command::general),
&fProcess,
PMIX_RANGE_CUSTOM,
holder->fInfos.data(), holder->fInfos.size(),
&Commands::OpCompleteCallback<Holder>,
holder.get());
holder.release();
}
private:
static void EventHandlerRegistration(pmix_status_t s, size_t handlerRef, void* obj)
{
if (s == PMIX_SUCCESS) {
LOG(debug) << "Successfully registered event handler, reference = " << static_cast<unsigned long>(handlerRef);
static_cast<Commands*>(obj)->fHandlerRef = handlerRef;
static_cast<Commands*>(obj)->fSubscribed = true;
} else {
LOG(error) << "Could not register PMIx event handler, status = " << s;
}
static_cast<Commands*>(obj)->fBlocker.Signal();
}
static void EventHandlerDeregistration(pmix_status_t s, void* obj)
{
if (s == PMIX_SUCCESS) {
LOG(debug) << "Successfully deregistered event handler, reference = " << static_cast<Commands*>(obj)->fHandlerRef;
static_cast<Commands*>(obj)->fSubscribed = false;
} else {
LOG(error) << "Could not deregister PMIx event handler, reference = " << static_cast<Commands*>(obj)->fHandlerRef << ", status = " << s;
}
static_cast<Commands*>(obj)->fBlocker.Signal();
}
template<typename T>
static void OpCompleteCallback(pmix_status_t s, void* data)
{
if (s == PMIX_SUCCESS) {
// LOG(info) << "Operation completed successfully";
} else {
LOG(error) << "Could not complete operation, status = " << s;
}
if (data) {
// LOG(warn) << "Destroying event data...";
delete static_cast<T*>(data);
}
}
static void Handler(size_t handlerId,
pmix_status_t s,
const pmix_proc_t* src,
pmix_info_t info[], size_t ninfo,
pmix_info_t[] /* results */, size_t nresults,
pmix_event_notification_cbfunc_fn_t cbfunc,
void* cbdata)
{
std::stringstream ss;
ss << "Event handler called with "
<< "status: " << s << ", "
<< "source: " << src->nspace << "_" << src->rank << ", "
<< "ninfo: " << ninfo << ", "
<< "nresults: " << nresults << ", "
<< "handlerId: " << handlerId;
std::string msg;
Commands* obj = nullptr;
if (ninfo > 0) {
ss << ":\n";
for (size_t i = 0; i < ninfo; ++i) {
ss << " [" << i << "]: key: '" << info[i].key
<< "', value: '" << pmix::get_value_str(info[i].value)
<< "', value.type: '" << pmix::typeNames.at(info[i].value.type)
<< "', flags: " << info[i].flags;
if (std::strcmp(info[i].key, "fairmq.cmd") == 0) {
msg = pmix::get_value_str(info[i].value);
}
if (std::strcmp(info[i].key, PMIX_EVENT_RETURN_OBJECT) == 0) {
obj = static_cast<Commands*>(info[i].value.data.ptr);
}
if (i < ninfo - 1) {
ss << "\n";
}
}
}
if (obj != nullptr) {
if (static_cast<Commands*>(obj)->fProcess.rank != src->rank) {
// LOG(warn) << ss.str();
static_cast<Commands*>(obj)->fCallback(msg, proc(const_cast<char*>(src->nspace), rank(src->rank)));
} else {
// LOG(trace) << "suppressing message from itself";
}
} else {
LOG(ERROR) << "ERROR";
}
if (cbfunc != nullptr) {
cbfunc(PMIX_SUCCESS, nullptr, 0, nullptr, nullptr, cbdata);
}
}
const proc& fProcess;
size_t fHandlerRef;
std::function<void(const std::string& msg, const proc& sender)> fCallback;
std::array<pmix_info_t, 1> fInfos;
bool fSubscribed;
fair::mq::tools::SharedSemaphore fBlocker;
};
} /* namespace pmix */
#endif /* PMIXCOMMANDS_H */

View File

@ -8,8 +8,15 @@
#include "PMIxPlugin.h" #include "PMIxPlugin.h"
#include <fairmq/sdk/commands/Commands.h>
#include <fairmq/Tools.h> #include <fairmq/Tools.h>
#include <sstream>
#include <stdexcept> #include <stdexcept>
#include <cstdint> // UINT32_MAX
using namespace std;
using namespace fair::mq::sdk::cmd;
namespace fair namespace fair
{ {
@ -18,47 +25,99 @@ namespace mq
namespace plugins namespace plugins
{ {
PMIxPlugin::PMIxPlugin(const std::string& name, PMIxPlugin::PMIxPlugin(const string& name,
const Plugin::Version version, const Plugin::Version version,
const std::string& maintainer, const string& maintainer,
const std::string& homepage, const string& homepage,
PluginServices* pluginServices) PluginServices* pluginServices)
: Plugin(name, version, maintainer, homepage, pluginServices) : Plugin(name, version, maintainer, homepage, pluginServices)
, fProcess(Init())
, fPid(getpid()) , fPid(getpid())
, fPMIxClient(tools::ToString("PMIx client(pid=", fPid, ") "))
, fDeviceId(string(fProcess.nspace) + "_" + to_string(fProcess.rank))
, fCommands(fProcess)
, fLastExternalController(UINT32_MAX)
, fExitingAckedByLastExternalController(false)
, fCurrentState(DeviceState::Idle)
, fLastState(DeviceState::Idle)
{ {
Init(); TakeDeviceControl();
SetProperty<std::string>("id", std::string(fProc.nspace) + "_" + std::to_string(fProc.rank)); LOG(debug) << PMIxClient() << "pmix::init() OK: " << fProcess << ", version=" << pmix::get_version();
Fence(); SetProperty<string>("id", fDeviceId);
SubscribeToDeviceStateChange([&](DeviceState newState) { Fence("pmix::init");
SubscribeForCommands();
Fence("subscribed");
// fCommands.Send("test1");
// fCommands.Send("test2", 0);
// fCommands.Send("test3", 0);
// LOG(info) << "PMIX_EXTERNAL_ERR_BASE: " << PMIX_EXTERNAL_ERR_BASE;
// job level infos
// LOG(info) << "PMIX_SESSION_ID: " << pmix::getInfo(PMIX_SESSION_ID, fProcess);
// LOG(info) << "PMIX_UNIV_SIZE: " << pmix::getInfo(PMIX_UNIV_SIZE, fProcess);
// LOG(info) << "PMIX_JOB_SIZE: " << pmix::getInfo(PMIX_JOB_SIZE, fProcess);
// LOG(info) << "PMIX_JOB_NUM_APPS: " << pmix::getInfo(PMIX_JOB_NUM_APPS, fProcess);
// LOG(info) << "PMIX_APP_SIZE: " << pmix::getInfo(PMIX_APP_SIZE, fProcess);
// LOG(info) << "PMIX_MAX_PROCS: " << pmix::getInfo(PMIX_MAX_PROCS, fProcess);
// LOG(info) << "PMIX_NUM_NODES: " << pmix::getInfo(PMIX_NUM_NODES, fProcess);
// LOG(info) << "PMIX_CLUSTER_ID: " << pmix::getInfo(PMIX_CLUSTER_ID, fProcess);
// LOG(info) << "PMIX_NSPACE: " << pmix::getInfo(PMIX_NSPACE, fProcess);
// LOG(info) << "PMIX_JOBID: " << pmix::getInfo(PMIX_JOBID, fProcess);
// LOG(info) << "PMIX_NODE_LIST: " << pmix::getInfo(PMIX_NODE_LIST, fProcess);
// LOG(info) << "PMIX_ALLOCATED_NODELIST: " << pmix::getInfo(PMIX_ALLOCATED_NODELIST, fProcess);
// LOG(info) << "PMIX_NPROC_OFFSET: " << pmix::getInfo(PMIX_NPROC_OFFSET, fProcess);
// LOG(info) << "PMIX_LOCALLDR: " << pmix::getInfo(PMIX_LOCALLDR, fProcess);
// LOG(info) << "PMIX_APPLDR: " << pmix::getInfo(PMIX_APPLDR, fProcess);
// // per-node information
// LOG(info) << "PMIX_NODE_SIZE: " << pmix::getInfo(PMIX_NODE_SIZE, fProcess);
// LOG(info) << "PMIX_LOCAL_SIZE: " << pmix::getInfo(PMIX_LOCAL_SIZE, fProcess);
// LOG(info) << "PMIX_AVAIL_PHYS_MEMORY: " << pmix::getInfo(PMIX_AVAIL_PHYS_MEMORY, fProcess);
// // per-process information
// LOG(info) << "PMIX_PROCID: " << pmix::getInfo(PMIX_PROCID, fProcess);
// LOG(info) << "PMIX_APPNUM: " << pmix::getInfo(PMIX_APPNUM, fProcess);
// LOG(info) << "PMIX_LOCAL_RANK: " << pmix::getInfo(PMIX_LOCAL_RANK, fProcess);
// LOG(info) << "PMIX_NODE_RANK: " << pmix::getInfo(PMIX_NODE_RANK, fProcess);
// LOG(info) << "PMIX_RANK: " << pmix::getInfo(PMIX_RANK, fProcess);
// LOG(info) << "PMIX_GLOBAL_RANK: " << pmix::getInfo(PMIX_GLOBAL_RANK, fProcess);
// LOG(info) << "PMIX_APP_RANK: " << pmix::getInfo(PMIX_APP_RANK, fProcess);
SubscribeToDeviceStateChange([this](DeviceState newState) {
switch (newState) { switch (newState) {
case DeviceState::Idle:
Fence();
break;
case DeviceState::Bound: case DeviceState::Bound:
Publish(); Publish();
Fence();
break; break;
case DeviceState::Connecting: case DeviceState::Connecting:
Lookup(); Lookup();
break; break;
case DeviceState::DeviceReady:
Fence();
break;
case DeviceState::Ready:
Fence();
break;
case DeviceState::Exiting: case DeviceState::Exiting:
ReleaseDeviceControl();
UnsubscribeFromDeviceStateChange(); UnsubscribeFromDeviceStateChange();
break; break;
default: default:
break; break;
} }
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
fLastState = fCurrentState;
fCurrentState = newState;
for (auto subscriberId : fStateChangeSubscribers) {
LOG(debug) << "Publishing state-change: " << fLastState << "->" << newState << " to " << subscriberId;
Cmds cmds(make<StateChange>(fDeviceId, 0, fLastState, fCurrentState));
fCommands.Send(cmds.Serialize(Format::JSON), static_cast<pmix::rank>(subscriberId));
}
}); });
} }
PMIxPlugin::~PMIxPlugin() PMIxPlugin::~PMIxPlugin()
{ {
LOG(debug) << "Destroying PMIxPlugin";
ReleaseDeviceControl();
fCommands.Unsubscribe();
while (pmix::initialized()) { while (pmix::initialized()) {
try { try {
pmix::finalize(); pmix::finalize();
@ -69,33 +128,112 @@ PMIxPlugin::~PMIxPlugin()
} }
} }
auto PMIxPlugin::PMIxClient() const -> std::string auto PMIxPlugin::SubscribeForCommands() -> void
{ {
std::stringstream ss; fCommands.Subscribe([this](const string& cmdStr, const pmix::proc& sender) {
ss << "PMIx client(pid=" << fPid << ") "; // LOG(info) << "PMIx Plugin received message: '" << cmdStr << "', from " << sender;
return ss.str();
Cmds inCmds;
inCmds.Deserialize(cmdStr, Format::JSON);
for (const auto& cmd : inCmds) {
LOG(info) << "Received command type: '" << cmd->GetType() << "' from " << sender;
switch (cmd->GetType()) {
case Type::check_state:
fCommands.Send(Cmds(make<CurrentState>(fDeviceId, GetCurrentDeviceState()))
.Serialize(Format::JSON),
{sender});
break;
case Type::change_state: {
Transition transition = static_cast<ChangeState&>(*cmd).GetTransition();
if (ChangeDeviceState(transition)) {
fCommands.Send(
Cmds(make<TransitionStatus>(fDeviceId, Result::Ok, transition))
.Serialize(Format::JSON),
{sender});
} else {
fCommands.Send(
Cmds(make<TransitionStatus>(fDeviceId, Result::Failure, transition))
.Serialize(Format::JSON),
{sender});
}
{
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
fLastExternalController = sender.rank;
}
}
break;
case Type::subscribe_to_state_change: {
{
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
fStateChangeSubscribers.insert(sender.rank);
} }
auto PMIxPlugin::Init() -> void LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState
<< " to " << sender;
Cmds outCmds(make<StateChangeSubscription>(fDeviceId, Result::Ok),
make<StateChange>(fDeviceId, 0, fLastState, fCurrentState));
fCommands.Send(outCmds.Serialize(Format::JSON), {sender});
}
break;
case Type::unsubscribe_from_state_change: {
{
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
fStateChangeSubscribers.erase(sender.rank);
}
fCommands.Send(Cmds(make<StateChangeUnsubscription>(fDeviceId, Result::Ok))
.Serialize(Format::JSON),
{sender});
}
break;
case Type::state_change_exiting_received: {
{
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
if (fLastExternalController == sender.rank) {
fExitingAckedByLastExternalController = true;
}
}
fExitingAcked.notify_one();
}
break;
case Type::dump_config: {
stringstream ss;
for (const auto& k: GetPropertyKeys()) {
ss << fDeviceId << ": " << k << " -> " << GetPropertyAsString(k) << "\n";
}
fCommands.Send(Cmds(make<Config>(fDeviceId, ss.str())).Serialize(Format::JSON),
{sender});
}
break;
default:
LOG(warn) << "Unexpected/unknown command received: " << cmdStr;
LOG(warn) << "Origin: " << sender;
break;
}
}
});
}
auto PMIxPlugin::Init() -> pmix::proc
{ {
if (!pmix::initialized()) { if (!pmix::initialized()) {
fProc = pmix::init(); return pmix::init();
LOG(debug) << PMIxClient() << "pmix::init() OK: " << fProc } else {
<< ",version=" << pmix::get_version(); throw runtime_error("trying to initialize PMIx while it is already initialized");
} }
} }
auto PMIxPlugin::Publish() -> void auto PMIxPlugin::Publish() -> void
{ {
auto channels(GetChannelInfo()); auto channels(GetChannelInfo());
std::vector<pmix::info> info; vector<pmix::info> info;
for (const auto& c : channels) { for (const auto& c : channels) {
std::string methodKey{"chans." + c.first + "." + std::to_string(c.second - 1) + ".method"}; string methodKey("chans." + c.first + "." + to_string(c.second - 1) + ".method");
if (GetProperty<std::string>(methodKey) == "bind") { if (GetProperty<string>(methodKey) == "bind") {
for (int i = 0; i < c.second; ++i) { for (int i = 0; i < c.second; ++i) {
std::string addressKey{"chans." + c.first + "." + std::to_string(i) + ".address"}; string addressKey("chans." + c.first + "." + to_string(i) + ".address");
info.emplace_back(addressKey, GetProperty<std::string>(addressKey)); info.emplace_back(addressKey, GetProperty<string>(addressKey));
LOG(debug) << PMIxClient() << info.back(); LOG(debug) << PMIxClient() << info.back();
} }
} }
@ -103,32 +241,37 @@ auto PMIxPlugin::Publish() -> void
if (info.size() > 0) { if (info.size() > 0) {
pmix::publish(info); pmix::publish(info);
LOG(debug) << PMIxClient() << "pmix::publish() OK: published " LOG(debug) << PMIxClient() << "pmix::publish() OK: published " << info.size()
<< info.size() << " binding channels."; << " binding channels.";
} }
} }
auto PMIxPlugin::Fence() -> void auto PMIxPlugin::Fence() -> void
{ {
pmix::proc all(fProc); pmix::proc all(fProcess);
all.rank = pmix::rank::wildcard; all.rank = pmix::rank::wildcard;
pmix::fence({all}); pmix::fence({all});
LOG(debug) << PMIxClient() << "pmix::fence() OK"; }
auto PMIxPlugin::Fence(const std::string& label) -> void
{
Fence(label);
LOG(debug) << PMIxClient() << "pmix::fence() [" << label << "] OK";
} }
auto PMIxPlugin::Lookup() -> void auto PMIxPlugin::Lookup() -> void
{ {
auto channels(GetChannelInfo()); auto channels(GetChannelInfo());
for (const auto& c : channels) { for (const auto& c : channels) {
std::string methodKey{"chans." + c.first + "." + std::to_string(c.second - 1) + ".method"}; string methodKey("chans." + c.first + "." + to_string(c.second - 1) + ".method");
if (GetProperty<std::string>(methodKey) == "connect") { if (GetProperty<string>(methodKey) == "connect") {
for (int i = 0; i < c.second; ++i) { for (int i = 0; i < c.second; ++i) {
std::vector<pmix::pdata> pdata; vector<pmix::pdata> pdata;
std::string addressKey{"chans." + c.first + "." + std::to_string(i) + ".address"}; string addressKey("chans." + c.first + "." + to_string(i) + ".address");
pdata.emplace_back(); pdata.emplace_back();
pdata.back().set_key(addressKey); pdata.back().set_key(addressKey);
std::vector<pmix::info> info; vector<pmix::info> info;
info.emplace_back(PMIX_WAIT, static_cast<int>(pdata.size())); info.emplace_back(PMIX_WAIT, static_cast<int>(pdata.size()));
if (pdata.size() > 0) { if (pdata.size() > 0) {
@ -142,7 +285,7 @@ auto PMIxPlugin::Lookup() -> void
} else if (p.value.type == PMIX_STRING) { } else if (p.value.type == PMIX_STRING) {
LOG(debug) << PMIxClient() << "pmix::lookup() found:" LOG(debug) << PMIxClient() << "pmix::lookup() found:"
<< " key=" << p.key << ",value=" << p.value.data.string; << " key=" << p.key << ",value=" << p.value.data.string;
SetProperty<std::string>(p.key, p.value.data.string); SetProperty<string>(p.key, p.value.data.string);
} else { } else {
LOG(debug) << PMIxClient() << "pmix::lookup() wrong type returned: " LOG(debug) << PMIxClient() << "pmix::lookup() wrong type returned: "
<< "key=" << p.key << ",type=" << p.value.type; << "key=" << p.key << ",type=" << p.value.type;
@ -153,6 +296,14 @@ auto PMIxPlugin::Lookup() -> void
} }
} }
auto PMIxPlugin::WaitForExitingAck() -> void
{
unique_lock<mutex> lock(fStateChangeSubscriberMutex);
fExitingAcked.wait_for(lock, chrono::milliseconds(1000), [this]() {
return fExitingAckedByLastExternalController;
});
}
} /* namespace plugins */ } /* namespace plugins */
} /* namespace mq */ } /* namespace mq */
} /* namespace fair */ } /* namespace fair */

View File

@ -10,6 +10,7 @@
#define FAIR_MQ_PLUGINS_PMIX #define FAIR_MQ_PLUGINS_PMIX
#include "PMIx.hpp" #include "PMIx.hpp"
#include "PMIxCommands.h"
#include <fairmq/Plugin.h> #include <fairmq/Plugin.h>
#include <fairmq/Version.h> #include <fairmq/Version.h>
@ -39,23 +40,40 @@ class PMIxPlugin : public Plugin
const std::string& homepage, const std::string& homepage,
PluginServices* pluginServices); PluginServices* pluginServices);
~PMIxPlugin(); ~PMIxPlugin();
auto PMIxClient() const -> std::string;
auto PMIxClient() const -> std::string { return fPMIxClient; };
private: private:
pmix::proc fProc; pmix::proc fProcess;
pid_t fPid; pid_t fPid;
std::string fPMIxClient;
std::string fDeviceId;
pmix::Commands fCommands;
auto Init() -> void; std::set<uint32_t> fStateChangeSubscribers;
uint32_t fLastExternalController;
bool fExitingAckedByLastExternalController;
std::condition_variable fExitingAcked;
std::mutex fStateChangeSubscriberMutex;
DeviceState fCurrentState;
DeviceState fLastState;
auto Init() -> pmix::proc;
auto Publish() -> void; auto Publish() -> void;
auto Fence() -> void; auto Fence() -> void;
auto Fence(const std::string& label) -> void;
auto Lookup() -> void; auto Lookup() -> void;
auto SubscribeForCommands() -> void;
auto WaitForExitingAck() -> void;
}; };
Plugin::ProgOptions PMIxProgramOptions() Plugin::ProgOptions PMIxProgramOptions()
{ {
boost::program_options::options_description options{"PMIx Plugin"}; boost::program_options::options_description options("PMIx Plugin");
options.add_options()( options.add_options()
"pmix-dummy", boost::program_options::value<int>()->default_value(0), "Dummy."); ("pmix-dummy", boost::program_options::value<int>()->default_value(0), "Dummy.");
return options; return options;
} }

View File

@ -0,0 +1,218 @@
/********************************************************************************
* Copyright (C) 2014-2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/sdk/commands/Commands.h>
#include <fairmq/States.h>
#include <fairlogger/Logger.h>
#include "PMIx.hpp"
#include "PMIxCommands.h"
#include <boost/program_options.hpp>
#include <condition_variable>
#include <algorithm>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <string>
#include <termios.h> // raw mode console input
#include <thread>
#include <utility>
#include <unistd.h>
#include <vector>
using namespace std;
using namespace fair::mq::sdk::cmd;
namespace bpo = boost::program_options;
const std::map<fair::mq::Transition, fair::mq::State> expected =
{
{ fair::mq::Transition::InitDevice, fair::mq::State::InitializingDevice },
{ fair::mq::Transition::CompleteInit, fair::mq::State::Initialized },
{ fair::mq::Transition::Bind, fair::mq::State::Bound },
{ fair::mq::Transition::Connect, fair::mq::State::DeviceReady },
{ fair::mq::Transition::InitTask, fair::mq::State::Ready },
{ fair::mq::Transition::Run, fair::mq::State::Running },
{ fair::mq::Transition::Stop, fair::mq::State::Ready },
{ fair::mq::Transition::ResetTask, fair::mq::State::DeviceReady },
{ fair::mq::Transition::ResetDevice, fair::mq::State::Idle },
{ fair::mq::Transition::End, fair::mq::State::Exiting }
};
struct StateSubscription
{
pmix::Commands& fCommands;
explicit StateSubscription(pmix::Commands& commands)
: fCommands(commands)
{
fCommands.Send(Cmds(make<SubscribeToStateChange>()).Serialize(Format::JSON));
}
~StateSubscription()
{
fCommands.Send(Cmds(make<UnsubscribeFromStateChange>()).Serialize(Format::JSON));
this_thread::sleep_for(chrono::milliseconds(100)); // give PMIx a chance to complete request
}
};
struct MiniTopo
{
explicit MiniTopo(unsigned int n)
: fState(n, fair::mq::State::Ok)
{}
void WaitFor(const fair::mq::State state)
{
std::unique_lock<std::mutex> lk(fMtx);
fCV.wait(lk, [&](){
unsigned int count = std::count_if(fState.cbegin(), fState.cend(), [=](const auto& s) {
return s == state;
});
bool result = count == fState.size();
cout << "expecting " << state << " for " << fState.size() << " devices, found " << count << ", condition: " << result << endl;
return result;
});
}
void Update(uint32_t rank, const fair::mq::State state)
{
try {
{
std::lock_guard<std::mutex> lk(fMtx);
fState.at(rank - 1) = state;
}
fCV.notify_one();
} catch (const std::exception& e) {
LOG(error) << "Exception in Update: " << e.what();
}
}
private:
vector<fair::mq::State> fState;
std::mutex fMtx;
std::condition_variable fCV;
};
int main(int argc, char* argv[])
{
try {
unsigned int numDevices;
bpo::options_description options("Common options");
options.add_options()
("number-devices,n", bpo::value<unsigned int>(&numDevices)->default_value(0), "Number of devices (will be removed in the future)")
("help,h", "Produce help message");
bpo::variables_map vm;
bpo::store(bpo::command_line_parser(argc, argv).options(options).run(), vm);
if (vm.count("help")) {
cout << "FairMQ DDS Command UI" << endl << options << endl;
cout << "Commands: [c] check state, [o] dump config, [h] help, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device, [k] complete init, [b] bind, [x] connect" << endl;
return EXIT_SUCCESS;
}
bpo::notify(vm);
fair::Logger::SetConsoleSeverity(fair::Severity::debug);
fair::Logger::SetConsoleColor(true);
fair::Logger::SetVerbosity(fair::Verbosity::low);
pmix::proc process;
if (!pmix::initialized()) {
process = pmix::init();
LOG(warn) << "pmix::init() OK: " << process << ", version=" << pmix::get_version();
}
pmix::proc all(process);
all.rank = pmix::rank::wildcard;
pmix::fence({all});
LOG(warn) << "pmix::fence() [pmix::init] OK";
MiniTopo topo(numDevices);
pmix::Commands commands(process);
commands.Subscribe([&](const string& msg, const pmix::proc& sender) {
// LOG(info) << "Received '" << msg << "' from " << sender;
Cmds cmds;
cmds.Deserialize(msg, Format::JSON);
// cout << "Received " << cmds.Size() << " command(s) with total size of " << msg.length() << " bytes: " << endl;
for (const auto& cmd : cmds) {
// cout << " > " << cmd->GetType() << endl;
switch (cmd->GetType()) {
case Type::state_change: {
cout << "Received state_change from " << static_cast<StateChange&>(*cmd).GetDeviceId() << ": " << static_cast<StateChange&>(*cmd).GetLastState() << "->" << static_cast<StateChange&>(*cmd).GetCurrentState() << endl;
topo.Update(sender.rank, static_cast<StateChange&>(*cmd).GetCurrentState());
if (static_cast<StateChange&>(*cmd).GetCurrentState() == fair::mq::State::Exiting) {
commands.Send(Cmds(make<StateChangeExitingReceived>()).Serialize(Format::JSON), {sender});
}
}
break;
case Type::state_change_subscription:
if (static_cast<StateChangeSubscription&>(*cmd).GetResult() != Result::Ok) {
cout << "State change subscription failed for " << static_cast<StateChangeSubscription&>(*cmd).GetDeviceId() << endl;
}
break;
case Type::state_change_unsubscription:
if (static_cast<StateChangeUnsubscription&>(*cmd).GetResult() != Result::Ok) {
cout << "State change unsubscription failed for " << static_cast<StateChangeUnsubscription&>(*cmd).GetDeviceId() << endl;
}
break;
case Type::transition_status: {
if (static_cast<TransitionStatus&>(*cmd).GetResult() == Result::Ok) {
cout << "Device " << static_cast<TransitionStatus&>(*cmd).GetDeviceId() << " started to transition with " << static_cast<TransitionStatus&>(*cmd).GetTransition() << endl;
} else {
cout << "Device " << static_cast<TransitionStatus&>(*cmd).GetDeviceId() << " cannot transition with " << static_cast<TransitionStatus&>(*cmd).GetTransition() << endl;
}
}
break;
case Type::current_state:
cout << "Device " << static_cast<CurrentState&>(*cmd).GetDeviceId() << " is in " << static_cast<CurrentState&>(*cmd).GetCurrentState() << " state" << endl;
break;
case Type::config:
cout << "Received config for device " << static_cast<Config&>(*cmd).GetDeviceId() << ":\n" << static_cast<Config&>(*cmd).GetConfig() << endl;
break;
default:
cout << "Unexpected/unknown command received: " << cmd->GetType() << endl;
cout << "Origin: " << sender << endl;
break;
}
}
});
pmix::fence({all});
LOG(warn) << "pmix::fence() [subscribed] OK";
StateSubscription stateSubscription(commands);
for (auto transition : { fair::mq::Transition::InitDevice,
fair::mq::Transition::CompleteInit,
fair::mq::Transition::Bind,
fair::mq::Transition::Connect,
fair::mq::Transition::InitTask,
fair::mq::Transition::Run,
fair::mq::Transition::Stop,
fair::mq::Transition::ResetTask,
fair::mq::Transition::ResetDevice,
fair::mq::Transition::End }) {
commands.Send(Cmds(make<ChangeState>(transition)).Serialize(Format::JSON));
topo.WaitFor(expected.at(transition));
}
} catch (exception& e) {
LOG(error) << "Error: " << e.what();
return EXIT_FAILURE;
}
LOG(warn) << "exiting";
return EXIT_SUCCESS;
}