diff --git a/CMakeLists.txt b/CMakeLists.txt index a796b649..b2e06ce3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -18,7 +18,7 @@ get_git_version() project(FairMQ VERSION ${PROJECT_VERSION} LANGUAGES CXX) message(STATUS "${BWhite}${PROJECT_NAME}${CR} ${PROJECT_GIT_VERSION} from ${PROJECT_DATE}") -if(BUILD_OFI_TRANSPORT OR BUILD_SDK) +if(BUILD_OFI_TRANSPORT OR BUILD_SDK OR BUILD_PMIX_PLUGIN) set(PROJECT_MIN_CXX_STANDARD 14) else() set(PROJECT_MIN_CXX_STANDARD 11) @@ -44,7 +44,7 @@ fairmq_build_option(BUILD_SDK_COMMANDS "Build the FairMQ SDK commands." fairmq_build_option(BUILD_DDS_PLUGIN "Build DDS plugin." DEFAULT OFF REQUIRES "BUILD_FAIRMQ;BUILD_SDK_COMMANDS") fairmq_build_option(BUILD_PMIX_PLUGIN "Build PMIx plugin." - DEFAULT OFF REQUIRES "BUILD_FAIRMQ") + DEFAULT OFF REQUIRES "BUILD_FAIRMQ;BUILD_SDK_COMMANDS") fairmq_build_option(BUILD_EXAMPLES "Build FairMQ examples." DEFAULT ON REQUIRES "BUILD_FAIRMQ") fairmq_build_option(BUILD_SDK "Build the FairMQ controller SDK." @@ -397,9 +397,9 @@ else() endif() message(STATUS " ${BWhite}dds_plugin${CR} ${dds_summary}") if(BUILD_PMIX_PLUGIN) - set(pmix_summary "${BGreen}YES${CR} (disable with ${BMagenta}-DBUILD_PMIX_PLUGIN=OFF${CR})") + set(pmix_summary "${BGreen}YES${CR} EXPERIMENTAL (requires C++14) (disable with ${BMagenta}-DBUILD_PMIX_PLUGIN=OFF${CR})") else() - set(pmix_summary "${BRed} NO${CR} (default, enable with ${BMagenta}-DBUILD_PMIX_PLUGIN=ON${CR})") + set(pmix_summary "${BRed} NO${CR} EXPERIMENTAL (requires C++14) (default, enable with ${BMagenta}-DBUILD_PMIX_PLUGIN=ON${CR})") endif() message(STATUS " ${BWhite}pmix_plugin${CR} ${pmix_summary}") if(BUILD_EXAMPLES) diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index fc92679f..878f4619 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -6,16 +6,6 @@ # copied verbatim in the file "LICENSE" # ################################################################################ -#################### -# external plugins # -#################### -if(BUILD_DDS_PLUGIN) - add_subdirectory(plugins/DDS) -endif() -if(BUILD_PMIX_PLUGIN) - add_subdirectory(plugins/PMIx) -endif() - if(BUILD_FAIRMQ OR BUILD_SDK) ########### # Version # @@ -473,3 +463,13 @@ endif() if(BUILD_SDK) add_subdirectory(sdk) endif() + +#################### +# external plugins # +#################### +if(BUILD_DDS_PLUGIN) + add_subdirectory(plugins/DDS) +endif() +if(BUILD_PMIX_PLUGIN) + add_subdirectory(plugins/PMIx) +endif() diff --git a/fairmq/plugins/PMIx/CMakeLists.txt b/fairmq/plugins/PMIx/CMakeLists.txt index 1ab32147..efd43e51 100644 --- a/fairmq/plugins/PMIx/CMakeLists.txt +++ b/fairmq/plugins/PMIx/CMakeLists.txt @@ -10,9 +10,10 @@ set(plugin FairMQPlugin_pmix) add_library(${plugin} SHARED ${CMAKE_CURRENT_SOURCE_DIR}/PMIxPlugin.cxx ${CMAKE_CURRENT_SOURCE_DIR}/PMIxPlugin.h + ${CMAKE_CURRENT_SOURCE_DIR}/PMIxCommands.h ${CMAKE_CURRENT_SOURCE_DIR}/PMIx.hpp ) -target_link_libraries(${plugin} FairMQ PMIx::libpmix) +target_link_libraries(${plugin} PUBLIC FairMQ PMIx::libpmix PRIVATE Commands) target_include_directories(${plugin} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}) set_target_properties(${plugin} PROPERTIES CXX_VISIBILITY_PRESET hidden @@ -20,7 +21,12 @@ set_target_properties(${plugin} PROPERTIES SOVERSION "${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}" ) -install(TARGETS ${plugin} +set(exe fairmq-pmix-command-ui) +add_executable(${exe} ${CMAKE_CURRENT_SOURCE_DIR}/runPMIxCommandUI.cxx) +target_link_libraries(${exe} FairMQ Commands StateMachine PMIx::libpmix) +target_include_directories(${exe} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}) + +install(TARGETS ${plugin} ${exe} EXPORT ${PROJECT_EXPORT_SET} LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR} RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR} diff --git a/fairmq/plugins/PMIx/PMIx.hpp b/fairmq/plugins/PMIx/PMIx.hpp index 9e94c01b..ff07b70a 100644 --- a/fairmq/plugins/PMIx/PMIx.hpp +++ b/fairmq/plugins/PMIx/PMIx.hpp @@ -9,17 +9,18 @@ #ifndef PMIX_HPP #define PMIX_HPP +#include #include +#include #include #include #include #include +#include #include -#include #include #include #include -#include // C++ PMIx v2.2 API namespace pmix @@ -69,7 +70,7 @@ struct proc : pmix_proc_t friend std::ostream& operator<<(std::ostream& os, const proc& p) { - return os << "nspace=" << p.nspace << ",rank=" << p.rank; + return os << p.nspace << "_" << p.rank; } }; @@ -80,7 +81,6 @@ struct value : pmix_value_t value(const value& rhs) { - LOG(warn) << "copy ctor"; status rc; auto lhs(static_cast(this)); PMIX_VALUE_XFER(rc, lhs, static_cast(const_cast(&rhs))); @@ -111,6 +111,11 @@ struct value : pmix_value_t { PMIX_VALUE_LOAD(static_cast(this), &val, PMIX_INT); } + + explicit value(pmix_data_array_t* val) + { + PMIX_VALUE_LOAD(static_cast(this), val, PMIX_DATA_ARRAY); + } }; struct info : pmix_info_t @@ -134,9 +139,15 @@ struct info : pmix_info_t } } - friend std::ostream& operator<<(std::ostream& os, const info& p) + friend std::ostream& operator<<(std::ostream& os, const info& i) { - return os << "key=" << p.key << ",value='" << p.value.data.string << "'"; + return os << "key=" << i.key << ",value='" << i.value.data.string << "'"; + } + + info(const info& rhs) + { + PMIX_INFO_XFER(static_cast(this), + static_cast(const_cast(&rhs))); } }; @@ -144,6 +155,7 @@ struct pdata : pmix_pdata_t { pdata() { PMIX_PDATA_CONSTRUCT(static_cast(this)); } ~pdata() { PMIX_PDATA_DESTRUCT(static_cast(this)); } + pdata(const pdata& rhs) { PMIX_PDATA_XFER(static_cast(this), @@ -171,7 +183,7 @@ auto init(const std::vector& info = {}) -> proc auto initialized() -> bool { return !!PMIx_Initialized(); } -auto get_version() -> const char* { return PMIx_Get_version(); } +auto get_version() -> std::string { return {PMIx_Get_version()}; } auto finalize(const std::vector& info = {}) -> void { @@ -213,6 +225,92 @@ auto lookup(std::vector& pdata, const std::vector& info = {}) -> vo } } +std::string get_info(const std::string& name, pmix::proc& process) +{ + pmix_value_t* v; + + pmix::status rc = PMIx_Get(&process, name.c_str(), nullptr, 0, &v); + if (rc == PMIX_SUCCESS) { + std::stringstream ss; + + switch (v->type) { + case PMIX_SIZE: ss << static_cast(v->data.size) << " (size_t)"; break; + case PMIX_INT: ss << static_cast(v->data.integer) << " (int)"; break; + case PMIX_INT8: ss << static_cast(v->data.int8) << " (int8_t)"; break; + case PMIX_INT16: ss << static_cast(v->data.int16) << " (int16_t)"; break; + case PMIX_INT32: ss << static_cast(v->data.int32) << " (int32_t)"; break; + case PMIX_INT64: ss << static_cast(v->data.int64) << " (int64_t)"; break; + case PMIX_UINT: ss << static_cast(v->data.uint) << " (unsigned int)"; break; + case PMIX_UINT8: ss << static_cast(v->data.uint8) << " (uint8_t)"; break; + case PMIX_UINT16: ss << static_cast(v->data.uint16) << " (uint16_t)"; break; + case PMIX_UINT32: ss << static_cast(v->data.uint32) << " (uint32_t)"; break; + case PMIX_UINT64: ss << static_cast(v->data.uint64) << " (uint64_t)"; break; + case PMIX_FLOAT: ss << static_cast(v->data.fval) << " (float)"; break; + case PMIX_DOUBLE: ss << static_cast(v->data.dval) << " (double)"; break; + case PMIX_PID: ss << static_cast(v->data.pid) << " (pid_t)"; break; + case PMIX_STRING: ss << static_cast(v->data.string) << " (string)"; break; + case PMIX_PROC_RANK: ss << static_cast(v->data.rank) << " (pmix_rank_t)"; break; + case PMIX_PROC: ss << "proc.nspace: " << static_cast(v->data.proc)->nspace + << ", proc.rank: " << static_cast(v->data.proc)->rank << " (pmix_proc_t*)"; break; + default: + ss << "unknown type: " << v->type; + break; + } + + return ss.str(); + } else if (rc == PMIX_ERR_NOT_FOUND) { + // LOG(error) << "PMIx_Get failed: PMIX_ERR_NOT_FOUND"; + return ""; + } else { + // LOG(error) << "PMIx_Get failed: " << rc; + return ""; + } +} + +std::string get_value_str(const pmix_value_t& v) +{ + switch (v.type) { + case PMIX_BOOL: return std::to_string(static_cast(v.data.flag)); + case PMIX_SIZE: return std::to_string(static_cast(v.data.size)); + case PMIX_INT: return std::to_string(static_cast(v.data.integer)); + case PMIX_INT8: return std::to_string(static_cast(v.data.int8)); + case PMIX_INT16: return std::to_string(static_cast(v.data.int16)); + case PMIX_INT32: return std::to_string(static_cast(v.data.int32)); + case PMIX_INT64: return std::to_string(static_cast(v.data.int64)); + case PMIX_UINT: return std::to_string(static_cast(v.data.uint)); + case PMIX_UINT8: return std::to_string(static_cast(v.data.uint8)); + case PMIX_UINT16: return std::to_string(static_cast(v.data.uint16)); + case PMIX_UINT32: return std::to_string(static_cast(v.data.uint32)); + case PMIX_UINT64: return std::to_string(static_cast(v.data.uint64)); + case PMIX_FLOAT: return std::to_string(static_cast(v.data.fval)); + case PMIX_DOUBLE: return std::to_string(static_cast(v.data.dval)); + case PMIX_PID: return std::to_string(static_cast(v.data.pid)); + case PMIX_STRING: return static_cast(v.data.string); + case PMIX_PROC_RANK: return std::to_string(static_cast(v.data.rank)); + case PMIX_POINTER: { std::stringstream ss; ss << static_cast(v.data.ptr); return ss.str(); } + case PMIX_DATA_ARRAY: { + if (v.data.darray->type == PMIX_PROC) { + std::stringstream ss; + ss << "["; + for (size_t i = 0; i < v.data.darray->size; ++i) { + ss << static_cast(static_cast(v.data.darray)->array)[0].nspace; + ss << "_"; + ss << static_cast(static_cast(v.data.darray)->array)[0].rank; + + if (i < v.data.darray->size - 1) { + ss << ","; + } + } + ss << "]"; + return ss.str(); + } else { + return "UNKNOWN TYPE IN DATA ARRAY"; + } + } + default: return "UNKNOWN TYPE"; + } +} + } /* namespace pmix */ #endif /* PMIX_HPP */ diff --git a/fairmq/plugins/PMIx/PMIxCommands.h b/fairmq/plugins/PMIx/PMIxCommands.h new file mode 100644 index 00000000..da2469d9 --- /dev/null +++ b/fairmq/plugins/PMIx/PMIxCommands.h @@ -0,0 +1,291 @@ +/******************************************************************************** + * Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef PMIXCOMMANDS_H +#define PMIXCOMMANDS_H + +#include "PMIx.hpp" + +#include +#include +#include +#include + +namespace pmix +{ + +std::array typeNames = +{ + { + "PMIX_UNDEF", + "PMIX_BOOL", + "PMIX_BYTE", + "PMIX_STRING", + "PMIX_SIZE", + "PMIX_PID", + "PMIX_INT", + "PMIX_INT8", + "PMIX_INT16", + "PMIX_INT32", + "PMIX_INT64", + "PMIX_UINT", + "PMIX_UINT8", + "PMIX_UINT16", + "PMIX_UINT32", + "PMIX_UINT64", + "PMIX_FLOAT", + "PMIX_DOUBLE", + "PMIX_TIMEVAL", + "PMIX_TIME", + "PMIX_STATUS", + "PMIX_VALUE", + "PMIX_PROC", + "PMIX_APP", + "PMIX_INFO", + "PMIX_PDATA", + "PMIX_BUFFER", + "PMIX_BYTE_OBJECT", + "PMIX_KVAL", + "PMIX_MODEX", + "PMIX_PERSIST", + "PMIX_POINTER", + "PMIX_SCOPE", + "PMIX_DATA_RANGE", + "PMIX_COMMAND", + "PMIX_INFO_DIRECTIVES", + "PMIX_DATA_TYPE", + "PMIX_PROC_STATE", + "PMIX_PROC_INFO", + "PMIX_DATA_ARRAY", + "PMIX_PROC_RANK", + "PMIX_QUERY", + "PMIX_COMPRESSED_STRING", + "PMIX_ALLOC_DIRECTIVE", + "PMIX_INFO_ARRAY", + "PMIX_IOF_CHANNEL", + "PMIX_ENVAR" + } +}; + +enum class Command : int +{ + general = PMIX_EXTERNAL_ERR_BASE, + error = PMIX_EXTERNAL_ERR_BASE - 1 +}; + + +class Commands +{ + public: + Commands(const proc& process) + : fProcess(process) + , fSubscribed(false) + { + } + + ~Commands() + { + Unsubscribe(); + } + + void Subscribe(std::function callback) + { + using namespace std::placeholders; + + LOG(debug) << "PMIxCommands: Subscribing..."; + + fCallback = callback; + std::array codes; + codes[0] = static_cast(pmix::Command::general); + + PMIX_INFO_LOAD(&(fInfos[0]), PMIX_EVENT_RETURN_OBJECT, this, PMIX_POINTER); + + PMIx_Register_event_handler(codes.data(), codes.size(), + fInfos.data(), fInfos.size(), + &Commands::Handler, + &Commands::EventHandlerRegistration, + this); + fBlocker.Wait(); + LOG(debug) << "PMIxCommands: Subscribing complete!"; + } + + void Unsubscribe() + { + if (fSubscribed) { + LOG(debug) << "PMIxCommands: Unsubscribing..."; + PMIx_Deregister_event_handler(fHandlerRef, &Commands::EventHandlerDeregistration, this); + fBlocker.Wait(); + LOG(debug) << "PMIxCommands: Unsubscribing complete!"; + } else { + LOG(debug) << "Unsubscribe() is called while no subscription is active"; + } + } + + struct Holder + { + Holder() : fData(nullptr) {} + ~Holder() { PMIX_DATA_ARRAY_FREE(fData); } + + std::vector fInfos; + pmix_data_array_t* fData; + }; + + void Send(const std::string& msg) + { + std::vector* infos = new std::vector(); + infos->emplace_back("fairmq.cmd", msg); + PMIx_Notify_event(static_cast(pmix::Command::general), + &fProcess, + PMIX_RANGE_NAMESPACE, + infos->data(), infos->size(), + &Commands::OpCompleteCallback>, + infos); + } + + void Send(const std::string& msg, rank rank) + { + pmix::proc destination(fProcess); + destination.rank = rank; + Send(msg, {destination}); + } + + void Send(const std::string& msg, const std::vector& destination) + { + std::unique_ptr holder = fair::mq::tools::make_unique(); + + PMIX_DATA_ARRAY_CREATE(holder->fData, destination.size(), PMIX_PROC); + memcpy(holder->fData->array, destination.data(), destination.size() * sizeof(pmix_proc_t)); + // LOG(warn) << "OLOG: " << msg << " > " << static_cast(holder->fData->array)[0].nspace << ": " << static_cast(holder->fData->array)[0].rank; + holder->fInfos.emplace_back(PMIX_EVENT_CUSTOM_RANGE, holder->fData); + // LOG(warn) << msg << " // packed range: " << static_cast(static_cast(holder->fInfos.at(0).value.data.darray)->array)[0].nspace << "_" << static_cast(static_cast(holder->fInfos.at(0).value.data.darray)->array)[0].rank; + // LOG(warn) << msg << " // packed range.type: " << pmix::typeNames.at(holder->fInfos.at(0).value.type); + // LOG(warn) << msg << " // packed range.array.type: " << pmix::typeNames.at(static_cast(holder->fInfos.at(0).value.data.darray)->type); + // LOG(warn) << msg << " // packed range.array.size: " << static_cast(holder->fInfos.at(0).value.data.darray)->size; + // LOG(warn) << holder->fInfos.size(); + holder->fInfos.emplace_back("fairmq.cmd", msg); + // LOG(warn) << msg << " // packed msg: " << holder->fInfos.at(1).value.data.string; + // LOG(warn) << msg << " // packed msg.type: " << pmix::typeNames.at(holder->fInfos.at(1).value.type); + // LOG(warn) << holder->fInfos.size(); + + PMIx_Notify_event(static_cast(pmix::Command::general), + &fProcess, + PMIX_RANGE_CUSTOM, + holder->fInfos.data(), holder->fInfos.size(), + &Commands::OpCompleteCallback, + holder.get()); + holder.release(); + } + + private: + static void EventHandlerRegistration(pmix_status_t s, size_t handlerRef, void* obj) + { + if (s == PMIX_SUCCESS) { + LOG(debug) << "Successfully registered event handler, reference = " << static_cast(handlerRef); + static_cast(obj)->fHandlerRef = handlerRef; + static_cast(obj)->fSubscribed = true; + } else { + LOG(error) << "Could not register PMIx event handler, status = " << s; + } + static_cast(obj)->fBlocker.Signal(); + } + + static void EventHandlerDeregistration(pmix_status_t s, void* obj) + { + if (s == PMIX_SUCCESS) { + LOG(debug) << "Successfully deregistered event handler, reference = " << static_cast(obj)->fHandlerRef; + static_cast(obj)->fSubscribed = false; + } else { + LOG(error) << "Could not deregister PMIx event handler, reference = " << static_cast(obj)->fHandlerRef << ", status = " << s; + } + static_cast(obj)->fBlocker.Signal(); + } + + template + static void OpCompleteCallback(pmix_status_t s, void* data) + { + if (s == PMIX_SUCCESS) { + // LOG(info) << "Operation completed successfully"; + } else { + LOG(error) << "Could not complete operation, status = " << s; + } + if (data) { + // LOG(warn) << "Destroying event data..."; + delete static_cast(data); + } + } + + static void Handler(size_t handlerId, + pmix_status_t s, + const pmix_proc_t* src, + pmix_info_t info[], size_t ninfo, + pmix_info_t[] /* results */, size_t nresults, + pmix_event_notification_cbfunc_fn_t cbfunc, + void* cbdata) + { + std::stringstream ss; + ss << "Event handler called with " + << "status: " << s << ", " + << "source: " << src->nspace << "_" << src->rank << ", " + << "ninfo: " << ninfo << ", " + << "nresults: " << nresults << ", " + << "handlerId: " << handlerId; + + std::string msg; + + Commands* obj = nullptr; + + if (ninfo > 0) { + ss << ":\n"; + for (size_t i = 0; i < ninfo; ++i) { + ss << " [" << i << "]: key: '" << info[i].key + << "', value: '" << pmix::get_value_str(info[i].value) + << "', value.type: '" << pmix::typeNames.at(info[i].value.type) + << "', flags: " << info[i].flags; + + if (std::strcmp(info[i].key, "fairmq.cmd") == 0) { + msg = pmix::get_value_str(info[i].value); + } + + if (std::strcmp(info[i].key, PMIX_EVENT_RETURN_OBJECT) == 0) { + obj = static_cast(info[i].value.data.ptr); + } + + if (i < ninfo - 1) { + ss << "\n"; + } + } + } + + + if (obj != nullptr) { + if (static_cast(obj)->fProcess.rank != src->rank) { + // LOG(warn) << ss.str(); + static_cast(obj)->fCallback(msg, proc(const_cast(src->nspace), rank(src->rank))); + } else { + // LOG(trace) << "suppressing message from itself"; + } + } else { + LOG(ERROR) << "ERROR"; + } + + if (cbfunc != nullptr) { + cbfunc(PMIX_SUCCESS, nullptr, 0, nullptr, nullptr, cbdata); + } + } + + const proc& fProcess; + size_t fHandlerRef; + std::function fCallback; + std::array fInfos; + bool fSubscribed; + fair::mq::tools::SharedSemaphore fBlocker; +}; + +} /* namespace pmix */ + +#endif /* PMIXCOMMANDS_H */ diff --git a/fairmq/plugins/PMIx/PMIxPlugin.cxx b/fairmq/plugins/PMIx/PMIxPlugin.cxx index af674c00..738d6096 100644 --- a/fairmq/plugins/PMIx/PMIxPlugin.cxx +++ b/fairmq/plugins/PMIx/PMIxPlugin.cxx @@ -8,8 +8,15 @@ #include "PMIxPlugin.h" +#include #include + +#include #include +#include // UINT32_MAX + +using namespace std; +using namespace fair::mq::sdk::cmd; namespace fair { @@ -18,47 +25,99 @@ namespace mq namespace plugins { -PMIxPlugin::PMIxPlugin(const std::string& name, +PMIxPlugin::PMIxPlugin(const string& name, const Plugin::Version version, - const std::string& maintainer, - const std::string& homepage, + const string& maintainer, + const string& homepage, PluginServices* pluginServices) : Plugin(name, version, maintainer, homepage, pluginServices) + , fProcess(Init()) , fPid(getpid()) + , fPMIxClient(tools::ToString("PMIx client(pid=", fPid, ") ")) + , fDeviceId(string(fProcess.nspace) + "_" + to_string(fProcess.rank)) + , fCommands(fProcess) + , fLastExternalController(UINT32_MAX) + , fExitingAckedByLastExternalController(false) + , fCurrentState(DeviceState::Idle) + , fLastState(DeviceState::Idle) { - Init(); - SetProperty("id", std::string(fProc.nspace) + "_" + std::to_string(fProc.rank)); - Fence(); + TakeDeviceControl(); + LOG(debug) << PMIxClient() << "pmix::init() OK: " << fProcess << ", version=" << pmix::get_version(); + SetProperty("id", fDeviceId); - SubscribeToDeviceStateChange([&](DeviceState newState) { + Fence("pmix::init"); + SubscribeForCommands(); + Fence("subscribed"); + + // fCommands.Send("test1"); + // fCommands.Send("test2", 0); + // fCommands.Send("test3", 0); + + // LOG(info) << "PMIX_EXTERNAL_ERR_BASE: " << PMIX_EXTERNAL_ERR_BASE; + + // job level infos + // LOG(info) << "PMIX_SESSION_ID: " << pmix::getInfo(PMIX_SESSION_ID, fProcess); + // LOG(info) << "PMIX_UNIV_SIZE: " << pmix::getInfo(PMIX_UNIV_SIZE, fProcess); + // LOG(info) << "PMIX_JOB_SIZE: " << pmix::getInfo(PMIX_JOB_SIZE, fProcess); + // LOG(info) << "PMIX_JOB_NUM_APPS: " << pmix::getInfo(PMIX_JOB_NUM_APPS, fProcess); + // LOG(info) << "PMIX_APP_SIZE: " << pmix::getInfo(PMIX_APP_SIZE, fProcess); + // LOG(info) << "PMIX_MAX_PROCS: " << pmix::getInfo(PMIX_MAX_PROCS, fProcess); + // LOG(info) << "PMIX_NUM_NODES: " << pmix::getInfo(PMIX_NUM_NODES, fProcess); + // LOG(info) << "PMIX_CLUSTER_ID: " << pmix::getInfo(PMIX_CLUSTER_ID, fProcess); + // LOG(info) << "PMIX_NSPACE: " << pmix::getInfo(PMIX_NSPACE, fProcess); + // LOG(info) << "PMIX_JOBID: " << pmix::getInfo(PMIX_JOBID, fProcess); + // LOG(info) << "PMIX_NODE_LIST: " << pmix::getInfo(PMIX_NODE_LIST, fProcess); + // LOG(info) << "PMIX_ALLOCATED_NODELIST: " << pmix::getInfo(PMIX_ALLOCATED_NODELIST, fProcess); + // LOG(info) << "PMIX_NPROC_OFFSET: " << pmix::getInfo(PMIX_NPROC_OFFSET, fProcess); + // LOG(info) << "PMIX_LOCALLDR: " << pmix::getInfo(PMIX_LOCALLDR, fProcess); + // LOG(info) << "PMIX_APPLDR: " << pmix::getInfo(PMIX_APPLDR, fProcess); + + // // per-node information + // LOG(info) << "PMIX_NODE_SIZE: " << pmix::getInfo(PMIX_NODE_SIZE, fProcess); + // LOG(info) << "PMIX_LOCAL_SIZE: " << pmix::getInfo(PMIX_LOCAL_SIZE, fProcess); + // LOG(info) << "PMIX_AVAIL_PHYS_MEMORY: " << pmix::getInfo(PMIX_AVAIL_PHYS_MEMORY, fProcess); + + // // per-process information + // LOG(info) << "PMIX_PROCID: " << pmix::getInfo(PMIX_PROCID, fProcess); + // LOG(info) << "PMIX_APPNUM: " << pmix::getInfo(PMIX_APPNUM, fProcess); + // LOG(info) << "PMIX_LOCAL_RANK: " << pmix::getInfo(PMIX_LOCAL_RANK, fProcess); + // LOG(info) << "PMIX_NODE_RANK: " << pmix::getInfo(PMIX_NODE_RANK, fProcess); + // LOG(info) << "PMIX_RANK: " << pmix::getInfo(PMIX_RANK, fProcess); + // LOG(info) << "PMIX_GLOBAL_RANK: " << pmix::getInfo(PMIX_GLOBAL_RANK, fProcess); + // LOG(info) << "PMIX_APP_RANK: " << pmix::getInfo(PMIX_APP_RANK, fProcess); + + SubscribeToDeviceStateChange([this](DeviceState newState) { switch (newState) { - case DeviceState::Idle: - Fence(); - break; - case DeviceState::Bound: - Publish(); - Fence(); - break; - case DeviceState::Connecting: - Lookup(); - break; - case DeviceState::DeviceReady: - Fence(); - break; - case DeviceState::Ready: - Fence(); - break; - case DeviceState::Exiting: - UnsubscribeFromDeviceStateChange(); - break; - default: - break; + case DeviceState::Bound: + Publish(); + break; + case DeviceState::Connecting: + Lookup(); + break; + case DeviceState::Exiting: + ReleaseDeviceControl(); + UnsubscribeFromDeviceStateChange(); + break; + default: + break; + } + + lock_guard lock{fStateChangeSubscriberMutex}; + fLastState = fCurrentState; + fCurrentState = newState; + for (auto subscriberId : fStateChangeSubscribers) { + LOG(debug) << "Publishing state-change: " << fLastState << "->" << newState << " to " << subscriberId; + Cmds cmds(make(fDeviceId, 0, fLastState, fCurrentState)); + fCommands.Send(cmds.Serialize(Format::JSON), static_cast(subscriberId)); } }); } PMIxPlugin::~PMIxPlugin() { + LOG(debug) << "Destroying PMIxPlugin"; + ReleaseDeviceControl(); + fCommands.Unsubscribe(); while (pmix::initialized()) { try { pmix::finalize(); @@ -69,33 +128,112 @@ PMIxPlugin::~PMIxPlugin() } } -auto PMIxPlugin::PMIxClient() const -> std::string +auto PMIxPlugin::SubscribeForCommands() -> void { - std::stringstream ss; - ss << "PMIx client(pid=" << fPid << ") "; - return ss.str(); + fCommands.Subscribe([this](const string& cmdStr, const pmix::proc& sender) { + // LOG(info) << "PMIx Plugin received message: '" << cmdStr << "', from " << sender; + + Cmds inCmds; + inCmds.Deserialize(cmdStr, Format::JSON); + + for (const auto& cmd : inCmds) { + LOG(info) << "Received command type: '" << cmd->GetType() << "' from " << sender; + switch (cmd->GetType()) { + case Type::check_state: + fCommands.Send(Cmds(make(fDeviceId, GetCurrentDeviceState())) + .Serialize(Format::JSON), + {sender}); + break; + case Type::change_state: { + Transition transition = static_cast(*cmd).GetTransition(); + if (ChangeDeviceState(transition)) { + fCommands.Send( + Cmds(make(fDeviceId, Result::Ok, transition)) + .Serialize(Format::JSON), + {sender}); + } else { + fCommands.Send( + Cmds(make(fDeviceId, Result::Failure, transition)) + .Serialize(Format::JSON), + {sender}); + } + { + lock_guard lock{fStateChangeSubscriberMutex}; + fLastExternalController = sender.rank; + } + } + break; + case Type::subscribe_to_state_change: { + { + lock_guard lock{fStateChangeSubscriberMutex}; + fStateChangeSubscribers.insert(sender.rank); + } + + LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState + << " to " << sender; + Cmds outCmds(make(fDeviceId, Result::Ok), + make(fDeviceId, 0, fLastState, fCurrentState)); + fCommands.Send(outCmds.Serialize(Format::JSON), {sender}); + } + break; + case Type::unsubscribe_from_state_change: { + { + lock_guard lock{fStateChangeSubscriberMutex}; + fStateChangeSubscribers.erase(sender.rank); + } + fCommands.Send(Cmds(make(fDeviceId, Result::Ok)) + .Serialize(Format::JSON), + {sender}); + } + break; + case Type::state_change_exiting_received: { + { + lock_guard lock{fStateChangeSubscriberMutex}; + if (fLastExternalController == sender.rank) { + fExitingAckedByLastExternalController = true; + } + } + fExitingAcked.notify_one(); + } + break; + case Type::dump_config: { + stringstream ss; + for (const auto& k: GetPropertyKeys()) { + ss << fDeviceId << ": " << k << " -> " << GetPropertyAsString(k) << "\n"; + } + fCommands.Send(Cmds(make(fDeviceId, ss.str())).Serialize(Format::JSON), + {sender}); + } + break; + default: + LOG(warn) << "Unexpected/unknown command received: " << cmdStr; + LOG(warn) << "Origin: " << sender; + break; + } + } + }); } -auto PMIxPlugin::Init() -> void +auto PMIxPlugin::Init() -> pmix::proc { if (!pmix::initialized()) { - fProc = pmix::init(); - LOG(debug) << PMIxClient() << "pmix::init() OK: " << fProc - << ",version=" << pmix::get_version(); + return pmix::init(); + } else { + throw runtime_error("trying to initialize PMIx while it is already initialized"); } } auto PMIxPlugin::Publish() -> void { auto channels(GetChannelInfo()); - std::vector info; + vector info; for (const auto& c : channels) { - std::string methodKey{"chans." + c.first + "." + std::to_string(c.second - 1) + ".method"}; - if (GetProperty(methodKey) == "bind") { + string methodKey("chans." + c.first + "." + to_string(c.second - 1) + ".method"); + if (GetProperty(methodKey) == "bind") { for (int i = 0; i < c.second; ++i) { - std::string addressKey{"chans." + c.first + "." + std::to_string(i) + ".address"}; - info.emplace_back(addressKey, GetProperty(addressKey)); + string addressKey("chans." + c.first + "." + to_string(i) + ".address"); + info.emplace_back(addressKey, GetProperty(addressKey)); LOG(debug) << PMIxClient() << info.back(); } } @@ -103,32 +241,37 @@ auto PMIxPlugin::Publish() -> void if (info.size() > 0) { pmix::publish(info); - LOG(debug) << PMIxClient() << "pmix::publish() OK: published " - << info.size() << " binding channels."; + LOG(debug) << PMIxClient() << "pmix::publish() OK: published " << info.size() + << " binding channels."; } } auto PMIxPlugin::Fence() -> void { - pmix::proc all(fProc); + pmix::proc all(fProcess); all.rank = pmix::rank::wildcard; pmix::fence({all}); - LOG(debug) << PMIxClient() << "pmix::fence() OK"; +} + +auto PMIxPlugin::Fence(const std::string& label) -> void +{ + Fence(label); + LOG(debug) << PMIxClient() << "pmix::fence() [" << label << "] OK"; } auto PMIxPlugin::Lookup() -> void { auto channels(GetChannelInfo()); for (const auto& c : channels) { - std::string methodKey{"chans." + c.first + "." + std::to_string(c.second - 1) + ".method"}; - if (GetProperty(methodKey) == "connect") { + string methodKey("chans." + c.first + "." + to_string(c.second - 1) + ".method"); + if (GetProperty(methodKey) == "connect") { for (int i = 0; i < c.second; ++i) { - std::vector pdata; - std::string addressKey{"chans." + c.first + "." + std::to_string(i) + ".address"}; + vector pdata; + string addressKey("chans." + c.first + "." + to_string(i) + ".address"); pdata.emplace_back(); pdata.back().set_key(addressKey); - std::vector info; + vector info; info.emplace_back(PMIX_WAIT, static_cast(pdata.size())); if (pdata.size() > 0) { @@ -141,11 +284,11 @@ auto PMIxPlugin::Lookup() -> void LOG(debug) << PMIxClient() << "pmix::lookup() not found: key=" << p.key; } else if (p.value.type == PMIX_STRING) { LOG(debug) << PMIxClient() << "pmix::lookup() found:" - << " key=" << p.key << ",value=" << p.value.data.string; - SetProperty(p.key, p.value.data.string); + << " key=" << p.key << ",value=" << p.value.data.string; + SetProperty(p.key, p.value.data.string); } else { LOG(debug) << PMIxClient() << "pmix::lookup() wrong type returned: " - << "key=" << p.key << ",type=" << p.value.type; + << "key=" << p.key << ",type=" << p.value.type; } } } @@ -153,6 +296,14 @@ auto PMIxPlugin::Lookup() -> void } } +auto PMIxPlugin::WaitForExitingAck() -> void +{ + unique_lock lock(fStateChangeSubscriberMutex); + fExitingAcked.wait_for(lock, chrono::milliseconds(1000), [this]() { + return fExitingAckedByLastExternalController; + }); +} + } /* namespace plugins */ } /* namespace mq */ } /* namespace fair */ diff --git a/fairmq/plugins/PMIx/PMIxPlugin.h b/fairmq/plugins/PMIx/PMIxPlugin.h index 9432e637..e22db4ef 100644 --- a/fairmq/plugins/PMIx/PMIxPlugin.h +++ b/fairmq/plugins/PMIx/PMIxPlugin.h @@ -10,6 +10,7 @@ #define FAIR_MQ_PLUGINS_PMIX #include "PMIx.hpp" +#include "PMIxCommands.h" #include #include @@ -39,23 +40,40 @@ class PMIxPlugin : public Plugin const std::string& homepage, PluginServices* pluginServices); ~PMIxPlugin(); - auto PMIxClient() const -> std::string; + + auto PMIxClient() const -> std::string { return fPMIxClient; }; private: - pmix::proc fProc; + pmix::proc fProcess; pid_t fPid; + std::string fPMIxClient; + std::string fDeviceId; + pmix::Commands fCommands; - auto Init() -> void; + std::set fStateChangeSubscribers; + uint32_t fLastExternalController; + bool fExitingAckedByLastExternalController; + std::condition_variable fExitingAcked; + std::mutex fStateChangeSubscriberMutex; + + DeviceState fCurrentState; + DeviceState fLastState; + + auto Init() -> pmix::proc; auto Publish() -> void; auto Fence() -> void; + auto Fence(const std::string& label) -> void; auto Lookup() -> void; + + auto SubscribeForCommands() -> void; + auto WaitForExitingAck() -> void; }; Plugin::ProgOptions PMIxProgramOptions() { - boost::program_options::options_description options{"PMIx Plugin"}; - options.add_options()( - "pmix-dummy", boost::program_options::value()->default_value(0), "Dummy."); + boost::program_options::options_description options("PMIx Plugin"); + options.add_options() + ("pmix-dummy", boost::program_options::value()->default_value(0), "Dummy."); return options; } diff --git a/fairmq/plugins/PMIx/runPMIxCommandUI.cxx b/fairmq/plugins/PMIx/runPMIxCommandUI.cxx new file mode 100644 index 00000000..474fdaae --- /dev/null +++ b/fairmq/plugins/PMIx/runPMIxCommandUI.cxx @@ -0,0 +1,218 @@ +/******************************************************************************** + * Copyright (C) 2014-2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include +#include + +#include + +#include "PMIx.hpp" +#include "PMIxCommands.h" + +#include + +#include +#include +#include +#include +#include +#include +#include // raw mode console input +#include +#include +#include +#include + +using namespace std; +using namespace fair::mq::sdk::cmd; +namespace bpo = boost::program_options; + +const std::map expected = +{ + { fair::mq::Transition::InitDevice, fair::mq::State::InitializingDevice }, + { fair::mq::Transition::CompleteInit, fair::mq::State::Initialized }, + { fair::mq::Transition::Bind, fair::mq::State::Bound }, + { fair::mq::Transition::Connect, fair::mq::State::DeviceReady }, + { fair::mq::Transition::InitTask, fair::mq::State::Ready }, + { fair::mq::Transition::Run, fair::mq::State::Running }, + { fair::mq::Transition::Stop, fair::mq::State::Ready }, + { fair::mq::Transition::ResetTask, fair::mq::State::DeviceReady }, + { fair::mq::Transition::ResetDevice, fair::mq::State::Idle }, + { fair::mq::Transition::End, fair::mq::State::Exiting } +}; + +struct StateSubscription +{ + pmix::Commands& fCommands; + + explicit StateSubscription(pmix::Commands& commands) + : fCommands(commands) + { + fCommands.Send(Cmds(make()).Serialize(Format::JSON)); + } + + ~StateSubscription() + { + fCommands.Send(Cmds(make()).Serialize(Format::JSON)); + this_thread::sleep_for(chrono::milliseconds(100)); // give PMIx a chance to complete request + } +}; + +struct MiniTopo +{ + explicit MiniTopo(unsigned int n) + : fState(n, fair::mq::State::Ok) + {} + + void WaitFor(const fair::mq::State state) + { + std::unique_lock lk(fMtx); + + fCV.wait(lk, [&](){ + unsigned int count = std::count_if(fState.cbegin(), fState.cend(), [=](const auto& s) { + return s == state; + }); + + bool result = count == fState.size(); + cout << "expecting " << state << " for " << fState.size() << " devices, found " << count << ", condition: " << result << endl; + return result; + }); + } + + void Update(uint32_t rank, const fair::mq::State state) + { + try { + { + std::lock_guard lk(fMtx); + fState.at(rank - 1) = state; + } + fCV.notify_one(); + } catch (const std::exception& e) { + LOG(error) << "Exception in Update: " << e.what(); + } + } + + private: + vector fState; + std::mutex fMtx; + std::condition_variable fCV; +}; + +int main(int argc, char* argv[]) +{ + try { + unsigned int numDevices; + + bpo::options_description options("Common options"); + options.add_options() + ("number-devices,n", bpo::value(&numDevices)->default_value(0), "Number of devices (will be removed in the future)") + ("help,h", "Produce help message"); + + bpo::variables_map vm; + bpo::store(bpo::command_line_parser(argc, argv).options(options).run(), vm); + + if (vm.count("help")) { + cout << "FairMQ DDS Command UI" << endl << options << endl; + cout << "Commands: [c] check state, [o] dump config, [h] help, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device, [k] complete init, [b] bind, [x] connect" << endl; + return EXIT_SUCCESS; + } + + bpo::notify(vm); + + fair::Logger::SetConsoleSeverity(fair::Severity::debug); + fair::Logger::SetConsoleColor(true); + fair::Logger::SetVerbosity(fair::Verbosity::low); + + pmix::proc process; + + if (!pmix::initialized()) { + process = pmix::init(); + LOG(warn) << "pmix::init() OK: " << process << ", version=" << pmix::get_version(); + } + + pmix::proc all(process); + all.rank = pmix::rank::wildcard; + pmix::fence({all}); + LOG(warn) << "pmix::fence() [pmix::init] OK"; + + MiniTopo topo(numDevices); + pmix::Commands commands(process); + + commands.Subscribe([&](const string& msg, const pmix::proc& sender) { + // LOG(info) << "Received '" << msg << "' from " << sender; + Cmds cmds; + cmds.Deserialize(msg, Format::JSON); + // cout << "Received " << cmds.Size() << " command(s) with total size of " << msg.length() << " bytes: " << endl; + for (const auto& cmd : cmds) { + // cout << " > " << cmd->GetType() << endl; + switch (cmd->GetType()) { + case Type::state_change: { + cout << "Received state_change from " << static_cast(*cmd).GetDeviceId() << ": " << static_cast(*cmd).GetLastState() << "->" << static_cast(*cmd).GetCurrentState() << endl; + topo.Update(sender.rank, static_cast(*cmd).GetCurrentState()); + if (static_cast(*cmd).GetCurrentState() == fair::mq::State::Exiting) { + commands.Send(Cmds(make()).Serialize(Format::JSON), {sender}); + } + } + break; + case Type::state_change_subscription: + if (static_cast(*cmd).GetResult() != Result::Ok) { + cout << "State change subscription failed for " << static_cast(*cmd).GetDeviceId() << endl; + } + break; + case Type::state_change_unsubscription: + if (static_cast(*cmd).GetResult() != Result::Ok) { + cout << "State change unsubscription failed for " << static_cast(*cmd).GetDeviceId() << endl; + } + break; + case Type::transition_status: { + if (static_cast(*cmd).GetResult() == Result::Ok) { + cout << "Device " << static_cast(*cmd).GetDeviceId() << " started to transition with " << static_cast(*cmd).GetTransition() << endl; + } else { + cout << "Device " << static_cast(*cmd).GetDeviceId() << " cannot transition with " << static_cast(*cmd).GetTransition() << endl; + } + } + break; + case Type::current_state: + cout << "Device " << static_cast(*cmd).GetDeviceId() << " is in " << static_cast(*cmd).GetCurrentState() << " state" << endl; + break; + case Type::config: + cout << "Received config for device " << static_cast(*cmd).GetDeviceId() << ":\n" << static_cast(*cmd).GetConfig() << endl; + break; + default: + cout << "Unexpected/unknown command received: " << cmd->GetType() << endl; + cout << "Origin: " << sender << endl; + break; + } + } + }); + + pmix::fence({all}); + LOG(warn) << "pmix::fence() [subscribed] OK"; + + StateSubscription stateSubscription(commands); + + for (auto transition : { fair::mq::Transition::InitDevice, + fair::mq::Transition::CompleteInit, + fair::mq::Transition::Bind, + fair::mq::Transition::Connect, + fair::mq::Transition::InitTask, + fair::mq::Transition::Run, + fair::mq::Transition::Stop, + fair::mq::Transition::ResetTask, + fair::mq::Transition::ResetDevice, + fair::mq::Transition::End }) { + commands.Send(Cmds(make(transition)).Serialize(Format::JSON)); + topo.WaitFor(expected.at(transition)); + } + } catch (exception& e) { + LOG(error) << "Error: " << e.what(); + return EXIT_FAILURE; + } + LOG(warn) << "exiting"; + return EXIT_SUCCESS; +}