diff --git a/fairmq/devices/FairMQBenchmarkSampler.h b/fairmq/devices/FairMQBenchmarkSampler.h index a00954cb..1d4b74bb 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.h +++ b/fairmq/devices/FairMQBenchmarkSampler.h @@ -17,6 +17,7 @@ #include #include // size_t #include // uint64_t +#include // memset #include /** @@ -28,6 +29,7 @@ class FairMQBenchmarkSampler : public FairMQDevice public: FairMQBenchmarkSampler() : fMultipart(false) + , fMemSet(false) , fNumParts(1) , fMsgSize(10000) , fMsgRate(0) @@ -39,6 +41,7 @@ class FairMQBenchmarkSampler : public FairMQDevice void InitTask() override { fMultipart = fConfig->GetProperty("multipart"); + fMemSet = fConfig->GetProperty("memset"); fNumParts = fConfig->GetProperty("num-parts"); fMsgSize = fConfig->GetProperty("msg-size"); fMsgRate = fConfig->GetProperty("msg-rate"); @@ -64,6 +67,9 @@ class FairMQBenchmarkSampler : public FairMQDevice for (size_t i = 0; i < fNumParts; ++i) { parts.AddPart(dataOutChannel.NewMessage(fMsgSize)); + if (fMemSet) { + std::memset(parts.At(i)->GetData(), 0, parts.At(i)->GetSize()); + } } if (dataOutChannel.Send(parts) >= 0) { @@ -76,6 +82,9 @@ class FairMQBenchmarkSampler : public FairMQDevice } } else { FairMQMessagePtr msg(dataOutChannel.NewMessage(fMsgSize)); + if (fMemSet) { + std::memset(msg->GetData(), 0, msg->GetSize()); + } if (dataOutChannel.Send(msg) >= 0) { if (fMaxIterations > 0) { @@ -101,6 +110,7 @@ class FairMQBenchmarkSampler : public FairMQDevice protected: bool fMultipart; + bool fMemSet; size_t fNumParts; size_t fMsgSize; std::atomic fMsgCounter; diff --git a/fairmq/run/runBenchmarkSampler.cxx b/fairmq/run/runBenchmarkSampler.cxx index 99bdfe3b..93f62584 100644 --- a/fairmq/run/runBenchmarkSampler.cxx +++ b/fairmq/run/runBenchmarkSampler.cxx @@ -15,8 +15,8 @@ void addCustomOptions(bpo::options_description& options) { options.add_options() ("out-channel", bpo::value()->default_value("data"), "Name of the output channel") - ("same-msg", bpo::value()->default_value(false), "Re-send the same message, or recreate for each iteration") ("multipart", bpo::value()->default_value(false), "Handle multipart payloads") + ("memset", bpo::value()->default_value(false), "Memset allocated buffers to 0") ("num-parts", bpo::value()->default_value(1), "Number of parts to send. 1 will send single messages, not parts") ("msg-size", bpo::value()->default_value(1000000), "Message size in bytes") ("max-iterations", bpo::value()->default_value(0), "Number of run iterations (0 - infinite)") diff --git a/fairmq/zeromq/TransportFactory.h b/fairmq/zeromq/TransportFactory.h index 64716a88..f23a9b9a 100644 --- a/fairmq/zeromq/TransportFactory.h +++ b/fairmq/zeromq/TransportFactory.h @@ -55,7 +55,7 @@ class TransportFactory final : public FairMQTransportFactory { return tools::make_unique(this); } - + MessagePtr CreateMessage(Alignment alignment) override { return tools::make_unique(alignment, this);