Adding rate control for ConditionalRun function

Devices implementing the conditional run method are typically source
devices and a rate control can be desireable. New option '--rate' with
a float number argument in Hz can be used to configure rate control.
By default it is switched off.
This commit is contained in:
Matthias Richter 2017-11-14 18:52:20 +01:00 committed by Mohammad Al-Turany
parent 7429f7a326
commit 9b2b1cf9f1
4 changed files with 47 additions and 48 deletions

View File

@ -14,6 +14,7 @@
#include <mutex> #include <mutex>
#include <thread> #include <thread>
#include <functional> #include <functional>
#include <sstream>
#include <boost/algorithm/string.hpp> // join/split #include <boost/algorithm/string.hpp> // join/split
@ -61,6 +62,9 @@ FairMQDevice::FairMQDevice()
, fMultitransportProceed(false) , fMultitransportProceed(false)
, fExternalConfig(false) , fExternalConfig(false)
, fVersion({0, 0, 0}) , fVersion({0, 0, 0})
, fRate(0.)
, fLastTime(0)
{ {
} }
@ -511,6 +515,18 @@ void FairMQDevice::RunWrapper()
{ {
while (CheckCurrentState(RUNNING) && ConditionalRun()) while (CheckCurrentState(RUNNING) && ConditionalRun())
{ {
using TimeScale = std::chrono::microseconds;
static const auto reftime = std::chrono::system_clock::now();
if (fRate > 0.001) {
auto timeSinceRef = std::chrono::duration_cast<TimeScale>(std::chrono::system_clock::now() - reftime);
auto timespan = timeSinceRef.count() - fLastTime;
TimeScale::rep period = (float)TimeScale::period::den / fRate;
if (timespan < period) {
TimeScale sleepfor(period - timespan);
std::this_thread::sleep_for(sleepfor);
}
fLastTime = std::chrono::duration_cast<TimeScale>(std::chrono::system_clock::now() - reftime).count();
}
} }
Run(); Run();
@ -893,6 +909,7 @@ void FairMQDevice::SetConfig(FairMQProgOptions& config)
fNetworkInterface = config.GetValue<string>("network-interface"); fNetworkInterface = config.GetValue<string>("network-interface");
fNumIoThreads = config.GetValue<int>("io-threads"); fNumIoThreads = config.GetValue<int>("io-threads");
fInitializationTimeoutInS = config.GetValue<int>("initialization-timeout"); fInitializationTimeoutInS = config.GetValue<int>("initialization-timeout");
std::stringstream(fConfig->GetValue<string>("rate")) >> fRate;
} }
void FairMQDevice::LogSocketRates() void FairMQDevice::LogSocketRates()

View File

@ -535,6 +535,8 @@ class FairMQDevice : public FairMQStateMachine
bool fExternalConfig; bool fExternalConfig;
const fair::mq::tools::Version fVersion; const fair::mq::tools::Version fVersion;
float fRate;
size_t fLastTime;
}; };
#endif /* FAIRMQDEVICE_H_ */ #endif /* FAIRMQDEVICE_H_ */

View File

