Allow rate limiting in the Shared Memory example

This commit is contained in:
Alexey Rybalchenko 2016-05-18 16:00:26 +02:00
parent 1f09317cd0
commit af971c6ab1
4 changed files with 50 additions and 1 deletions

View File

@ -26,6 +26,8 @@ using namespace std;
FairMQBenchmarkSampler::FairMQBenchmarkSampler() FairMQBenchmarkSampler::FairMQBenchmarkSampler()
: fMsgSize(10000) : fMsgSize(10000)
, fNumMsgs(0) , fNumMsgs(0)
, fMsgCounter(0)
, fMsgRate(1)
{ {
} }
@ -35,6 +37,8 @@ FairMQBenchmarkSampler::~FairMQBenchmarkSampler()
void FairMQBenchmarkSampler::Run() void FairMQBenchmarkSampler::Run()
{ {
boost::thread resetMsgCounter(boost::bind(&FairMQBenchmarkSampler::ResetMsgCounter, this));
int numSentMsgs = 0; int numSentMsgs = 0;
unique_ptr<FairMQMessage> baseMsg(fTransportFactory->CreateMessage(fMsgSize)); unique_ptr<FairMQMessage> baseMsg(fTransportFactory->CreateMessage(fMsgSize));
@ -61,10 +65,40 @@ void FairMQBenchmarkSampler::Run()
} }
} }
} }
--fMsgCounter;
while (fMsgCounter == 0) {
boost::this_thread::sleep(boost::posix_time::milliseconds(1));
}
} }
LOG(INFO) << "Sent " << numSentMsgs << " messages, leaving RUNNING state."; LOG(INFO) << "Sent " << numSentMsgs << " messages, leaving RUNNING state.";
LOG(INFO) << "Sending time: "; LOG(INFO) << "Sending time: ";
try
{
resetMsgCounter.interrupt();
resetMsgCounter.join();
}
catch(boost::thread_resource_error& e)
{
LOG(ERROR) << e.what();
exit(EXIT_FAILURE);
}
}
void FairMQBenchmarkSampler::ResetMsgCounter()
{
while (true) {
try {
fMsgCounter = fMsgRate / 100;
boost::this_thread::sleep(boost::posix_time::milliseconds(10));
} catch (boost::thread_interrupted&) {
LOG(DEBUG) << "Event rate limiter thread interrupted";
break;
}
}
} }
void FairMQBenchmarkSampler::SetProperty(const int key, const string& value) void FairMQBenchmarkSampler::SetProperty(const int key, const string& value)
@ -93,6 +127,9 @@ void FairMQBenchmarkSampler::SetProperty(const int key, const int value)
case MsgSize: case MsgSize:
fMsgSize = value; fMsgSize = value;
break; break;
case MsgRate:
fMsgRate = value;
break;
case NumMsgs: case NumMsgs:
fNumMsgs = value; fNumMsgs = value;
break; break;
@ -108,6 +145,8 @@ int FairMQBenchmarkSampler::GetProperty(const int key, const int default_ /*= 0*
{ {
case MsgSize: case MsgSize:
return fMsgSize; return fMsgSize;
case MsgRate:
return fMsgRate;
case NumMsgs: case NumMsgs:
return fNumMsgs; return fNumMsgs;
default: default:
@ -123,6 +162,8 @@ string FairMQBenchmarkSampler::GetPropertyDescription(const int key)
return "MsgSize: Size of the transfered message buffer."; return "MsgSize: Size of the transfered message buffer.";
case NumMsgs: case NumMsgs:
return "NumMsgs: Number of messages to send."; return "NumMsgs: Number of messages to send.";
case MsgRate:
return "MsgRate: Maximum msg rate.";
default: default:
return FairMQDevice::GetPropertyDescription(key); return FairMQDevice::GetPropertyDescription(key);
} }

View File

@ -37,6 +37,8 @@ class FairMQBenchmarkSampler : public FairMQDevice
FairMQBenchmarkSampler(); FairMQBenchmarkSampler();
virtual ~FairMQBenchmarkSampler(); virtual ~FairMQBenchmarkSampler();
void ResetMsgCounter();
virtual void SetProperty(const int key, const std::string& value); virtual void SetProperty(const int key, const std::string& value);
virtual std::string GetProperty(const int key, const std::string& default_ = ""); virtual std::string GetProperty(const int key, const std::string& default_ = "");
virtual void SetProperty(const int key, const int value); virtual void SetProperty(const int key, const int value);
@ -48,6 +50,8 @@ class FairMQBenchmarkSampler : public FairMQDevice
protected: protected:
int fMsgSize; int fMsgSize;
int fNumMsgs; int fNumMsgs;
int fMsgCounter;
int fMsgRate;
virtual void Run(); virtual void Run();
}; };

View File

@ -29,11 +29,13 @@ int main(int argc, char** argv)
{ {
int msgSize; int msgSize;
int numMsgs; int numMsgs;
int msgRate;
options_description samplerOptions("Sampler options"); options_description samplerOptions("Sampler options");
samplerOptions.add_options() samplerOptions.add_options()
("msg-size", value<int>(&msgSize)->default_value(1000), "Message size in bytes") ("msg-size", value<int>(&msgSize)->default_value(1000), "Message size in bytes")
("num-msgs", value<int>(&numMsgs)->default_value(0), "Number of messages to send"); ("num-msgs", value<int>(&numMsgs)->default_value(0), "Number of messages to send")
("msg-rate", value<int>(&msgRate)->default_value(0), "Msg rate limit in maximum number of messages per second");
FairMQProgOptions config; FairMQProgOptions config;
config.AddToCmdLineOptions(samplerOptions); config.AddToCmdLineOptions(samplerOptions);
@ -42,6 +44,7 @@ int main(int argc, char** argv)
FairMQBenchmarkSampler sampler; FairMQBenchmarkSampler sampler;
sampler.SetProperty(FairMQBenchmarkSampler::MsgSize, msgSize); sampler.SetProperty(FairMQBenchmarkSampler::MsgSize, msgSize);
sampler.SetProperty(FairMQBenchmarkSampler::NumMsgs, numMsgs); sampler.SetProperty(FairMQBenchmarkSampler::NumMsgs, numMsgs);
sampler.SetProperty(FairMQBenchmarkSampler::MsgRate, msgRate);
runStateMachine(sampler, config); runStateMachine(sampler, config);
} }

View File

@ -28,6 +28,7 @@ SAMPLER+=" --id bsampler1"
#SAMPLER+=" --control static" #SAMPLER+=" --control static"
#SAMPLER+=" --transport nanomsg" #SAMPLER+=" --transport nanomsg"
SAMPLER+=" --msg-size $msgSize" SAMPLER+=" --msg-size $msgSize"
# SAMPLER+=" --msg-rate 1000"
SAMPLER+=" --num-msgs $numMsgs" SAMPLER+=" --num-msgs $numMsgs"
SAMPLER+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/benchmark.json" SAMPLER+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/benchmark.json"
xterm -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SAMPLER & xterm -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SAMPLER &