Refactor FairMQProgOptions

This commit is contained in:
Alexey Rybalchenko
2018-05-24 15:38:22 +02:00
committed by Mohammad Al-Turany
parent ca694e4054
commit 8b88e67360
33 changed files with 489 additions and 1758 deletions

View File

@@ -12,25 +12,70 @@
* Created on March 11, 2015, 10:20 PM
*/
#include "FairMQLogger.h"
#include "FairMQProgOptions.h"
#include "FairProgOptionsHelper.h"
#include "FairMQParser.h"
#include "FairMQSuboptParser.h"
#include "FairMQLogger.h"
#include <boost/algorithm/string.hpp> // join/split
#include <algorithm>
#include <iomanip>
#include <iostream>
#include <exception>
using namespace std;
using namespace fair::mq;
namespace po = boost::program_options;
FairMQProgOptions::FairMQProgOptions()
: FairProgOptions()
, fMQCmdOptions("FairMQ device options")
, fMQParserOptions("FairMQ config parser options")
, fFairMQMap()
: fVarMap()
, fFairMQChannelMap()
, fAllOptions("FairMQ Command Line Options")
, fGeneralOptions("General options")
, fMQOptions("FairMQ device options")
, fParserOptions("FairMQ channel config parser options")
, fConfigMutex()
, fChannelInfo()
, fMQKeyMap()
, fChannelKeyMap()
, fUnregisteredOptions()
, fEvents()
{
InitOptionDescription();
fGeneralOptions.add_options()
("help,h", "Print help")
("version,v", "Print version")
("severity", po::value<string>()->default_value("debug"), "Log severity level: trace, debug, info, state, warn, error, fatal, nolog")
("verbosity", po::value<string>()->default_value("medium"), "Log verbosity level: veryhigh, high, medium, low")
("color", po::value<bool >()->default_value(true), "Log color (true/false)")
("log-to-file", po::value<string>()->default_value(""), "Log output to a file.")
("print-options", po::value<bool >()->implicit_value(true), "Print options in machine-readable format (<option>:<computed-value>:<type>:<description>)");
fMQOptions.add_options()
("id", po::value<string>(), "Device ID (required argument).")
("io-threads", po::value<int >()->default_value(1), "Number of I/O threads.")
("transport", po::value<string>()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg'/'shmem') .")
("network-interface", po::value<string>()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).")
("config-key", po::value<string>(), "Use provided value instead of device id for fetching the configuration from the config file.")
("initialization-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).")
("port-range-min", po::value<int >()->default_value(22000), "Start of the port range for dynamic initialization.")
("port-range-max", po::value<int >()->default_value(32000), "End of the port range for dynamic initialization.")
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t>()->default_value(2000000000), "Shared memory: size of the shared memory segment (in bytes).")
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
("rate", po::value<float >()->default_value(0.), "Rate for conditional run loop (Hz).")
("session", po::value<string>()->default_value("default"), "Session name.");
fParserOptions.add_options()
("mq-config", po::value<string>(), "JSON input as file.")
("channel-config", po::value<vector<string>>()->multitoken()->composing(), "Configuration of single or multiple channel(s) by comma separated key=value list");
fAllOptions.add(fGeneralOptions);
fAllOptions.add(fMQOptions);
fAllOptions.add(fParserOptions);
ParseDefaults();
}
@@ -52,12 +97,20 @@ int FairMQProgOptions::ParseAll(const vector<string>& cmdLineArgs, bool allowUnr
int FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool allowUnregistered)
{
if (FairProgOptions::ParseCmdLine(argc, argv, allowUnregistered))
ParseCmdLine(argc, argv, allowUnregistered);
// if this option is provided, handle them and return stop value
if (fVarMap.count("help"))
{
// ParseCmdLine returns 0 if no immediate switches found.
cout << fAllOptions << endl;
return 1;
}
// if this option is provided, handle them and return stop value
if (fVarMap.count("print-options"))
{
PrintOptionsRaw();
return 1;
}
// if these options are provided, do no further checks and let the device handle them
if (fVarMap.count("print-channels") || fVarMap.count("version"))
{
@@ -90,7 +143,7 @@ int FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool al
{
id = fVarMap["config-key"].as<string>();
}
else
else if (fVarMap.count("id"))
{
id = fVarMap["id"].as<string>();
}
@@ -101,54 +154,84 @@ int FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool al
if (fVarMap.count("mq-config"))
{
LOG(debug) << "mq-config: Using default JSON parser";
Store(fair::mq::parser::JSON().UserParser(fVarMap["mq-config"].as<string>(), id));
}
else if (fVarMap.count("config-json-string"))
{
LOG(debug) << "config-json-string: Parsing JSON string";
string value = fair::mq::ConvertVariableValue<fair::mq::VarInfoToString>()(fVarMap.at("config-json-string"));
stringstream ss;
ss << value;
Store(fair::mq::parser::JSON().UserParser(ss, id));
UpdateChannelMap(parser::JSON().UserParser(fVarMap.at("mq-config").as<string>(), id));
}
else if (fVarMap.count("channel-config"))
{
LOG(debug) << "channel-config: Parsing channel configuration";
Store(fair::mq::parser::SUBOPT().UserParser(fVarMap.at("channel-config").as<vector<string>>(), id));
UpdateChannelMap(parser::SUBOPT().UserParser(fVarMap.at("channel-config").as<vector<string>>(), id));
}
else
{
LOG(warn) << "FairMQProgOptions: no channels configuration provided via neither of:";
for (const auto& p : fMQParserOptions.options())
for (const auto& p : fParserOptions.options())
{
LOG(warn) << "--" << p->canonical_display_name();
}
LOG(warn) << "No channels will be created (You can create them manually).";
}
}
catch (std::exception& e)
catch (exception& e)
{
LOG(error) << e.what();
return 1;
}
FairProgOptions::PrintOptions();
PrintOptions();
return 0;
}
int FairMQProgOptions::Store(const FairMQMap& channels)
void FairMQProgOptions::ParseCmdLine(const int argc, char const* const* argv, bool allowUnregistered)
{
fFairMQMap = channels;
UpdateChannelInfo();
UpdateMQValues();
return 0;
// get options from cmd line and store in variable map
// here we use command_line_parser instead of parse_command_line, to allow unregistered and positional options
if (allowUnregistered)
{
po::command_line_parser parser{argc, argv};
parser.options(fAllOptions).allow_unregistered();
po::parsed_options parsed = parser.run();
fUnregisteredOptions = po::collect_unrecognized(parsed.options, po::include_positional);
po::store(parsed, fVarMap);
}
else
{
po::store(po::parse_command_line(argc, argv, fAllOptions), fVarMap);
}
po::notify(fVarMap);
}
void FairMQProgOptions::ParseDefaults()
{
vector<string> emptyArgs;
emptyArgs.push_back("dummy");
vector<const char*> argv(emptyArgs.size());
transform(emptyArgs.begin(), emptyArgs.end(), argv.begin(), [](const string& str)
{
return str.c_str();
});
po::store(po::parse_command_line(argv.size(), const_cast<char**>(argv.data()), fAllOptions), fVarMap);
}
unordered_map<string, vector<FairMQChannel>> FairMQProgOptions::GetFairMQMap() const
{
return fFairMQChannelMap;
}
unordered_map<string, int> FairMQProgOptions::GetChannelInfo() const
{
return fChannelInfo;
}
// replace FairMQChannelMap, and update variable map accordingly
int FairMQProgOptions::UpdateChannelMap(const FairMQMap& channels)
int FairMQProgOptions::UpdateChannelMap(const unordered_map<string, vector<FairMQChannel>>& channels)
{
fFairMQMap = channels;
fFairMQChannelMap = channels;
UpdateChannelInfo();
UpdateMQValues();
return 0;
@@ -157,7 +240,7 @@ int FairMQProgOptions::UpdateChannelMap(const FairMQMap& channels)
void FairMQProgOptions::UpdateChannelInfo()
{
fChannelInfo.clear();
for (const auto& c : fFairMQMap)
for (const auto& c : fFairMQChannelMap)
{
fChannelInfo.insert(make_pair(c.first, c.second.size()));
}
@@ -167,7 +250,7 @@ void FairMQProgOptions::UpdateChannelInfo()
// create key for variable map as follow : channelName.index.memberName
void FairMQProgOptions::UpdateMQValues()
{
for (const auto& p : fFairMQMap)
for (const auto& p : fFairMQChannelMap)
{
int index = 0;
@@ -183,15 +266,15 @@ void FairMQProgOptions::UpdateMQValues()
string rcvKernelSizeKey = "chans." + p.first + "." + to_string(index) + ".rcvKernelSize";
string rateLoggingKey = "chans." + p.first + "." + to_string(index) + ".rateLogging";
fMQKeyMap[typeKey] = MQKey{p.first, index, "type"};
fMQKeyMap[methodKey] = MQKey{p.first, index, "method"};
fMQKeyMap[addressKey] = MQKey{p.first, index, "address"};
fMQKeyMap[transportKey] = MQKey{p.first, index, "transport"};
fMQKeyMap[sndBufSizeKey] = MQKey{p.first, index, "sndBufSize"};
fMQKeyMap[rcvBufSizeKey] = MQKey{p.first, index, "rcvBufSize"};
fMQKeyMap[sndKernelSizeKey] = MQKey{p.first, index, "sndKernelSize"};
fMQKeyMap[rcvKernelSizeKey] = MQKey{p.first, index, "rcvkernelSize"};
fMQKeyMap[rateLoggingKey] = MQKey{p.first, index, "rateLogging"};
fChannelKeyMap[typeKey] = ChannelKey{p.first, index, "type"};
fChannelKeyMap[methodKey] = ChannelKey{p.first, index, "method"};
fChannelKeyMap[addressKey] = ChannelKey{p.first, index, "address"};
fChannelKeyMap[transportKey] = ChannelKey{p.first, index, "transport"};
fChannelKeyMap[sndBufSizeKey] = ChannelKey{p.first, index, "sndBufSize"};
fChannelKeyMap[rcvBufSizeKey] = ChannelKey{p.first, index, "rcvBufSize"};
fChannelKeyMap[sndKernelSizeKey] = ChannelKey{p.first, index, "sndKernelSize"};
fChannelKeyMap[rcvKernelSizeKey] = ChannelKey{p.first, index, "rcvkernelSize"};
fChannelKeyMap[rateLoggingKey] = ChannelKey{p.first, index, "rateLogging"};
UpdateVarMap<string>(typeKey, channel.GetType());
UpdateVarMap<string>(methodKey, channel.GetMethod());
@@ -209,103 +292,56 @@ void FairMQProgOptions::UpdateMQValues()
}
}
int FairMQProgOptions::ImmediateOptions()
{
if (fVarMap.count("help"))
{
cout << "===== FairMQ Program Options =====" << endl << fAllOptions;
return 1;
}
if (fVarMap.count("print-options"))
{
PrintOptionsRaw();
return 1;
}
return 0;
}
void FairMQProgOptions::InitOptionDescription()
{
fMQCmdOptions.add_options()
("id", po::value<string>(), "Device ID (required argument).")
("io-threads", po::value<int >()->default_value(1), "Number of I/O threads.")
("transport", po::value<string>()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').")
("config", po::value<string>()->default_value("static"), "Config source ('static'/<config library filename>).")
("network-interface", po::value<string>()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).")
("config-key", po::value<string>(), "Use provided value instead of device id for fetching the configuration from the config file.")
("initialization-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).")
("port-range-min", po::value<int >()->default_value(22000), "Start of the port range for dynamic initialization.")
("port-range-max", po::value<int >()->default_value(32000), "End of the port range for dynamic initialization.")
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t>()->default_value(2000000000), "Shared memory: size of the shared memory segment (in bytes).")
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
("rate", po::value<float >()->default_value(0.), "Rate for conditional run loop (Hz).")
("session", po::value<string>()->default_value("default"), "Session name.")
;
fMQParserOptions.add_options()
("config-json-string", po::value<vector<string>>()->multitoken(), "JSON input as command line string.")
("mq-config", po::value<string>(), "JSON/XML input as file. The configuration object will check xml or json file extention and will call the json or xml parser accordingly")
("channel-config", po::value<vector<string>>()->multitoken()->composing(), "Configuration of single or multiple channel(s) by comma separated key=value list")
;
AddToCmdLineOptions(fMQCmdOptions);
AddToCmdLineOptions(fMQParserOptions);
}
int FairMQProgOptions::UpdateChannelMap(const string& channelName, int index, const string& member, const string& val)
int FairMQProgOptions::UpdateChannelValue(const string& channelName, int index, const string& member, const string& val)
{
if (member == "type")
{
fFairMQMap.at(channelName).at(index).UpdateType(val);
fFairMQChannelMap.at(channelName).at(index).UpdateType(val);
return 0;
}
if (member == "method")
{
fFairMQMap.at(channelName).at(index).UpdateMethod(val);
fFairMQChannelMap.at(channelName).at(index).UpdateMethod(val);
return 0;
}
if (member == "address")
{
fFairMQMap.at(channelName).at(index).UpdateAddress(val);
fFairMQChannelMap.at(channelName).at(index).UpdateAddress(val);
return 0;
}
if (member == "transport")
{
fFairMQMap.at(channelName).at(index).UpdateTransport(val);
fFairMQChannelMap.at(channelName).at(index).UpdateTransport(val);
return 0;
}
else
{
//if we get there it means something is wrong
LOG(error) << "update of FairMQChannel map failed for the following key: "
<< channelName << "." << index << "." << member;
LOG(error) << "update of FairMQChannel map failed for the following key: " << channelName << "." << index << "." << member;
return 1;
}
}
int FairMQProgOptions::UpdateChannelMap(const string& channelName, int index, const string& member, int val)
int FairMQProgOptions::UpdateChannelValue(const string& channelName, int index, const string& member, int val)
{
if (member == "sndBufSize")
{
fFairMQMap.at(channelName).at(index).UpdateSndBufSize(val);
fFairMQChannelMap.at(channelName).at(index).UpdateSndBufSize(val);
return 0;
}
if (member == "rcvBufSize")
{
fFairMQMap.at(channelName).at(index).UpdateRcvBufSize(val);
fFairMQChannelMap.at(channelName).at(index).UpdateRcvBufSize(val);
return 0;
}
if (member == "rateLogging")
{
fFairMQMap.at(channelName).at(index).UpdateRateLogging(val);
fFairMQChannelMap.at(channelName).at(index).UpdateRateLogging(val);
return 0;
}
else
@@ -315,3 +351,135 @@ int FairMQProgOptions::UpdateChannelMap(const string& channelName, int index, co
return 1;
}
}
vector<string> FairMQProgOptions::GetPropertyKeys() const
{
lock_guard<mutex> lock{fConfigMutex};
vector<string> result;
for (const auto& it : fVarMap)
{
result.push_back(it.first.c_str());
}
return result;
}
/// Add option descriptions
int FairMQProgOptions::AddToCmdLineOptions(const po::options_description optDesc, bool /* visible */)
{
fAllOptions.add(optDesc);
return 0;
}
po::options_description& FairMQProgOptions::GetCmdLineOptions()
{
return fAllOptions;
}
int FairMQProgOptions::PrintOptions()
{
// -> loop over variable map and print its content
// -> In this example the following types are supported:
// string, int, float, double, short, boost::filesystem::path
// vector<string>, vector<int>, vector<float>, vector<double>, vector<short>
map<string, VarValInfo> mapinfo;
// get string length for formatting and convert varmap values into string
int maxLenKey = 0;
int maxLenValue = 0;
int maxLenType = 0;
int maxLenDefault = 0;
for (const auto& m : fVarMap)
{
maxLenKey = max(maxLenKey, static_cast<int>(m.first.length()));
VarValInfo valinfo = ConvertVariableValue<options::ToVarValInfo>()((m.second));
mapinfo[m.first] = valinfo;
maxLenValue = max(maxLenValue, static_cast<int>(valinfo.value.length()));
maxLenType = max(maxLenType, static_cast<int>(valinfo.type.length()));
maxLenDefault = max(maxLenDefault, static_cast<int>(valinfo.defaulted.length()));
}
// TODO : limit the value len field in a better way
if (maxLenValue > 100)
{
maxLenValue = 100;
}
for (const auto& o : fUnregisteredOptions)
{
LOG(WARN) << "detected unregistered option: " << o;
}
stringstream ss;
ss << "Configuration: \n";
for (const auto& p : mapinfo)
{
ss << setfill(' ') << left
<< setw(maxLenKey) << p.first << " = "
<< setw(maxLenValue) << p.second.value
<< setw(maxLenType) << p.second.type
<< setw(maxLenDefault) << p.second.defaulted
<< "\n";
}
LOG(info) << ss.str();
return 0;
}
int FairMQProgOptions::PrintOptionsRaw()
{
const vector<boost::shared_ptr<po::option_description>>& options = fAllOptions.options();
for (const auto& o : options)
{
VarValInfo value;
if (fVarMap.count(o->canonical_display_name()))
{
value = ConvertVariableValue<options::ToVarValInfo>()((fVarMap[o->canonical_display_name()]));
}
string description = o->description();
replace(description.begin(), description.end(), '\n', ' ');
cout << o->long_name() << ":" << value.value << ":" << (value.type == "" ? "<unknown>" : value.type) << ":" << description << endl;
}
return 0;
}
string FairMQProgOptions::GetStringValue(const string& key)
{
unique_lock<mutex> lock(fConfigMutex);
string valueStr;
try
{
if (fVarMap.count(key))
{
valueStr = ConvertVariableValue<options::ToString>()(fVarMap.at(key));
}
}
catch (exception& e)
{
LOG(error) << "Exception thrown for the key '" << key << "'";
LOG(error) << e.what();
}
return valueStr;
}
int FairMQProgOptions::Count(const string& key) const
{
unique_lock<mutex> lock(fConfigMutex);
return fVarMap.count(key);
}