21 #include "../FairMQDevice.h" 22 #include "../FairMQLogger.h" 23 #include "../options/FairMQProgOptions.h" 41 uint64_t fMaxIterations;
42 uint64_t fNumIterations;
43 std::string fInChannelName;
47 fMultipart =
fConfig->GetValue<
bool>(
"multipart");
48 fMaxIterations =
fConfig->GetValue<uint64_t>(
"max-iterations");
49 fInChannelName =
fConfig->GetValue<std::string>(
"in-channel");
57 LOG(info) <<
"Starting the benchmark and expecting to receive " << fMaxIterations <<
" messages.";
58 auto tStart = std::chrono::high_resolution_clock::now();
60 while (CheckCurrentState(RUNNING))
66 if (dataInChannel.
Receive(parts) >= 0)
68 if (fMaxIterations > 0)
70 if (fNumIterations >= fMaxIterations)
72 LOG(info) <<
"Configured maximum number of iterations reached.";
81 FairMQMessagePtr msg(dataInChannel.NewMessage());
83 if (dataInChannel.
Receive(msg) >= 0)
85 if (fMaxIterations > 0)
87 if (fNumIterations >= fMaxIterations)
89 LOG(info) <<
"Configured maximum number of iterations reached.";
98 auto tEnd = std::chrono::high_resolution_clock::now();
100 LOG(info) <<
"Leaving RUNNING state. Received " << fNumIterations <<
" messages in " << std::chrono::duration<double, std::milli>(tEnd - tStart).count() <<
"ms.";
virtual void InitTask()
Task initialization (can be overloaded in child classes)
Definition: FairMQSink.h:45
int Receive(FairMQMessagePtr &msg, int rcvTimeoutInMs=-1)
Definition: FairMQChannel.h:244
FairMQProgOptions * fConfig
Pointer to config (internal or external)
Definition: FairMQDevice.h:413
Definition: FairMQChannel.h:27
Definition: FairMQSink.h:26
std::unordered_map< std::string, std::vector< FairMQChannel > > fChannels
Device channels.
Definition: FairMQDevice.h:411
FairMQParts is a lightweight convenience wrapper around a vector of unique pointers to FairMQMessage,...
Definition: FairMQParts.h:20
Definition: FairMQDevice.h:46
virtual void Run()
Runs the device (to be overloaded in child classes)
Definition: FairMQSink.h:52