From 4bc54ad32b5d8079b7e0e4ddfa77624c71639bbf Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 20 Jun 2017 15:58:43 +0200 Subject: [PATCH] add `--print-channels` to print registered channels of the device --- fairmq/FairMQDevice.cxx | 1 - fairmq/FairMQDevice.h | 28 ++++++++++++++++++++++++++ fairmq/devices/FairMQMerger.cxx | 10 +++++++-- fairmq/devices/FairMQMerger.h | 5 +++-- fairmq/options/FairMQProgOptions.cxx | 12 ++++++++++- fairmq/options/FairProgOptions.cxx | 2 +- fairmq/options/FairProgOptionsHelper.h | 10 ++++++--- fairmq/tools/runSimpleMQStateMachine.h | 10 +++++++++ 8 files changed, 68 insertions(+), 10 deletions(-) diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 1564ef95..07fe15d5 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -74,7 +74,6 @@ FairMQDevice::FairMQDevice() , fMultitransportProceed(false) , fExternalConfig(false) { - LOG(DEBUG) << "PID: " << getpid(); } void FairMQDevice::CatchSignals() diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 0a429ec4..11947716 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -379,6 +379,33 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable const FairMQChannel& GetChannel(const std::string& channelName, const int index = 0) const; + virtual void RegisterChannelEndpoints() {} + + bool RegisterChannelEndpoint(const std::string& channelName, uint16_t minNumSubChannels = 1, uint16_t maxNumSubChannels = 1) + { + bool ok = fChannelRegistry.insert(std::make_pair(channelName, std::make_pair(minNumSubChannels, maxNumSubChannels))).second; + if (!ok) + { + LOG(WARN) << "Registering channel: name already registered: \"" << channelName << "\""; + } + return ok; + } + + void PrintRegisteredChannels() + { + if (fChannelRegistry.size() < 1) + { + std::cout << "no channels registered." << std::endl; + } + else + { + for (const auto& c : fChannelRegistry) + { + std::cout << c.first << ":" << c.second.first << ":" << c.second.second << std::endl; + } + } + } + protected: std::shared_ptr fTransportFactory; ///< Transport factory std::unordered_map> fTransports; ///< Container for transports @@ -485,6 +512,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable std::unordered_map fMsgInputs; std::unordered_map fMultipartInputs; std::unordered_map> fMultitransportInputs; + std::unordered_map> fChannelRegistry; std::vector fInputChannelKeys; std::mutex fMultitransportMutex; std::atomic fMultitransportProceed; diff --git a/fairmq/devices/FairMQMerger.cxx b/fairmq/devices/FairMQMerger.cxx index fbae376c..779735fd 100644 --- a/fairmq/devices/FairMQMerger.cxx +++ b/fairmq/devices/FairMQMerger.cxx @@ -21,11 +21,17 @@ using namespace std; FairMQMerger::FairMQMerger() : fMultipart(1) - , fInChannelName() - , fOutChannelName() + , fInChannelName("data-in") + , fOutChannelName("data-out") { } +void FairMQMerger::RegisterChannelEndpoints() +{ + RegisterChannelEndpoint(fInChannelName, 1, 10000); + RegisterChannelEndpoint(fOutChannelName, 1, 1); +} + FairMQMerger::~FairMQMerger() { } diff --git a/fairmq/devices/FairMQMerger.h b/fairmq/devices/FairMQMerger.h index 7f2bda2f..6146f62f 100644 --- a/fairmq/devices/FairMQMerger.h +++ b/fairmq/devices/FairMQMerger.h @@ -30,8 +30,9 @@ class FairMQMerger : public FairMQDevice std::string fInChannelName; std::string fOutChannelName; - virtual void Run(); - virtual void InitTask(); + virtual void RegisterChannelEndpoints() override; + virtual void Run() override; + virtual void InitTask() override; }; #endif /* FAIRMQMERGER_H_ */ diff --git a/fairmq/options/FairMQProgOptions.cxx b/fairmq/options/FairMQProgOptions.cxx index b7810f3e..ea723c0f 100644 --- a/fairmq/options/FairMQProgOptions.cxx +++ b/fairmq/options/FairMQProgOptions.cxx @@ -74,6 +74,13 @@ void FairMQProgOptions::ParseAll(const int argc, char** argv, bool allowUnregist exit(EXIT_SUCCESS); } + if (fVarMap.count("print-channels")) + { + // if this option is provided, do no further checks and let the device print the channels + DefaultConsoleSetFilter(fSeverityMap.at("NOLOG")); + return; + } + if (fVarMap.count("id") == 0) { LOG(ERROR) << "Device id not provided, provide with --id"; @@ -318,6 +325,7 @@ void FairMQProgOptions::InitOptionDescription() ("port-range-min", po::value()->default_value(22000), "Start of the port range for dynamic initialization.") ("port-range-max", po::value()->default_value(32000), "End of the port range for dynamic initialization.") ("log-to-file", po::value()->default_value(""), "Log output to a file.") + ("print-channels", po::value()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (::)") ("shm-segment-size", po::value()->default_value(2000000000), "shmem transport: size of the shared memory segment (in bytes).") ("shm-segment-name", po::value()->default_value("fairmq_shmem_main"), "shmem transport: name of the shared memory segment.") ; @@ -335,6 +343,7 @@ void FairMQProgOptions::InitOptionDescription() ("port-range-min", po::value()->default_value(22000), "Start of the port range for dynamic initialization.") ("port-range-max", po::value()->default_value(32000), "End of the port range for dynamic initialization.") ("log-to-file", po::value()->default_value(""), "Log output to a file.") + ("print-channels", po::value()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (::)") ("shm-segment-size", po::value()->default_value(2000000000), "shmem transport: size of the shared memory segment (in bytes).") ("shm-segment-name", po::value()->default_value("fairmq_shmem_main"), "shmem transport: name of the shared memory segment.") ; @@ -354,6 +363,7 @@ void FairMQProgOptions::InitOptionDescription() ("port-range-min", po::value()->default_value(22000), "Start of the port range for dynamic initialization.") ("port-range-max", po::value()->default_value(32000), "End of the port range for dynamic initialization.") ("log-to-file", po::value()->default_value(""), "Log output to a file.") + ("print-channels", po::value()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (::)") ("shm-segment-size", po::value()->default_value(2000000000), "shmem transport: size of the shared memory segment (in bytes).") ("shm-segment-name", po::value()->default_value("fairmq_shmem_main"), "shmem transport: name of the shared memory segment.") ; @@ -365,7 +375,7 @@ void FairMQProgOptions::InitOptionDescription() ("config-json-string", po::value>()->multitoken(), "JSON input as command line string.") // ("config-json-file", po::value(), "JSON input as file.") ("mq-config", po::value(), "JSON/XML input as file. The configuration object will check xml or json file extention and will call the json or xml parser accordingly") - (FairMQParser::SUBOPT::OptionKeyChannelConfig, po::value >()->multitoken()->composing(), "Configuration of single or multiple channel(s) by comma separated key=value list") + (FairMQParser::SUBOPT::OptionKeyChannelConfig, po::value>()->multitoken()->composing(), "Configuration of single or multiple channel(s) by comma separated key=value list") ; AddToCmdLineOptions(fGenericDesc); diff --git a/fairmq/options/FairProgOptions.cxx b/fairmq/options/FairProgOptions.cxx index c88f0778..5ecab041 100644 --- a/fairmq/options/FairProgOptions.cxx +++ b/fairmq/options/FairProgOptions.cxx @@ -39,7 +39,7 @@ FairProgOptions::FairProgOptions() : ("version,v", "print version") ("verbosity", po::value(&fVerbosityLevel)->default_value("DEBUG"), "Verbosity level : TRACE, DEBUG, RESULTS, INFO, WARN, ERROR, STATE, NOLOG") ("log-color", po::value()->default_value(true), "logger color: true or false") - ("print-options", po::value()->implicit_value(true), "print options in machine-readable format"); + ("print-options", po::value()->implicit_value(true), "print options in machine-readable format (