add --print-channels to print registered channels of the device

This commit is contained in:
Alexey Rybalchenko 2017-06-20 15:58:43 +02:00 committed by Mohammad Al-Turany
parent 1d38a2350f
commit 4bc54ad32b
8 changed files with 68 additions and 10 deletions

View File

@ -74,7 +74,6 @@ FairMQDevice::FairMQDevice()
, fMultitransportProceed(false)
, fExternalConfig(false)
{
LOG(DEBUG) << "PID: " << getpid();
}
void FairMQDevice::CatchSignals()

View File

@ -379,6 +379,33 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
const FairMQChannel& GetChannel(const std::string& channelName, const int index = 0) const;
virtual void RegisterChannelEndpoints() {}
bool RegisterChannelEndpoint(const std::string& channelName, uint16_t minNumSubChannels = 1, uint16_t maxNumSubChannels = 1)
{
bool ok = fChannelRegistry.insert(std::make_pair(channelName, std::make_pair(minNumSubChannels, maxNumSubChannels))).second;
if (!ok)
{
LOG(WARN) << "Registering channel: name already registered: \"" << channelName << "\"";
}
return ok;
}
void PrintRegisteredChannels()
{
if (fChannelRegistry.size() < 1)
{
std::cout << "no channels registered." << std::endl;
}
else
{
for (const auto& c : fChannelRegistry)
{
std::cout << c.first << ":" << c.second.first << ":" << c.second.second << std::endl;
}
}
}
protected:
std::shared_ptr<FairMQTransportFactory> fTransportFactory; ///< Transport factory
std::unordered_map<FairMQ::Transport, std::shared_ptr<FairMQTransportFactory>> fTransports; ///< Container for transports
@ -485,6 +512,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
std::unordered_map<std::string, InputMsgCallback> fMsgInputs;
std::unordered_map<std::string, InputMultipartCallback> fMultipartInputs;
std::unordered_map<FairMQ::Transport, std::vector<std::string>> fMultitransportInputs;
std::unordered_map<std::string, std::pair<uint16_t, uint16_t>> fChannelRegistry;
std::vector<std::string> fInputChannelKeys;
std::mutex fMultitransportMutex;
std::atomic<bool> fMultitransportProceed;

View File

@ -21,11 +21,17 @@ using namespace std;
FairMQMerger::FairMQMerger()
: fMultipart(1)
, fInChannelName()
, fOutChannelName()
, fInChannelName("data-in")
, fOutChannelName("data-out")
{
}
void FairMQMerger::RegisterChannelEndpoints()
{
RegisterChannelEndpoint(fInChannelName, 1, 10000);
RegisterChannelEndpoint(fOutChannelName, 1, 1);
}
FairMQMerger::~FairMQMerger()
{
}

View File

@ -30,8 +30,9 @@ class FairMQMerger : public FairMQDevice
std::string fInChannelName;
std::string fOutChannelName;
virtual void Run();
virtual void InitTask();
virtual void RegisterChannelEndpoints() override;
virtual void Run() override;
virtual void InitTask() override;
};
#endif /* FAIRMQMERGER_H_ */

View File

