FairMQ  1.4.14
C++ Message Queuing Library and Framework
FairMQSink.h
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
15 #ifndef FAIRMQSINK_H_
16 #define FAIRMQSINK_H_
17 
18 #include <string>
19 #include <chrono>
20 
21 #include "../FairMQDevice.h"
22 #include "../FairMQLogger.h"
23 
24 // template<typename OutputPolicy>
25 class FairMQSink : public FairMQDevice//, public OutputPolicy
26 {
27  public:
28  FairMQSink()
29  : fMultipart(false)
30  , fMaxIterations(0)
31  , fNumIterations(0)
32  , fInChannelName()
33  {}
34 
35  virtual ~FairMQSink()
36  {}
37 
38  protected:
39  bool fMultipart;
40  uint64_t fMaxIterations;
41  uint64_t fNumIterations;
42  std::string fInChannelName;
43 
44  virtual void InitTask()
45  {
46  fMultipart = fConfig->GetProperty<bool>("multipart");
47  fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
48  fInChannelName = fConfig->GetProperty<std::string>("in-channel");
49  }
50 
51  virtual void Run()
52  {
53  // store the channel reference to avoid traversing the map on every loop iteration
54  FairMQChannel& dataInChannel = fChannels.at(fInChannelName).at(0);
55 
56  LOG(info) << "Starting the benchmark and expecting to receive " << fMaxIterations << " messages.";
57  auto tStart = std::chrono::high_resolution_clock::now();
58 
59  while (!NewStatePending())
60  {
61  if (fMultipart)
62  {
63  FairMQParts parts;
64 
65  if (dataInChannel.Receive(parts) >= 0)
66  {
67  if (fMaxIterations > 0)
68  {
69  if (fNumIterations >= fMaxIterations)
70  {
71  LOG(info) << "Configured maximum number of iterations reached.";
72  break;
73  }
74  }
75  fNumIterations++;
76  }
77  }
78  else
79  {
80  FairMQMessagePtr msg(dataInChannel.NewMessage());
81 
82  if (dataInChannel.Receive(msg) >= 0)
83  {
84  if (fMaxIterations > 0)
85  {
86  if (fNumIterations >= fMaxIterations)
87  {
88  LOG(info) << "Configured maximum number of iterations reached.";
89  break;
90  }
91  }
92  fNumIterations++;
93  }
94  }
95  }
96 
97  auto tEnd = std::chrono::high_resolution_clock::now();
98 
99  LOG(info) << "Leaving RUNNING state. Received " << fNumIterations << " messages in " << std::chrono::duration<double, std::milli>(tEnd - tStart).count() << "ms.";
100  }
101 };
102 
103 #endif /* FAIRMQSINK_H_ */
virtual void InitTask()
Task initialization (can be overloaded in child classes)
Definition: FairMQSink.h:44
T GetProperty(const std::string &key) const
Read config property, throw if no property with this key exists.
Definition: ProgOptions.h:65
int Receive(FairMQMessagePtr &msg, int rcvTimeoutInMs=-1)
Definition: FairMQChannel.h:266
fair::mq::ProgOptions * fConfig
Pointer to config (internal or external)
Definition: FairMQDevice.h:449
Definition: FairMQChannel.h:30
Definition: FairMQSink.h:25
std::unordered_map< std::string, std::vector< FairMQChannel > > fChannels
Device channels.
Definition: FairMQDevice.h:447
FairMQParts is a lightweight convenience wrapper around a vector of unique pointers to FairMQMessage...
Definition: FairMQParts.h:20
Definition: FairMQDevice.h:53
virtual void Run()
Runs the device (to be overloaded in child classes)
Definition: FairMQSink.h:51

privacy