@ -319,63 +319,40 @@ int FairMQProgOptions::NotifySwitchOption()
return 0; return 0;
} }
void FairMQProgOptions::FillOptionDescription(boost::program_options::options_description& options)
{
options.add_options()
("id", po::value<string>(), "Device ID (required argument).")
("io-threads", po::value<int >()->default_value(1), "Number of I/O threads.")
("transport", po::value<string>()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').")
("config", po::value<string>()->default_value("static"), "Config source ('static'/<config library filename>).")
("network-interface", po::value<string>()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).")
("config-key", po::value<string>(), "Use provided value instead of device id for fetching the configuration from the config file.")
("catch-signals", po::value<int >()->default_value(1), "Enable signal handling (1/0).")
("initialization-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).")
("port-range-min", po::value<int >()->default_value(22000), "Start of the port range for dynamic initialization.")
("port-range-max", po::value<int >()->default_value(32000), "End of the port range for dynamic initialization.")
("log-to-file", po::value<string>()->default_value(""), "Log output to a file.")
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t>()->default_value(2000000000), "shmem transport: size of the shared memory segment (in bytes).")
("shm-segment-name", po::value<string>()->default_value("fairmq_shmem_main"), "shmem transport: name of the shared memory segment.")
("rate", po::value<string>()->default_value(""), "rate for conditional run loop (Hz)")
;
}
void FairMQProgOptions::InitOptionDescription() void FairMQProgOptions::InitOptionDescription()
{ {
// Id required in command line if config txt file not enabled // Id required in command line if config txt file not enabled
if (fUseConfigFile) if (fUseConfigFile)
{ {
fMQOptionsInCmd.add_options() FillOptionDescription(fMQOptionsInCmd);
("id", po::value<string>(), "Device ID (required argument).")
("io-threads", po::value<int >()->default_value(1), "Number of I/O threads.")
("transport", po::value<string>()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').")
("config", po::value<string>()->default_value("static"), "Config source ('static'/<config library filename>).")
("network-interface", po::value<string>()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).")
("config-key", po::value<string>(), "Use provided value instead of device id for fetching the configuration from the config file.")
("catch-signals", po::value<int >()->default_value(1), "Enable signal handling (1/0).")
("initialization-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).")
("port-range-min", po::value<int >()->default_value(22000), "Start of the port range for dynamic initialization.")
("port-range-max", po::value<int >()->default_value(32000), "End of the port range for dynamic initialization.")
("log-to-file", po::value<string>()->default_value(""), "Log output to a file.")
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t>()->default_value(2000000000), "shmem transport: size of the shared memory segment (in bytes).")
("shm-segment-name", po::value<string>()->default_value("fairmq_shmem_main"), "shmem transport: name of the shared memory segment.")
;
fMQOptionsInCfg.add_options() FillOptionDescription(fMQOptionsInCfg);
("id", po::value<string>(), "Device ID (required argument).")
("io-threads", po::value<int >()->default_value(1), "Number of I/O threads.")
("transport", po::value<string>()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').")
("config", po::value<string>()->default_value("static"), "Config source ('static'/<config library filename>).")
("network-interface", po::value<string>()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).")
("config-key", po::value<string>(), "Use provided value instead of device id for fetching the configuration from the config file.")
("catch-signals", po::value<int >()->default_value(1), "Enable signal handling (1/0).")
("initialization-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).")
("port-range-min", po::value<int >()->default_value(22000), "Start of the port range for dynamic initialization.")
("port-range-max", po::value<int >()->default_value(32000), "End of the port range for dynamic initialization.")
("log-to-file", po::value<string>()->default_value(""), "Log output to a file.")
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t>()->default_value(2000000000), "shmem transport: size of the shared memory segment (in bytes).")
("shm-segment-name", po::value<string>()->default_value("fairmq_shmem_main"), "shmem transport: name of the shared memory segment.")
;
} }
else else
{ {
fMQOptionsInCmd.add_options() FillOptionDescription(fMQOptionsInCmd);
("id", po::value<string>(), "Device ID (required argument).")
("io-threads", po::value<int >()->default_value(1), "Number of I/O threads.")
("transport", po::value<string>()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').")
("config", po::value<string>()->default_value("static"), "Config source ('static'/<config library filename>).")
("network-interface", po::value<string>()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).")
("config-key", po::value<string>(), "Use provided value instead of device id for fetching the configuration from the config file.")
("catch-signals", po::value<int >()->default_value(1), "Enable signal handling (1/0).")
("initialization-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).")
("port-range-min", po::value<int >()->default_value(22000), "Start of the port range for dynamic initialization.")
("port-range-max", po::value<int >()->default_value(32000), "End of the port range for dynamic initialization.")
("log-to-file", po::value<string>()->default_value(""), "Log output to a file.")
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t>()->default_value(2000000000), "shmem transport: size of the shared memory segment (in bytes).")
("shm-segment-name", po::value<string>()->default_value("fairmq_shmem_main"), "shmem transport: name of the shared memory segment.")
;
} }
fMQParserOptions.add_options() fMQParserOptions.add_options()

View File

@ -299,6 +299,9 @@ class FairMQProgOptions : public FairProgOptions
virtual int NotifySwitchOption(); // for custom help & version printing virtual int NotifySwitchOption(); // for custom help & version printing
void InitOptionDescription(); void InitOptionDescription();
// fill boost option description with the standard options
static void FillOptionDescription(po::options_description& options);
// read FairMQChannelMap and insert/update corresponding values in variable map // read FairMQChannelMap and insert/update corresponding values in variable map
// create key for variable map as follow : channelName.index.memberName // create key for variable map as follow : channelName.index.memberName
void UpdateMQValues(); void UpdateMQValues();