diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index d61825c5..9e686a44 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -219,7 +219,6 @@ if(BUILD_FAIRMQ) FairMQPoller.cxx FairMQSocket.cxx FairMQTransportFactory.cxx - devices/FairMQBenchmarkSampler.cxx devices/FairMQMerger.cxx devices/FairMQMultiplier.cxx devices/FairMQProxy.cxx diff --git a/fairmq/devices/FairMQBenchmarkSampler.cxx b/fairmq/devices/FairMQBenchmarkSampler.cxx deleted file mode 100644 index cb3e0582..00000000 --- a/fairmq/devices/FairMQBenchmarkSampler.cxx +++ /dev/null @@ -1,100 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#include "FairMQBenchmarkSampler.h" - -#include "tools/RateLimit.h" -#include "../FairMQLogger.h" - -#include - -using namespace std; - -FairMQBenchmarkSampler::FairMQBenchmarkSampler() - : fMultipart(false) - , fNumParts(1) - , fMsgSize(10000) - , fMsgRate(0) - , fNumIterations(0) - , fMaxIterations(0) - , fOutChannelName() -{ -} - -void FairMQBenchmarkSampler::InitTask() -{ - fMultipart = fConfig->GetProperty("multipart"); - fNumParts = fConfig->GetProperty("num-parts"); - fMsgSize = fConfig->GetProperty("msg-size"); - fMsgRate = fConfig->GetProperty("msg-rate"); - fMaxIterations = fConfig->GetProperty("max-iterations"); - fOutChannelName = fConfig->GetProperty("out-channel"); -} - -void FairMQBenchmarkSampler::Run() -{ - // store the channel reference to avoid traversing the map on every loop iteration - FairMQChannel& dataOutChannel = fChannels.at(fOutChannelName).at(0); - - FairMQMessagePtr baseMsg(dataOutChannel.NewMessage(fMsgSize)); - - LOG(info) << "Starting the benchmark with message size of " << fMsgSize << " and " << fMaxIterations << " iterations."; - auto tStart = chrono::high_resolution_clock::now(); - - fair::mq::tools::RateLimiter rateLimiter(fMsgRate); - - while (!NewStatePending()) - { - if (fMultipart) - { - FairMQParts parts; - - for (size_t i = 0; i < fNumParts; ++i) - { - parts.AddPart(dataOutChannel.NewMessage(fMsgSize)); - } - - if (dataOutChannel.Send(parts) >= 0) - { - if (fMaxIterations > 0) - { - if (fNumIterations >= fMaxIterations) - { - break; - } - } - ++fNumIterations; - } - } - else - { - FairMQMessagePtr msg(dataOutChannel.NewMessage(fMsgSize)); - - if (dataOutChannel.Send(msg) >= 0) - { - if (fMaxIterations > 0) - { - if (fNumIterations >= fMaxIterations) - { - break; - } - } - ++fNumIterations; - } - } - - if (fMsgRate > 0) - { - rateLimiter.maybe_sleep(); - } - } - - auto tEnd = chrono::high_resolution_clock::now(); - - LOG(info) << "Done " << fNumIterations << " iterations in " << chrono::duration(tEnd - tStart).count() << "ms."; -} diff --git a/fairmq/devices/FairMQBenchmarkSampler.h b/fairmq/devices/FairMQBenchmarkSampler.h index c48612ba..a00954cb 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.h +++ b/fairmq/devices/FairMQBenchmarkSampler.h @@ -9,13 +9,15 @@ #ifndef FAIRMQBENCHMARKSAMPLER_H_ #define FAIRMQBENCHMARKSAMPLER_H_ -#include -#include -#include // size_t -#include // uint64_t - - +#include "../FairMQLogger.h" #include "FairMQDevice.h" +#include "tools/RateLimit.h" + +#include +#include +#include // size_t +#include // uint64_t +#include /** * Sampler to generate traffic for benchmarking. @@ -24,7 +26,77 @@ class FairMQBenchmarkSampler : public FairMQDevice { public: - FairMQBenchmarkSampler(); + FairMQBenchmarkSampler() + : fMultipart(false) + , fNumParts(1) + , fMsgSize(10000) + , fMsgRate(0) + , fNumIterations(0) + , fMaxIterations(0) + , fOutChannelName() + {} + + void InitTask() override + { + fMultipart = fConfig->GetProperty("multipart"); + fNumParts = fConfig->GetProperty("num-parts"); + fMsgSize = fConfig->GetProperty("msg-size"); + fMsgRate = fConfig->GetProperty("msg-rate"); + fMaxIterations = fConfig->GetProperty("max-iterations"); + fOutChannelName = fConfig->GetProperty("out-channel"); + } + + void Run() override + { + // store the channel reference to avoid traversing the map on every loop iteration + FairMQChannel& dataOutChannel = fChannels.at(fOutChannelName).at(0); + + FairMQMessagePtr baseMsg(dataOutChannel.NewMessage(fMsgSize)); + + LOG(info) << "Starting the benchmark with message size of " << fMsgSize << " and " << fMaxIterations << " iterations."; + auto tStart = std::chrono::high_resolution_clock::now(); + + fair::mq::tools::RateLimiter rateLimiter(fMsgRate); + + while (!NewStatePending()) { + if (fMultipart) { + FairMQParts parts; + + for (size_t i = 0; i < fNumParts; ++i) { + parts.AddPart(dataOutChannel.NewMessage(fMsgSize)); + } + + if (dataOutChannel.Send(parts) >= 0) { + if (fMaxIterations > 0) { + if (fNumIterations >= fMaxIterations) { + break; + } + } + ++fNumIterations; + } + } else { + FairMQMessagePtr msg(dataOutChannel.NewMessage(fMsgSize)); + + if (dataOutChannel.Send(msg) >= 0) { + if (fMaxIterations > 0) { + if (fNumIterations >= fMaxIterations) { + break; + } + } + ++fNumIterations; + } + } + + if (fMsgRate > 0) { + rateLimiter.maybe_sleep(); + } + } + + auto tEnd = std::chrono::high_resolution_clock::now(); + + LOG(info) << "Done " << fNumIterations << " iterations in " << std::chrono::duration(tEnd - tStart).count() << "ms."; + } + virtual ~FairMQBenchmarkSampler() {} protected: @@ -36,9 +108,6 @@ class FairMQBenchmarkSampler : public FairMQDevice uint64_t fNumIterations; uint64_t fMaxIterations; std::string fOutChannelName; - - virtual void InitTask() override; - virtual void Run() override; }; #endif /* FAIRMQBENCHMARKSAMPLER_H_ */ diff --git a/fairmq/devices/FairMQSink.h b/fairmq/devices/FairMQSink.h index b97c9cf3..86dc2141 100644 --- a/fairmq/devices/FairMQSink.h +++ b/fairmq/devices/FairMQSink.h @@ -15,14 +15,14 @@ #ifndef FAIRMQSINK_H_ #define FAIRMQSINK_H_ -#include -#include - #include "../FairMQDevice.h" #include "../FairMQLogger.h" +#include +#include + // template -class FairMQSink : public FairMQDevice//, public OutputPolicy +class FairMQSink : public FairMQDevice //, public OutputPolicy { public: FairMQSink() @@ -32,8 +32,7 @@ class FairMQSink : public FairMQDevice//, public OutputPolicy , fInChannelName() {} - virtual ~FairMQSink() - {} + virtual ~FairMQSink() {} protected: bool fMultipart; @@ -56,35 +55,25 @@ class FairMQSink : public FairMQDevice//, public OutputPolicy LOG(info) << "Starting the benchmark and expecting to receive " << fMaxIterations << " messages."; auto tStart = std::chrono::high_resolution_clock::now(); - while (!NewStatePending()) - { - if (fMultipart) - { + while (!NewStatePending()) { + if (fMultipart) { FairMQParts parts; - if (dataInChannel.Receive(parts) >= 0) - { - if (fMaxIterations > 0) - { - if (fNumIterations >= fMaxIterations) - { + if (dataInChannel.Receive(parts) >= 0) { + if (fMaxIterations > 0) { + if (fNumIterations >= fMaxIterations) { LOG(info) << "Configured maximum number of iterations reached."; break; } } fNumIterations++; } - } - else - { + } else { FairMQMessagePtr msg(dataInChannel.NewMessage()); - if (dataInChannel.Receive(msg) >= 0) - { - if (fMaxIterations > 0) - { - if (fNumIterations >= fMaxIterations) - { + if (dataInChannel.Receive(msg) >= 0) { + if (fMaxIterations > 0) { + if (fNumIterations >= fMaxIterations) { LOG(info) << "Configured maximum number of iterations reached."; break; } @@ -96,7 +85,8 @@ class FairMQSink : public FairMQDevice//, public OutputPolicy auto tEnd = std::chrono::high_resolution_clock::now(); - LOG(info) << "Leaving RUNNING state. Received " << fNumIterations << " messages in " << std::chrono::duration(tEnd - tStart).count() << "ms."; + LOG(info) << "Leaving RUNNING state. Received " << fNumIterations << " messages in " + << std::chrono::duration(tEnd - tStart).count() << "ms."; } };