Allow to limit number of messages for the Benchmark sampler and sink

This commit is contained in:
Alexey Rybalchenko 2016-01-07 17:37:43 +01:00 committed by Florian Uhlig
parent c10a6abeef
commit 0e1a1ad552
7 changed files with 161 additions and 69 deletions

View File

@ -16,6 +16,7 @@
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include <boost/timer/timer.hpp>
#include "FairMQBenchmarkSampler.h" #include "FairMQBenchmarkSampler.h"
#include "FairMQLogger.h" #include "FairMQLogger.h"
@ -23,9 +24,8 @@
using namespace std; using namespace std;
FairMQBenchmarkSampler::FairMQBenchmarkSampler() FairMQBenchmarkSampler::FairMQBenchmarkSampler()
: fEventSize(10000) : fMsgSize(10000)
, fEventRate(1) , fNumMsgs(0)
, fEventCounter(0)
{ {
} }
@ -35,53 +35,38 @@ FairMQBenchmarkSampler::~FairMQBenchmarkSampler()
void FairMQBenchmarkSampler::Run() void FairMQBenchmarkSampler::Run()
{ {
boost::thread resetEventCounter(boost::bind(&FairMQBenchmarkSampler::ResetEventCounter, this)); void* buffer = malloc(fMsgSize);
int numSentMsgs = 0;
void* buffer = operator new[](fEventSize); unique_ptr<FairMQMessage> baseMsg(fTransportFactory->CreateMessage(buffer, fMsgSize));
unique_ptr<FairMQMessage> baseMsg(fTransportFactory->CreateMessage(buffer, fEventSize));
// store the channel reference to avoid traversing the map on every loop iteration // store the channel reference to avoid traversing the map on every loop iteration
const FairMQChannel& dataChannel = fChannels.at("data-out").at(0); const FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0);
LOG(INFO) << "Starting the benchmark with message size of " << fMsgSize << " and number of messages " << fNumMsgs << ".";
boost::timer::auto_cpu_timer timer;
while (CheckCurrentState(RUNNING)) while (CheckCurrentState(RUNNING))
{ {
unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage()); unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
msg->Copy(baseMsg); msg->Copy(baseMsg);
dataChannel.Send(msg); if (dataOutChannel.Send(msg) >= 0)
--fEventCounter;
while (fEventCounter == 0)
{ {
boost::this_thread::sleep(boost::posix_time::milliseconds(1)); if (fNumMsgs > 0)
}
}
try {
resetEventCounter.interrupt();
resetEventCounter.join();
} catch(boost::thread_resource_error& e) {
LOG(ERROR) << e.what();
}
}
void FairMQBenchmarkSampler::ResetEventCounter()
{ {
while (true) numSentMsgs++;
{ if (numSentMsgs >= fNumMsgs)
try
{
fEventCounter = fEventRate / 100;
boost::this_thread::sleep(boost::posix_time::milliseconds(10));
}
catch (boost::thread_interrupted&)
{ {
break; break;
} }
} }
} }
}
LOG(INFO) << "Sent " << numSentMsgs << " messages, leaving RUNNING state.";
LOG(INFO) << "Sending time: ";
}
void FairMQBenchmarkSampler::SetProperty(const int key, const string& value) void FairMQBenchmarkSampler::SetProperty(const int key, const string& value)
{ {
@ -106,11 +91,11 @@ void FairMQBenchmarkSampler::SetProperty(const int key, const int value)
{ {
switch (key) switch (key)
{ {
case EventSize: case MsgSize:
fEventSize = value; fMsgSize = value;
break; break;
case EventRate: case NumMsgs:
fEventRate = value; fNumMsgs = value;
break; break;
default: default:
FairMQDevice::SetProperty(key, value); FairMQDevice::SetProperty(key, value);
@ -122,10 +107,10 @@ int FairMQBenchmarkSampler::GetProperty(const int key, const int default_ /*= 0*
{ {
switch (key) switch (key)
{ {
case EventSize: case MsgSize:
return fEventSize; return fMsgSize;
case EventRate: case NumMsgs:
return fEventRate; return fNumMsgs;
default: default:
return FairMQDevice::GetProperty(key, default_); return FairMQDevice::GetProperty(key, default_);
} }
@ -135,10 +120,10 @@ string FairMQBenchmarkSampler::GetPropertyDescription(const int key)
{ {
switch (key) switch (key)
{ {
case EventSize: case MsgSize:
return "EventSize: Size of the transfered message buffer."; return "MsgSize: Size of the transfered message buffer.";
case EventRate: case NumMsgs:
return "EventRate: Upper limit for the message rate."; return "NumMsgs: Number of messages to send.";
default: default:
return FairMQDevice::GetPropertyDescription(key); return FairMQDevice::GetPropertyDescription(key);
} }

View File

@ -28,8 +28,9 @@ class FairMQBenchmarkSampler : public FairMQDevice
public: public:
enum enum
{ {
EventSize = FairMQDevice::Last, MsgSize = FairMQDevice::Last,
EventRate, NumMsgs,
MsgRate,
Last Last
}; };
@ -37,7 +38,6 @@ class FairMQBenchmarkSampler : public FairMQDevice
virtual ~FairMQBenchmarkSampler(); virtual ~FairMQBenchmarkSampler();
void Log(int intervalInMs); void Log(int intervalInMs);
void ResetEventCounter();
virtual void SetProperty(const int key, const std::string& value); virtual void SetProperty(const int key, const std::string& value);
virtual std::string GetProperty(const int key, const std::string& default_ = ""); virtual std::string GetProperty(const int key, const std::string& default_ = "");
@ -48,9 +48,8 @@ class FairMQBenchmarkSampler : public FairMQDevice
virtual void ListProperties(); virtual void ListProperties();
protected: protected:
int fEventSize; int fMsgSize;
int fEventRate; int fNumMsgs;
int fEventCounter;
virtual void Run(); virtual void Run();
}; };

View File

@ -14,27 +14,112 @@
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include <boost/timer/timer.hpp>
#include "FairMQSink.h" #include "FairMQSink.h"
#include "FairMQLogger.h" #include "FairMQLogger.h"
using namespace std;
FairMQSink::FairMQSink() FairMQSink::FairMQSink()
: fNumMsgs(0)
{ {
} }
void FairMQSink::Run() void FairMQSink::Run()
{ {
int numReceivedMsgs = 0;
// store the channel reference to avoid traversing the map on every loop iteration // store the channel reference to avoid traversing the map on every loop iteration
const FairMQChannel& dataChannel = fChannels.at("data-in").at(0); const FairMQChannel& dataInChannel = fChannels.at("data-in").at(0);
LOG(INFO) << "Starting the benchmark and expecting to receive " << fNumMsgs << " messages.";
boost::timer::auto_cpu_timer timer;
while (CheckCurrentState(RUNNING)) while (CheckCurrentState(RUNNING))
{ {
std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage()); std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
dataChannel.Receive(msg); if (dataInChannel.Receive(msg) >= 0)
{
if (fNumMsgs > 0)
{
numReceivedMsgs++;
if (numReceivedMsgs >= fNumMsgs)
{
break;
} }
} }
}
}
LOG(INFO) << "Received " << numReceivedMsgs << " messages, leaving RUNNING state.";
LOG(INFO) << "Receiving time: ";
}
FairMQSink::~FairMQSink() FairMQSink::~FairMQSink()
{ {
} }
void FairMQSink::SetProperty(const int key, const string& value)
{
switch (key)
{
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
string FairMQSink::GetProperty(const int key, const string& default_ /*= ""*/)
{
switch (key)
{
default:
return FairMQDevice::GetProperty(key, default_);
}
}
void FairMQSink::SetProperty(const int key, const int value)
{
switch (key)
{
case NumMsgs:
fNumMsgs = value;
break;
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
int FairMQSink::GetProperty(const int key, const int default_ /*= 0*/)
{
switch (key)
{
case NumMsgs:
return fNumMsgs;
default:
return FairMQDevice::GetProperty(key, default_);
}
}
string FairMQSink::GetPropertyDescription(const int key)
{
switch (key)
{
case NumMsgs:
return "NumMsgs: Number of messages to send.";
default:
return FairMQDevice::GetPropertyDescription(key);
}
}
void FairMQSink::ListProperties()
{
LOG(INFO) << "Properties of FairMQSink:";
for (int p = FairMQConfigurable::Last; p < FairMQSink::Last; ++p)
{
LOG(INFO) << " " << GetPropertyDescription(p);
}
LOG(INFO) << "---------------------------";
}

View File

@ -15,15 +15,33 @@
#ifndef FAIRMQSINK_H_ #ifndef FAIRMQSINK_H_
#define FAIRMQSINK_H_ #define FAIRMQSINK_H_
#include <string>
#include "FairMQDevice.h" #include "FairMQDevice.h"
class FairMQSink : public FairMQDevice class FairMQSink : public FairMQDevice
{ {
public: public:
enum
{
NumMsgs = FairMQDevice::Last,
Last
};
FairMQSink(); FairMQSink();
virtual ~FairMQSink(); virtual ~FairMQSink();
virtual void SetProperty(const int key, const std::string& value);
virtual std::string GetProperty(const int key, const std::string& default_ = "");
virtual void SetProperty(const int key, const int value);
virtual int GetProperty(const int key, const int default_ = 0);
virtual std::string GetPropertyDescription(const int key);
virtual void ListProperties();
protected: protected:
int fNumMsgs;
virtual void Run(); virtual void Run();
}; };

View File

@ -41,13 +41,13 @@ int main(int argc, char** argv)
try try
{ {
int eventSize; int msgSize;
int eventRate; int numMsgs;
options_description sampler_options("Sampler options"); options_description sampler_options("Sampler options");
sampler_options.add_options() sampler_options.add_options()
("event-size", value<int>(&eventSize)->default_value(1000), "Event size in bytes") ("msg-size", value<int>(&msgSize)->default_value(1000), "Message size in bytes")
("event-rate", value<int>(&eventRate)->default_value(0), "Event rate limit in maximum number of events per second"); ("num-msgs", value<int>(&numMsgs)->default_value(0), "Number of messages to send");
config.AddToCmdLineOptions(sampler_options); config.AddToCmdLineOptions(sampler_options);
@ -72,8 +72,8 @@ int main(int argc, char** argv)
#endif #endif
sampler.SetProperty(FairMQBenchmarkSampler::Id, id); sampler.SetProperty(FairMQBenchmarkSampler::Id, id);
sampler.SetProperty(FairMQBenchmarkSampler::EventSize, eventSize); sampler.SetProperty(FairMQBenchmarkSampler::MsgSize, msgSize);
sampler.SetProperty(FairMQBenchmarkSampler::EventRate, eventRate); sampler.SetProperty(FairMQBenchmarkSampler::NumMsgs, numMsgs);
sampler.SetProperty(FairMQBenchmarkSampler::NumIoThreads, config.GetValue<int>("io-threads")); sampler.SetProperty(FairMQBenchmarkSampler::NumIoThreads, config.GetValue<int>("io-threads"));
sampler.ChangeState("INIT_DEVICE"); sampler.ChangeState("INIT_DEVICE");

View File

@ -40,6 +40,14 @@ int main(int argc, char** argv)
try try
{ {
int numMsgs;
options_description sink_options("Sink options");
sink_options.add_options()
("num-msgs", value<int>(&numMsgs)->default_value(0), "Number of messages to receive");
config.AddToCmdLineOptions(sink_options);
if (config.ParseAll(argc, argv)) if (config.ParseAll(argc, argv))
{ {
return 0; return 0;
@ -64,6 +72,7 @@ int main(int argc, char** argv)
sink.SetTransport(transportFactory); sink.SetTransport(transportFactory);
sink.SetProperty(FairMQSink::Id, id); sink.SetProperty(FairMQSink::Id, id);
sink.SetProperty(FairMQSink::NumMsgs, numMsgs);
sink.SetProperty(FairMQSink::NumIoThreads, config.GetValue<int>("io-threads")); sink.SetProperty(FairMQSink::NumIoThreads, config.GetValue<int>("io-threads"));
sink.ChangeState("INIT_DEVICE"); sink.ChangeState("INIT_DEVICE");

View File

@ -1,18 +1,14 @@
#!/bin/bash #!/bin/bash
if(@NANOMSG_FOUND@); then
buffSize="500000000" # nanomsg buffer size is in bytes
else
buffSize="10000" # zeromq high-water mark is in messages
fi
SAMPLER="bsampler" SAMPLER="bsampler"
SAMPLER+=" --id bsampler1" SAMPLER+=" --id bsampler1"
SAMPLER+=" --event-size 10000" SAMPLER+=" --msg-size 1000000"
#SAMPLER+=" --num-msgs 1000"
SAMPLER+=" --config-json-file @CMAKE_BINARY_DIR@/bin/config/benchmark.json" SAMPLER+=" --config-json-file @CMAKE_BINARY_DIR@/bin/config/benchmark.json"
xterm -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SAMPLER & xterm -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SAMPLER &
SINK="sink" SINK="sink"
SINK+=" --id sink1" SINK+=" --id sink1"
#SINK+=" --num-msgs 1000"
SINK+=" --config-json-file @CMAKE_BINARY_DIR@/bin/config/benchmark.json" SINK+=" --config-json-file @CMAKE_BINARY_DIR@/bin/config/benchmark.json"
xterm -geometry 80x23+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SINK & xterm -geometry 80x23+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SINK &