From 9b2b1cf9f111b8ea8c3560fb44e5da920900e957 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Tue, 14 Nov 2017 18:52:20 +0100 Subject: [PATCH] Adding rate control for ConditionalRun function Devices implementing the conditional run method are typically source devices and a rate control can be desireable. New option '--rate' with a float number argument in Hz can be used to configure rate control. By default it is switched off. --- fairmq/FairMQDevice.cxx | 17 +++++++ fairmq/FairMQDevice.h | 2 + fairmq/options/FairMQProgOptions.cxx | 73 ++++++++++------------------ fairmq/options/FairMQProgOptions.h | 3 ++ 4 files changed, 47 insertions(+), 48 deletions(-) diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 0ff95402..98a50d64 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -14,6 +14,7 @@ #include #include #include +#include #include // join/split @@ -61,6 +62,9 @@ FairMQDevice::FairMQDevice() , fMultitransportProceed(false) , fExternalConfig(false) , fVersion({0, 0, 0}) + , fRate(0.) + , fLastTime(0) + { } @@ -511,6 +515,18 @@ void FairMQDevice::RunWrapper() { while (CheckCurrentState(RUNNING) && ConditionalRun()) { + using TimeScale = std::chrono::microseconds; + static const auto reftime = std::chrono::system_clock::now(); + if (fRate > 0.001) { + auto timeSinceRef = std::chrono::duration_cast(std::chrono::system_clock::now() - reftime); + auto timespan = timeSinceRef.count() - fLastTime; + TimeScale::rep period = (float)TimeScale::period::den / fRate; + if (timespan < period) { + TimeScale sleepfor(period - timespan); + std::this_thread::sleep_for(sleepfor); + } + fLastTime = std::chrono::duration_cast(std::chrono::system_clock::now() - reftime).count(); + } } Run(); @@ -893,6 +909,7 @@ void FairMQDevice::SetConfig(FairMQProgOptions& config) fNetworkInterface = config.GetValue("network-interface"); fNumIoThreads = config.GetValue("io-threads"); fInitializationTimeoutInS = config.GetValue("initialization-timeout"); + std::stringstream(fConfig->GetValue("rate")) >> fRate; } void FairMQDevice::LogSocketRates() diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 1b5c0944..b448f39d 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -535,6 +535,8 @@ class FairMQDevice : public FairMQStateMachine bool fExternalConfig; const fair::mq::tools::Version fVersion; + float fRate; + size_t fLastTime; }; #endif /* FAIRMQDEVICE_H_ */ diff --git a/fairmq/options/FairMQProgOptions.cxx b/fairmq/options/FairMQProgOptions.cxx index 9272aa51..43a51f7a 100644 --- a/fairmq/options/FairMQProgOptions.cxx +++ b/fairmq/options/FairMQProgOptions.cxx @@ -319,63 +319,40 @@ int FairMQProgOptions::NotifySwitchOption() return 0; } +void FairMQProgOptions::FillOptionDescription(boost::program_options::options_description& options) +{ + options.add_options() + ("id", po::value(), "Device ID (required argument).") + ("io-threads", po::value()->default_value(1), "Number of I/O threads.") + ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').") + ("config", po::value()->default_value("static"), "Config source ('static'/).") + ("network-interface", po::value()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).") + ("config-key", po::value(), "Use provided value instead of device id for fetching the configuration from the config file.") + ("catch-signals", po::value()->default_value(1), "Enable signal handling (1/0).") + ("initialization-timeout", po::value()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).") + ("port-range-min", po::value()->default_value(22000), "Start of the port range for dynamic initialization.") + ("port-range-max", po::value()->default_value(32000), "End of the port range for dynamic initialization.") + ("log-to-file", po::value()->default_value(""), "Log output to a file.") + ("print-channels", po::value()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (::)") + ("shm-segment-size", po::value()->default_value(2000000000), "shmem transport: size of the shared memory segment (in bytes).") + ("shm-segment-name", po::value()->default_value("fairmq_shmem_main"), "shmem transport: name of the shared memory segment.") + ("rate", po::value()->default_value(""), "rate for conditional run loop (Hz)") + ; + +} + void FairMQProgOptions::InitOptionDescription() { // Id required in command line if config txt file not enabled if (fUseConfigFile) { - fMQOptionsInCmd.add_options() - ("id", po::value(), "Device ID (required argument).") - ("io-threads", po::value()->default_value(1), "Number of I/O threads.") - ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').") - ("config", po::value()->default_value("static"), "Config source ('static'/).") - ("network-interface", po::value()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).") - ("config-key", po::value(), "Use provided value instead of device id for fetching the configuration from the config file.") - ("catch-signals", po::value()->default_value(1), "Enable signal handling (1/0).") - ("initialization-timeout", po::value()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).") - ("port-range-min", po::value()->default_value(22000), "Start of the port range for dynamic initialization.") - ("port-range-max", po::value()->default_value(32000), "End of the port range for dynamic initialization.") - ("log-to-file", po::value()->default_value(""), "Log output to a file.") - ("print-channels", po::value()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (::)") - ("shm-segment-size", po::value()->default_value(2000000000), "shmem transport: size of the shared memory segment (in bytes).") - ("shm-segment-name", po::value()->default_value("fairmq_shmem_main"), "shmem transport: name of the shared memory segment.") - ; + FillOptionDescription(fMQOptionsInCmd); - fMQOptionsInCfg.add_options() - ("id", po::value(), "Device ID (required argument).") - ("io-threads", po::value()->default_value(1), "Number of I/O threads.") - ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').") - ("config", po::value()->default_value("static"), "Config source ('static'/).") - ("network-interface", po::value()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).") - ("config-key", po::value(), "Use provided value instead of device id for fetching the configuration from the config file.") - ("catch-signals", po::value()->default_value(1), "Enable signal handling (1/0).") - ("initialization-timeout", po::value()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).") - ("port-range-min", po::value()->default_value(22000), "Start of the port range for dynamic initialization.") - ("port-range-max", po::value()->default_value(32000), "End of the port range for dynamic initialization.") - ("log-to-file", po::value()->default_value(""), "Log output to a file.") - ("print-channels", po::value()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (::)") - ("shm-segment-size", po::value()->default_value(2000000000), "shmem transport: size of the shared memory segment (in bytes).") - ("shm-segment-name", po::value()->default_value("fairmq_shmem_main"), "shmem transport: name of the shared memory segment.") - ; + FillOptionDescription(fMQOptionsInCfg); } else { - fMQOptionsInCmd.add_options() - ("id", po::value(), "Device ID (required argument).") - ("io-threads", po::value()->default_value(1), "Number of I/O threads.") - ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').") - ("config", po::value()->default_value("static"), "Config source ('static'/).") - ("network-interface", po::value()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).") - ("config-key", po::value(), "Use provided value instead of device id for fetching the configuration from the config file.") - ("catch-signals", po::value()->default_value(1), "Enable signal handling (1/0).") - ("initialization-timeout", po::value()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).") - ("port-range-min", po::value()->default_value(22000), "Start of the port range for dynamic initialization.") - ("port-range-max", po::value()->default_value(32000), "End of the port range for dynamic initialization.") - ("log-to-file", po::value()->default_value(""), "Log output to a file.") - ("print-channels", po::value()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (::)") - ("shm-segment-size", po::value()->default_value(2000000000), "shmem transport: size of the shared memory segment (in bytes).") - ("shm-segment-name", po::value()->default_value("fairmq_shmem_main"), "shmem transport: name of the shared memory segment.") - ; + FillOptionDescription(fMQOptionsInCmd); } fMQParserOptions.add_options() diff --git a/fairmq/options/FairMQProgOptions.h b/fairmq/options/FairMQProgOptions.h index 90d985cf..923b9715 100644 --- a/fairmq/options/FairMQProgOptions.h +++ b/fairmq/options/FairMQProgOptions.h @@ -299,6 +299,9 @@ class FairMQProgOptions : public FairProgOptions virtual int NotifySwitchOption(); // for custom help & version printing void InitOptionDescription(); + // fill boost option description with the standard options + static void FillOptionDescription(po::options_description& options); + // read FairMQChannelMap and insert/update corresponding values in variable map // create key for variable map as follow : channelName.index.memberName void UpdateMQValues();