diff --git a/fairmq/devices/FairMQBenchmarkSampler.cxx b/fairmq/devices/FairMQBenchmarkSampler.cxx index 3adb9fed..d380154b 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.cxx +++ b/fairmq/devices/FairMQBenchmarkSampler.cxx @@ -16,6 +16,7 @@ #include #include +#include #include "FairMQBenchmarkSampler.h" #include "FairMQLogger.h" @@ -23,9 +24,8 @@ using namespace std; FairMQBenchmarkSampler::FairMQBenchmarkSampler() - : fEventSize(10000) - , fEventRate(1) - , fEventCounter(0) + : fMsgSize(10000) + , fNumMsgs(0) { } @@ -35,52 +35,37 @@ FairMQBenchmarkSampler::~FairMQBenchmarkSampler() void FairMQBenchmarkSampler::Run() { - boost::thread resetEventCounter(boost::bind(&FairMQBenchmarkSampler::ResetEventCounter, this)); + void* buffer = malloc(fMsgSize); + int numSentMsgs = 0; - void* buffer = operator new[](fEventSize); - - unique_ptr baseMsg(fTransportFactory->CreateMessage(buffer, fEventSize)); + unique_ptr baseMsg(fTransportFactory->CreateMessage(buffer, fMsgSize)); // store the channel reference to avoid traversing the map on every loop iteration - const FairMQChannel& dataChannel = fChannels.at("data-out").at(0); + const FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0); + + LOG(INFO) << "Starting the benchmark with message size of " << fMsgSize << " and number of messages " << fNumMsgs << "."; + boost::timer::auto_cpu_timer timer; while (CheckCurrentState(RUNNING)) { unique_ptr msg(fTransportFactory->CreateMessage()); msg->Copy(baseMsg); - dataChannel.Send(msg); - - --fEventCounter; - - while (fEventCounter == 0) + if (dataOutChannel.Send(msg) >= 0) { - boost::this_thread::sleep(boost::posix_time::milliseconds(1)); + if (fNumMsgs > 0) + { + numSentMsgs++; + if (numSentMsgs >= fNumMsgs) + { + break; + } + } } } - try { - resetEventCounter.interrupt(); - resetEventCounter.join(); - } catch(boost::thread_resource_error& e) { - LOG(ERROR) << e.what(); - } -} - -void FairMQBenchmarkSampler::ResetEventCounter() -{ - while (true) - { - try - { - fEventCounter = fEventRate / 100; - boost::this_thread::sleep(boost::posix_time::milliseconds(10)); - } - catch (boost::thread_interrupted&) - { - break; - } - } + LOG(INFO) << "Sent " << numSentMsgs << " messages, leaving RUNNING state."; + LOG(INFO) << "Sending time: "; } void FairMQBenchmarkSampler::SetProperty(const int key, const string& value) @@ -106,11 +91,11 @@ void FairMQBenchmarkSampler::SetProperty(const int key, const int value) { switch (key) { - case EventSize: - fEventSize = value; + case MsgSize: + fMsgSize = value; break; - case EventRate: - fEventRate = value; + case NumMsgs: + fNumMsgs = value; break; default: FairMQDevice::SetProperty(key, value); @@ -122,10 +107,10 @@ int FairMQBenchmarkSampler::GetProperty(const int key, const int default_ /*= 0* { switch (key) { - case EventSize: - return fEventSize; - case EventRate: - return fEventRate; + case MsgSize: + return fMsgSize; + case NumMsgs: + return fNumMsgs; default: return FairMQDevice::GetProperty(key, default_); } @@ -135,10 +120,10 @@ string FairMQBenchmarkSampler::GetPropertyDescription(const int key) { switch (key) { - case EventSize: - return "EventSize: Size of the transfered message buffer."; - case EventRate: - return "EventRate: Upper limit for the message rate."; + case MsgSize: + return "MsgSize: Size of the transfered message buffer."; + case NumMsgs: + return "NumMsgs: Number of messages to send."; default: return FairMQDevice::GetPropertyDescription(key); } diff --git a/fairmq/devices/FairMQBenchmarkSampler.h b/fairmq/devices/FairMQBenchmarkSampler.h index 72f50bfb..77d437b3 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.h +++ b/fairmq/devices/FairMQBenchmarkSampler.h @@ -28,8 +28,9 @@ class FairMQBenchmarkSampler : public FairMQDevice public: enum { - EventSize = FairMQDevice::Last, - EventRate, + MsgSize = FairMQDevice::Last, + NumMsgs, + MsgRate, Last }; @@ -37,7 +38,6 @@ class FairMQBenchmarkSampler : public FairMQDevice virtual ~FairMQBenchmarkSampler(); void Log(int intervalInMs); - void ResetEventCounter(); virtual void SetProperty(const int key, const std::string& value); virtual std::string GetProperty(const int key, const std::string& default_ = ""); @@ -48,9 +48,8 @@ class FairMQBenchmarkSampler : public FairMQDevice virtual void ListProperties(); protected: - int fEventSize; - int fEventRate; - int fEventCounter; + int fMsgSize; + int fNumMsgs; virtual void Run(); }; diff --git a/fairmq/devices/FairMQSink.cxx b/fairmq/devices/FairMQSink.cxx index 5a254255..8a8a11c9 100644 --- a/fairmq/devices/FairMQSink.cxx +++ b/fairmq/devices/FairMQSink.cxx @@ -14,27 +14,112 @@ #include #include +#include #include "FairMQSink.h" #include "FairMQLogger.h" +using namespace std; + FairMQSink::FairMQSink() + : fNumMsgs(0) { } void FairMQSink::Run() { + int numReceivedMsgs = 0; // store the channel reference to avoid traversing the map on every loop iteration - const FairMQChannel& dataChannel = fChannels.at("data-in").at(0); + const FairMQChannel& dataInChannel = fChannels.at("data-in").at(0); + + LOG(INFO) << "Starting the benchmark and expecting to receive " << fNumMsgs << " messages."; + boost::timer::auto_cpu_timer timer; while (CheckCurrentState(RUNNING)) { std::unique_ptr msg(fTransportFactory->CreateMessage()); - dataChannel.Receive(msg); + if (dataInChannel.Receive(msg) >= 0) + { + if (fNumMsgs > 0) + { + numReceivedMsgs++; + if (numReceivedMsgs >= fNumMsgs) + { + break; + } + } + } } + + LOG(INFO) << "Received " << numReceivedMsgs << " messages, leaving RUNNING state."; + LOG(INFO) << "Receiving time: "; } FairMQSink::~FairMQSink() { } + +void FairMQSink::SetProperty(const int key, const string& value) +{ + switch (key) + { + default: + FairMQDevice::SetProperty(key, value); + break; + } +} + +string FairMQSink::GetProperty(const int key, const string& default_ /*= ""*/) +{ + switch (key) + { + default: + return FairMQDevice::GetProperty(key, default_); + } +} + +void FairMQSink::SetProperty(const int key, const int value) +{ + switch (key) + { + case NumMsgs: + fNumMsgs = value; + break; + default: + FairMQDevice::SetProperty(key, value); + break; + } +} + +int FairMQSink::GetProperty(const int key, const int default_ /*= 0*/) +{ + switch (key) + { + case NumMsgs: + return fNumMsgs; + default: + return FairMQDevice::GetProperty(key, default_); + } +} + +string FairMQSink::GetPropertyDescription(const int key) +{ + switch (key) + { + case NumMsgs: + return "NumMsgs: Number of messages to send."; + default: + return FairMQDevice::GetPropertyDescription(key); + } +} + +void FairMQSink::ListProperties() +{ + LOG(INFO) << "Properties of FairMQSink:"; + for (int p = FairMQConfigurable::Last; p < FairMQSink::Last; ++p) + { + LOG(INFO) << " " << GetPropertyDescription(p); + } + LOG(INFO) << "---------------------------"; +} diff --git a/fairmq/devices/FairMQSink.h b/fairmq/devices/FairMQSink.h index 81ef4e1a..c1bbccd2 100644 --- a/fairmq/devices/FairMQSink.h +++ b/fairmq/devices/FairMQSink.h @@ -15,15 +15,33 @@ #ifndef FAIRMQSINK_H_ #define FAIRMQSINK_H_ +#include + #include "FairMQDevice.h" class FairMQSink : public FairMQDevice { public: + enum + { + NumMsgs = FairMQDevice::Last, + Last + }; + FairMQSink(); virtual ~FairMQSink(); + virtual void SetProperty(const int key, const std::string& value); + virtual std::string GetProperty(const int key, const std::string& default_ = ""); + virtual void SetProperty(const int key, const int value); + virtual int GetProperty(const int key, const int default_ = 0); + + virtual std::string GetPropertyDescription(const int key); + virtual void ListProperties(); + protected: + int fNumMsgs; + virtual void Run(); }; diff --git a/fairmq/run/runBenchmarkSampler.cxx b/fairmq/run/runBenchmarkSampler.cxx index 016899bf..61156cb7 100644 --- a/fairmq/run/runBenchmarkSampler.cxx +++ b/fairmq/run/runBenchmarkSampler.cxx @@ -41,13 +41,13 @@ int main(int argc, char** argv) try { - int eventSize; - int eventRate; + int msgSize; + int numMsgs; options_description sampler_options("Sampler options"); sampler_options.add_options() - ("event-size", value(&eventSize)->default_value(1000), "Event size in bytes") - ("event-rate", value(&eventRate)->default_value(0), "Event rate limit in maximum number of events per second"); + ("msg-size", value(&msgSize)->default_value(1000), "Message size in bytes") + ("num-msgs", value(&numMsgs)->default_value(0), "Number of messages to send"); config.AddToCmdLineOptions(sampler_options); @@ -72,8 +72,8 @@ int main(int argc, char** argv) #endif sampler.SetProperty(FairMQBenchmarkSampler::Id, id); - sampler.SetProperty(FairMQBenchmarkSampler::EventSize, eventSize); - sampler.SetProperty(FairMQBenchmarkSampler::EventRate, eventRate); + sampler.SetProperty(FairMQBenchmarkSampler::MsgSize, msgSize); + sampler.SetProperty(FairMQBenchmarkSampler::NumMsgs, numMsgs); sampler.SetProperty(FairMQBenchmarkSampler::NumIoThreads, config.GetValue("io-threads")); sampler.ChangeState("INIT_DEVICE"); diff --git a/fairmq/run/runSink.cxx b/fairmq/run/runSink.cxx index 018fb8d9..612babf9 100644 --- a/fairmq/run/runSink.cxx +++ b/fairmq/run/runSink.cxx @@ -40,6 +40,14 @@ int main(int argc, char** argv) try { + int numMsgs; + + options_description sink_options("Sink options"); + sink_options.add_options() + ("num-msgs", value(&numMsgs)->default_value(0), "Number of messages to receive"); + + config.AddToCmdLineOptions(sink_options); + if (config.ParseAll(argc, argv)) { return 0; @@ -64,6 +72,7 @@ int main(int argc, char** argv) sink.SetTransport(transportFactory); sink.SetProperty(FairMQSink::Id, id); + sink.SetProperty(FairMQSink::NumMsgs, numMsgs); sink.SetProperty(FairMQSink::NumIoThreads, config.GetValue("io-threads")); sink.ChangeState("INIT_DEVICE"); diff --git a/fairmq/run/startBenchmark.sh.in b/fairmq/run/startBenchmark.sh.in index 8adb8e30..4920680f 100755 --- a/fairmq/run/startBenchmark.sh.in +++ b/fairmq/run/startBenchmark.sh.in @@ -1,18 +1,14 @@ #!/bin/bash -if(@NANOMSG_FOUND@); then - buffSize="500000000" # nanomsg buffer size is in bytes -else - buffSize="10000" # zeromq high-water mark is in messages -fi - SAMPLER="bsampler" SAMPLER+=" --id bsampler1" -SAMPLER+=" --event-size 10000" +SAMPLER+=" --msg-size 1000000" +#SAMPLER+=" --num-msgs 1000" SAMPLER+=" --config-json-file @CMAKE_BINARY_DIR@/bin/config/benchmark.json" xterm -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SAMPLER & SINK="sink" SINK+=" --id sink1" +#SINK+=" --num-msgs 1000" SINK+=" --config-json-file @CMAKE_BINARY_DIR@/bin/config/benchmark.json" xterm -geometry 80x23+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SINK &