diff --git a/fairmq/devices/FairMQBenchmarkSampler.cxx b/fairmq/devices/FairMQBenchmarkSampler.cxx index 74a3610b..0e0d6c40 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.cxx +++ b/fairmq/devices/FairMQBenchmarkSampler.cxx @@ -1,8 +1,8 @@ /******************************************************************************** * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ /** @@ -14,23 +14,22 @@ #include "FairMQBenchmarkSampler.h" -#include -#include - +#include #include "../FairMQLogger.h" #include "../options/FairMQProgOptions.h" +#include +#include + using namespace std; FairMQBenchmarkSampler::FairMQBenchmarkSampler() : fSameMessage(true) , fMsgSize(10000) - , fMsgCounter(0) - , fMsgRate(1) + , fMsgRate(0) , fNumIterations(0) , fMaxIterations(0) , fOutChannelName() - , fResetMsgCounter() { } @@ -42,16 +41,11 @@ void FairMQBenchmarkSampler::InitTask() { fSameMessage = fConfig->GetValue("same-msg"); fMsgSize = fConfig->GetValue("msg-size"); - fMsgRate = fConfig->GetValue("msg-rate"); + fMsgRate = fConfig->GetValue("msg-rate"); fMaxIterations = fConfig->GetValue("max-iterations"); fOutChannelName = fConfig->GetValue("out-channel"); } -void FairMQBenchmarkSampler::PreRun() -{ - fResetMsgCounter = std::thread(&FairMQBenchmarkSampler::ResetMsgCounter, this); -} - void FairMQBenchmarkSampler::Run() { // store the channel reference to avoid traversing the map on every loop iteration @@ -62,6 +56,8 @@ void FairMQBenchmarkSampler::Run() LOG(info) << "Starting the benchmark with message size of " << fMsgSize << " and " << fMaxIterations << " iterations."; auto tStart = chrono::high_resolution_clock::now(); + fair::mq::tools::RateLimiter rateLimiter(fMsgRate); + while (CheckCurrentState(RUNNING)) { if (fSameMessage) @@ -98,31 +94,14 @@ void FairMQBenchmarkSampler::Run() } } - --fMsgCounter; - while (fMsgCounter == 0) + if (fMsgRate > 0) { - this_thread::sleep_for(chrono::milliseconds(1)); + rateLimiter.maybe_sleep(); } } auto tEnd = chrono::high_resolution_clock::now(); LOG(info) << "Done " << fNumIterations << " iterations in " << chrono::duration(tEnd - tStart).count() << "ms."; - -} - -void FairMQBenchmarkSampler::PostRun() -{ - fResetMsgCounter.join(); -} - -void FairMQBenchmarkSampler::ResetMsgCounter() -{ - while (CheckCurrentState(RUNNING)) - { - fMsgCounter = fMsgRate / 100; - this_thread::sleep_for(chrono::milliseconds(10)); - } - fMsgCounter = -1; } diff --git a/fairmq/devices/FairMQBenchmarkSampler.h b/fairmq/devices/FairMQBenchmarkSampler.h index e368a880..313e67cf 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.h +++ b/fairmq/devices/FairMQBenchmarkSampler.h @@ -31,20 +31,14 @@ class FairMQBenchmarkSampler : public FairMQDevice FairMQBenchmarkSampler(); virtual ~FairMQBenchmarkSampler(); - void PreRun() override; - void PostRun() override; - - void ResetMsgCounter(); - protected: bool fSameMessage; int fMsgSize; std::atomic fMsgCounter; - int fMsgRate; + float fMsgRate; uint64_t fNumIterations; uint64_t fMaxIterations; std::string fOutChannelName; - std::thread fResetMsgCounter; virtual void InitTask() override; virtual void Run() override; diff --git a/fairmq/run/runBenchmarkSampler.cxx b/fairmq/run/runBenchmarkSampler.cxx index 8703aff0..5f7d7fe6 100644 --- a/fairmq/run/runBenchmarkSampler.cxx +++ b/fairmq/run/runBenchmarkSampler.cxx @@ -18,7 +18,7 @@ void addCustomOptions(bpo::options_description& options) ("same-msg", bpo::value()->default_value(false), "Re-send the same message, or recreate for each iteration") ("msg-size", bpo::value()->default_value(1000000), "Message size in bytes") ("max-iterations", bpo::value()->default_value(0), "Number of run iterations (0 - infinite)") - ("msg-rate", bpo::value()->default_value(0), "Msg rate limit in maximum number of messages per second"); + ("msg-rate", bpo::value()->default_value(0), "Msg rate limit in maximum number of messages per second"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)