@ -74,6 +74,13 @@ void FairMQProgOptions::ParseAll(const int argc, char** argv, bool allowUnregist
exit(EXIT_SUCCESS);
}
if (fVarMap.count("print-channels"))
{
// if this option is provided, do no further checks and let the device print the channels
DefaultConsoleSetFilter(fSeverityMap.at("NOLOG"));
return;
}
if (fVarMap.count("id") == 0)
{
LOG(ERROR) << "Device id not provided, provide with --id";
@ -318,6 +325,7 @@ void FairMQProgOptions::InitOptionDescription()
("port-range-min", po::value<int >()->default_value(22000), "Start of the port range for dynamic initialization.")
("port-range-max", po::value<int >()->default_value(32000), "End of the port range for dynamic initialization.")
("log-to-file", po::value<string>()->default_value(""), "Log output to a file.")
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t>()->default_value(2000000000), "shmem transport: size of the shared memory segment (in bytes).")
("shm-segment-name", po::value<string>()->default_value("fairmq_shmem_main"), "shmem transport: name of the shared memory segment.")
;
@ -335,6 +343,7 @@ void FairMQProgOptions::InitOptionDescription()
("port-range-min", po::value<int >()->default_value(22000), "Start of the port range for dynamic initialization.")
("port-range-max", po::value<int >()->default_value(32000), "End of the port range for dynamic initialization.")
("log-to-file", po::value<string>()->default_value(""), "Log output to a file.")
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t>()->default_value(2000000000), "shmem transport: size of the shared memory segment (in bytes).")
("shm-segment-name", po::value<string>()->default_value("fairmq_shmem_main"), "shmem transport: name of the shared memory segment.")
;
@ -354,6 +363,7 @@ void FairMQProgOptions::InitOptionDescription()
("port-range-min", po::value<int >()->default_value(22000), "Start of the port range for dynamic initialization.")
("port-range-max", po::value<int >()->default_value(32000), "End of the port range for dynamic initialization.")
("log-to-file", po::value<string>()->default_value(""), "Log output to a file.")
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t>()->default_value(2000000000), "shmem transport: size of the shared memory segment (in bytes).")
("shm-segment-name", po::value<string>()->default_value("fairmq_shmem_main"), "shmem transport: name of the shared memory segment.")
;
@ -365,7 +375,7 @@ void FairMQProgOptions::InitOptionDescription()
("config-json-string", po::value<vector<string>>()->multitoken(), "JSON input as command line string.")
// ("config-json-file", po::value<string>(), "JSON input as file.")
("mq-config", po::value<string>(), "JSON/XML input as file. The configuration object will check xml or json file extention and will call the json or xml parser accordingly")
(FairMQParser::SUBOPT::OptionKeyChannelConfig, po::value<std::vector<std::string> >()->multitoken()->composing(), "Configuration of single or multiple channel(s) by comma separated key=value list")
(FairMQParser::SUBOPT::OptionKeyChannelConfig, po::value<std::vector<std::string>>()->multitoken()->composing(), "Configuration of single or multiple channel(s) by comma separated key=value list")
;
AddToCmdLineOptions(fGenericDesc);

View File

@ -39,7 +39,7 @@ FairProgOptions::FairProgOptions() :
("version,v", "print version")
("verbosity", po::value<std::string>(&fVerbosityLevel)->default_value("DEBUG"), "Verbosity level : TRACE, DEBUG, RESULTS, INFO, WARN, ERROR, STATE, NOLOG")
("log-color", po::value<bool>()->default_value(true), "logger color: true or false")
("print-options", po::value<bool>()->implicit_value(true), "print options in machine-readable format");
("print-options", po::value<bool>()->implicit_value(true), "print options in machine-readable format (<option>:<computed-value>:<type>:<description>)");
fSeverityMap["TRACE"] = FairMQ::severity_level::TRACE;
fSeverityMap["DEBUG"] = FairMQ::severity_level::DEBUG;

View File

@ -205,13 +205,17 @@ struct ConvertVariableValue : T
if (is_this_type<std::size_t>(varValue))
return T::template Value<std::size_t>(varValue, std::string("<std::size_t>"), defaultedValue, emptyValue);
// std::vector size_t
if (is_this_type<std::vector<std::size_t>>(varValue))
return T::template Value<std::vector<std::size_t>>(varValue, std::string("<vector<std::size_t>>"), defaultedValue, emptyValue);
// uint64_t
if (is_this_type<std::uint64_t>(varValue))
return T::template Value<std::uint64_t>(varValue, std::string("<std::uint64_t>"), defaultedValue, emptyValue);
// std::vector size_t
if (is_this_type<std::vector<std::size_t>>(varValue))
return T::template Value<std::vector<std::size_t>>(varValue, std::string("<vector<std::size_t>>"), defaultedValue, emptyValue);
// std::vector uint64_t
if (is_this_type<std::vector<std::uint64_t>>(varValue))
return T::template Value<std::vector<std::uint64_t>>(varValue, std::string("<vector<std::uint64_t>>"), defaultedValue, emptyValue);
// bool
if (is_this_type<bool>(varValue))

View File

@ -24,6 +24,14 @@
template<typename TMQDevice>
inline int runStateMachine(TMQDevice& device, FairMQProgOptions& cfg)
{
device.RegisterChannelEndpoints();
if (cfg.GetValue<bool>("print-channels"))
{
device.PrintRegisteredChannels();
device.ChangeState(TMQDevice::END);
return 0;
}
if (cfg.GetValue<int>("catch-signals") > 0)
{
device.CatchSignals();
@ -33,6 +41,8 @@ inline int runStateMachine(TMQDevice& device, FairMQProgOptions& cfg)
LOG(WARN) << "Signal handling (e.g. ctrl+C) has been deactivated via command line argument";
}
LOG(DEBUG) << "PID: " << getpid();
device.SetConfig(cfg);
std::string config = cfg.GetValue<std::string>("config");
std::string control = cfg.GetValue<std::string>("control");