/******************************************************************************** * Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ #ifndef PMIXCOMMANDS_H #define PMIXCOMMANDS_H #include "PMIx.hpp" #include #include #include // make_unique #include namespace pmix { std::array typeNames = { { "PMIX_UNDEF", "PMIX_BOOL", "PMIX_BYTE", "PMIX_STRING", "PMIX_SIZE", "PMIX_PID", "PMIX_INT", "PMIX_INT8", "PMIX_INT16", "PMIX_INT32", "PMIX_INT64", "PMIX_UINT", "PMIX_UINT8", "PMIX_UINT16", "PMIX_UINT32", "PMIX_UINT64", "PMIX_FLOAT", "PMIX_DOUBLE", "PMIX_TIMEVAL", "PMIX_TIME", "PMIX_STATUS", "PMIX_VALUE", "PMIX_PROC", "PMIX_APP", "PMIX_INFO", "PMIX_PDATA", "PMIX_BUFFER", "PMIX_BYTE_OBJECT", "PMIX_KVAL", "PMIX_MODEX", "PMIX_PERSIST", "PMIX_POINTER", "PMIX_SCOPE", "PMIX_DATA_RANGE", "PMIX_COMMAND", "PMIX_INFO_DIRECTIVES", "PMIX_DATA_TYPE", "PMIX_PROC_STATE", "PMIX_PROC_INFO", "PMIX_DATA_ARRAY", "PMIX_PROC_RANK", "PMIX_QUERY", "PMIX_COMPRESSED_STRING", "PMIX_ALLOC_DIRECTIVE", "PMIX_INFO_ARRAY", "PMIX_IOF_CHANNEL", "PMIX_ENVAR" } }; enum class Command : int { general = PMIX_EXTERNAL_ERR_BASE, error = PMIX_EXTERNAL_ERR_BASE - 1 }; class Commands { public: Commands(const proc& process) : fProcess(process) , fSubscribed(false) { } ~Commands() { Unsubscribe(); } void Subscribe(std::function callback) { using namespace std::placeholders; LOG(debug) << "PMIxCommands: Subscribing..."; fCallback = callback; std::array codes; codes[0] = static_cast(pmix::Command::general); PMIX_INFO_LOAD(&(fInfos[0]), PMIX_EVENT_RETURN_OBJECT, this, PMIX_POINTER); PMIx_Register_event_handler(codes.data(), codes.size(), fInfos.data(), fInfos.size(), &Commands::Handler, &Commands::EventHandlerRegistration, this); fBlocker.Wait(); LOG(debug) << "PMIxCommands: Subscribing complete!"; } void Unsubscribe() { if (fSubscribed) { LOG(debug) << "PMIxCommands: Unsubscribing..."; PMIx_Deregister_event_handler(fHandlerRef, &Commands::EventHandlerDeregistration, this); fBlocker.Wait(); LOG(debug) << "PMIxCommands: Unsubscribing complete!"; } else { LOG(debug) << "Unsubscribe() is called while no subscription is active"; } } struct Holder { Holder() : fData(nullptr) {} ~Holder() { PMIX_DATA_ARRAY_FREE(fData); } std::vector fInfos; pmix_data_array_t* fData; }; void Send(const std::string& msg) { std::vector* infos = new std::vector(); infos->emplace_back("fairmq.cmd", msg); PMIx_Notify_event(static_cast(pmix::Command::general), &fProcess, PMIX_RANGE_NAMESPACE, infos->data(), infos->size(), &Commands::OpCompleteCallback>, infos); } void Send(const std::string& msg, rank rank) { pmix::proc destination(fProcess); destination.rank = rank; Send(msg, {destination}); } void Send(const std::string& msg, const std::vector& destination) { std::unique_ptr holder = std::make_unique(); PMIX_DATA_ARRAY_CREATE(holder->fData, destination.size(), PMIX_PROC); memcpy(holder->fData->array, destination.data(), destination.size() * sizeof(pmix_proc_t)); // LOG(warn) << "OLOG: " << msg << " > " << static_cast(holder->fData->array)[0].nspace << ": " << static_cast(holder->fData->array)[0].rank; holder->fInfos.emplace_back(PMIX_EVENT_CUSTOM_RANGE, holder->fData); // LOG(warn) << msg << " // packed range: " << static_cast(static_cast(holder->fInfos.at(0).value.data.darray)->array)[0].nspace << "_" << static_cast(static_cast(holder->fInfos.at(0).value.data.darray)->array)[0].rank; // LOG(warn) << msg << " // packed range.type: " << pmix::typeNames.at(holder->fInfos.at(0).value.type); // LOG(warn) << msg << " // packed range.array.type: " << pmix::typeNames.at(static_cast(holder->fInfos.at(0).value.data.darray)->type); // LOG(warn) << msg << " // packed range.array.size: " << static_cast(holder->fInfos.at(0).value.data.darray)->size; // LOG(warn) << holder->fInfos.size(); holder->fInfos.emplace_back("fairmq.cmd", msg); // LOG(warn) << msg << " // packed msg: " << holder->fInfos.at(1).value.data.string; // LOG(warn) << msg << " // packed msg.type: " << pmix::typeNames.at(holder->fInfos.at(1).value.type); // LOG(warn) << holder->fInfos.size(); PMIx_Notify_event(static_cast(pmix::Command::general), &fProcess, PMIX_RANGE_CUSTOM, holder->fInfos.data(), holder->fInfos.size(), &Commands::OpCompleteCallback, holder.get()); holder.release(); } private: static void EventHandlerRegistration(pmix_status_t s, size_t handlerRef, void* obj) { if (s == PMIX_SUCCESS) { LOG(debug) << "Successfully registered event handler, reference = " << static_cast(handlerRef); static_cast(obj)->fHandlerRef = handlerRef; static_cast(obj)->fSubscribed = true; } else { LOG(error) << "Could not register PMIx event handler, status = " << s; } static_cast(obj)->fBlocker.Signal(); } static void EventHandlerDeregistration(pmix_status_t s, void* obj) { if (s == PMIX_SUCCESS) { LOG(debug) << "Successfully deregistered event handler, reference = " << static_cast(obj)->fHandlerRef; static_cast(obj)->fSubscribed = false; } else { LOG(error) << "Could not deregister PMIx event handler, reference = " << static_cast(obj)->fHandlerRef << ", status = " << s; } static_cast(obj)->fBlocker.Signal(); } template static void OpCompleteCallback(pmix_status_t s, void* data) { if (s == PMIX_SUCCESS) { // LOG(info) << "Operation completed successfully"; } else { LOG(error) << "Could not complete operation, status = " << s; } if (data) { // LOG(warn) << "Destroying event data..."; delete static_cast(data); } } static void Handler(size_t handlerId, pmix_status_t s, const pmix_proc_t* src, pmix_info_t info[], size_t ninfo, pmix_info_t[] /* results */, size_t nresults, pmix_event_notification_cbfunc_fn_t cbfunc, void* cbdata) { std::stringstream ss; ss << "Event handler called with " << "status: " << s << ", " << "source: " << src->nspace << "_" << src->rank << ", " << "ninfo: " << ninfo << ", " << "nresults: " << nresults << ", " << "handlerId: " << handlerId; std::string msg; Commands* obj = nullptr; if (ninfo > 0) { ss << ":\n"; for (size_t i = 0; i < ninfo; ++i) { ss << " [" << i << "]: key: '" << info[i].key << "', value: '" << pmix::get_value_str(info[i].value) << "', value.type: '" << pmix::typeNames.at(info[i].value.type) << "', flags: " << info[i].flags; if (std::strcmp(info[i].key, "fairmq.cmd") == 0) { msg = pmix::get_value_str(info[i].value); } if (std::strcmp(info[i].key, PMIX_EVENT_RETURN_OBJECT) == 0) { obj = static_cast(info[i].value.data.ptr); } if (i < ninfo - 1) { ss << "\n"; } } } if (obj != nullptr) { if (static_cast(obj)->fProcess.rank != src->rank) { // LOG(warn) << ss.str(); static_cast(obj)->fCallback(msg, proc(const_cast(src->nspace), rank(src->rank))); } else { // LOG(trace) << "suppressing message from itself"; } } else { LOG(error) << "ERROR"; } if (cbfunc != nullptr) { cbfunc(PMIX_SUCCESS, nullptr, 0, nullptr, nullptr, cbdata); } } const proc& fProcess; size_t fHandlerRef; std::function fCallback; std::array fInfos; bool fSubscribed; fair::mq::tools::SharedSemaphore fBlocker; }; } /* namespace pmix */ #endif /* PMIXCOMMANDS_H */