FairMQ  1.4.33
C++ Message Queuing Library and Framework
FairMQMerger.h
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
15 #ifndef FAIRMQMERGER_H_
16 #define FAIRMQMERGER_H_
17 
18 #include "FairMQDevice.h"
19 #include "../FairMQPoller.h"
20 #include "../FairMQLogger.h"
21 
22 #include <string>
23 #include <vector>
24 
25 class FairMQMerger : public FairMQDevice
26 {
27  public:
28  FairMQMerger()
29  : fMultipart(true)
30  , fInChannelName("data-in")
31  , fOutChannelName("data-out")
32  {}
33  ~FairMQMerger() {}
34 
35  protected:
36  bool fMultipart;
37  std::string fInChannelName;
38  std::string fOutChannelName;
39 
40  void InitTask() override
41  {
42  fMultipart = fConfig->GetProperty<bool>("multipart");
43  fInChannelName = fConfig->GetProperty<std::string>("in-channel");
44  fOutChannelName = fConfig->GetProperty<std::string>("out-channel");
45  }
46 
47  void RegisterChannelEndpoints() override
48  {
49  RegisterChannelEndpoint(fInChannelName, 1, 10000);
50  RegisterChannelEndpoint(fOutChannelName, 1, 1);
51 
52  PrintRegisteredChannels();
53  }
54 
55  void Run() override
56  {
57  int numInputs = fChannels.at(fInChannelName).size();
58 
59  std::vector<FairMQChannel*> chans;
60 
61  for (auto& chan : fChannels.at(fInChannelName)) {
62  chans.push_back(&chan);
63  }
64 
65  FairMQPollerPtr poller(NewPoller(chans));
66 
67  if (fMultipart) {
68  while (!NewStatePending()) {
69  poller->Poll(100);
70 
71  // Loop over the data input channels.
72  for (int i = 0; i < numInputs; ++i) {
73  // Check if the channel has data ready to be received.
74  if (poller->CheckInput(i)) {
75  FairMQParts payload;
76 
77  if (Receive(payload, fInChannelName, i) >= 0) {
78  if (Send(payload, fOutChannelName) < 0) {
79  LOG(debug) << "Transfer interrupted";
80  break;
81  }
82  } else {
83  LOG(debug) << "Transfer interrupted";
84  break;
85  }
86  }
87  }
88  }
89  } else {
90  while (!NewStatePending()) {
91  poller->Poll(100);
92 
93  // Loop over the data input channels.
94  for (int i = 0; i < numInputs; ++i) {
95  // Check if the channel has data ready to be received.
96  if (poller->CheckInput(i)) {
97  FairMQMessagePtr payload(fTransportFactory->CreateMessage());
98 
99  if (Receive(payload, fInChannelName, i) >= 0) {
100  if (Send(payload, fOutChannelName) < 0) {
101  LOG(debug) << "Transfer interrupted";
102  break;
103  }
104  } else {
105  LOG(debug) << "Transfer interrupted";
106  break;
107  }
108  }
109  }
110  }
111  }
112  }
113 };
114 
115 #endif /* FAIRMQMERGER_H_ */
FairMQMerger
Definition: FairMQMerger.h:26
FairMQDevice::fChannels
std::unordered_map< std::string, std::vector< FairMQChannel > > fChannels
Device channels.
Definition: FairMQDevice.h:383
FairMQParts
FairMQParts is a lightweight convenience wrapper around a vector of unique pointers to FairMQMessage,...
Definition: FairMQParts.h:21
fair::mq::ProgOptions::GetProperty
T GetProperty(const std::string &key) const
Read config property, throw if no property with this key exists.
Definition: ProgOptions.h:69
FairMQMerger::Run
void Run() override
Runs the device (to be overloaded in child classes)
Definition: FairMQMerger.h:61
FairMQDevice::Receive
int64_t Receive(FairMQMessagePtr &msg, const std::string &channel, const int index=0, int rcvTimeoutInMs=-1)
Definition: FairMQDevice.h:108
FairMQDevice::Send
int64_t Send(FairMQMessagePtr &msg, const std::string &channel, const int index=0, int sndTimeoutInMs=-1)
Definition: FairMQDevice.h:97
FairMQDevice::fConfig
fair::mq::ProgOptions * fConfig
Pointer to config (internal or external)
Definition: FairMQDevice.h:385
FairMQMerger::InitTask
void InitTask() override
Task initialization (can be overloaded in child classes)
Definition: FairMQMerger.h:46
FairMQDevice::NewStatePending
bool NewStatePending() const
Returns true if a new state has been requested, signaling the current handler to stop.
Definition: FairMQDevice.h:470
FairMQDevice::fTransportFactory
std::shared_ptr< FairMQTransportFactory > fTransportFactory
Default transport factory.
Definition: FairMQDevice.h:379
FairMQDevice
Definition: FairMQDevice.h:50

privacy