FairMQ  1.2.0
C++ Message Passing Framework
FairMQSink.h
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
15 #ifndef FAIRMQSINK_H_
16 #define FAIRMQSINK_H_
17 
18 #include <string>
19 #include <chrono>
20 
21 #include "../FairMQDevice.h"
22 #include "../FairMQLogger.h"
23 #include "../options/FairMQProgOptions.h"
24 
25 // template<typename OutputPolicy>
26 class FairMQSink : public FairMQDevice//, public OutputPolicy
27 {
28  public:
29  FairMQSink()
30  : fMaxIterations(0)
31  , fNumIterations(0)
32  , fInChannelName()
33  {}
34 
35  virtual ~FairMQSink()
36  {}
37 
38  protected:
39  uint64_t fMaxIterations;
40  uint64_t fNumIterations;
41  std::string fInChannelName;
42 
43  virtual void InitTask()
44  {
45  fMaxIterations = fConfig->GetValue<uint64_t>("max-iterations");
46  fInChannelName = fConfig->GetValue<std::string>("in-channel");
47  }
48 
49  virtual void Run()
50  {
51  // store the channel reference to avoid traversing the map on every loop iteration
52  FairMQChannel& dataInChannel = fChannels.at(fInChannelName).at(0);
53 
54  LOG(info) << "Starting the benchmark and expecting to receive " << fMaxIterations << " messages.";
55  auto tStart = std::chrono::high_resolution_clock::now();
56 
57  while (CheckCurrentState(RUNNING))
58  {
59  FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage());
60 
61  if (dataInChannel.Receive(msg) >= 0)
62  {
63  if (fMaxIterations > 0)
64  {
65  if (fNumIterations >= fMaxIterations)
66  {
67  break;
68  }
69  }
70  fNumIterations++;
71  }
72  }
73 
74  auto tEnd = std::chrono::high_resolution_clock::now();
75 
76  LOG(info) << "Leaving RUNNING state. Received " << fNumIterations << " messages in " << std::chrono::duration<double, std::milli>(tEnd - tStart).count() << "ms.";
77  }
78 };
79 
80 #endif /* FAIRMQSINK_H_ */
virtual void InitTask()
Definition: FairMQSink.h:43
virtual void Run()
Definition: FairMQSink.h:49
FairMQProgOptions * fConfig
Program options configuration.
Definition: FairMQDevice.h:422
Definition: FairMQChannel.h:24
Definition: FairMQSink.h:26
std::unordered_map< std::string, std::vector< FairMQChannel > > fChannels
Device channels.
Definition: FairMQDevice.h:421
Definition: FairMQDevice.h:44