FairMQ  1.2.0
C++ Message Passing Framework
FairMQDevice.h
1 /********************************************************************************
2  * Copyright (C) 2012-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef FAIRMQDEVICE_H_
10 #define FAIRMQDEVICE_H_
11 
12 #include <FairMQStateMachine.h>
13 #include <FairMQTransportFactory.h>
14 #include <fairmq/Transports.h>
15 
16 #include <FairMQSocket.h>
17 #include <FairMQChannel.h>
18 #include <FairMQMessage.h>
19 #include <FairMQParts.h>
20 #include <FairMQUnmanagedRegion.h>
21 #include <FairMQLogger.h>
22 #include <options/FairMQProgOptions.h>
23 
24 #include <vector>
25 #include <memory> // unique_ptr
26 #include <algorithm> // std::sort()
27 #include <string>
28 #include <iostream>
29 #include <unordered_map>
30 #include <functional>
31 #include <assert.h> // static_assert
32 #include <type_traits> // is_trivially_copyable
33 
34 #include <mutex>
35 #include <condition_variable>
36 
37 #include <fairmq/Tools.h>
38 
39 using FairMQChannelMap = std::unordered_map<std::string, std::vector<FairMQChannel>>;
40 
41 using InputMsgCallback = std::function<bool(FairMQMessagePtr&, int)>;
42 using InputMultipartCallback = std::function<bool(FairMQParts&, int)>;
43 
45 {
46  friend class FairMQChannel;
47 
48  public:
50  FairMQDevice();
51 
55  FairMQDevice(const FairMQDevice&) = delete;
57  FairMQDevice operator=(const FairMQDevice&) = delete;
59  virtual ~FairMQDevice();
60 
62  void CatchSignals();
63 
65  virtual void LogSocketRates();
66 
70  void SortChannel(const std::string& name, const bool reindex = true);
71 
74  void PrintChannel(const std::string& name);
75 
76  template<typename Serializer, typename DataType, typename... Args>
77  void Serialize(FairMQMessage& msg, DataType&& data, Args&&... args) const
78  {
79  Serializer().Serialize(msg, std::forward<DataType>(data), std::forward<Args>(args)...);
80  }
81 
82  template<typename Deserializer, typename DataType, typename... Args>
83  void Deserialize(FairMQMessage& msg, DataType&& data, Args&&... args) const
84  {
85  Deserializer().Deserialize(msg, std::forward<DataType>(data), std::forward<Args>(args)...);
86  }
87 
88  int Send(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) const
89  {
90  return fChannels.at(chan).at(i).Send(msg);
91  }
92 
93  int Receive(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) const
94  {
95  return fChannels.at(chan).at(i).Receive(msg);
96  }
97 
104  int Send(FairMQMessagePtr& msg, const std::string& chan, const int i, int sndTimeoutInMs) const
105  {
106  return fChannels.at(chan).at(i).Send(msg, sndTimeoutInMs);
107  }
108 
115  int Receive(FairMQMessagePtr& msg, const std::string& chan, const int i, int rcvTimeoutInMs) const
116  {
117  return fChannels.at(chan).at(i).Receive(msg, rcvTimeoutInMs);
118  }
119 
126  int SendAsync(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) const
127  {
128  return fChannels.at(chan).at(i).SendAsync(msg);
129  }
130 
137  int ReceiveAsync(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) const
138  {
139  return fChannels.at(chan).at(i).ReceiveAsync(msg);
140  }
141 
142  int64_t Send(FairMQParts& parts, const std::string& chan, const int i = 0) const
143  {
144  return fChannels.at(chan).at(i).Send(parts.fParts);
145  }
146 
147  int64_t Receive(FairMQParts& parts, const std::string& chan, const int i = 0) const
148  {
149  return fChannels.at(chan).at(i).Receive(parts.fParts);
150  }
151 
158  int64_t Send(FairMQParts& parts, const std::string& chan, const int i, int sndTimeoutInMs) const
159  {
160  return fChannels.at(chan).at(i).Send(parts.fParts, sndTimeoutInMs);
161  }
162 
169  int64_t Receive(FairMQParts& parts, const std::string& chan, const int i, int rcvTimeoutInMs) const
170  {
171  return fChannels.at(chan).at(i).Receive(parts.fParts, rcvTimeoutInMs);
172  }
173 
180  int64_t SendAsync(FairMQParts& parts, const std::string& chan, const int i = 0) const
181  {
182  return fChannels.at(chan).at(i).SendAsync(parts.fParts);
183  }
184 
191  int64_t ReceiveAsync(FairMQParts& parts, const std::string& chan, const int i = 0) const
192  {
193  return fChannels.at(chan).at(i).ReceiveAsync(parts.fParts);
194  }
195 
197  auto Transport() const -> const FairMQTransportFactory*
198  {
199  return fTransports.at(fair::mq::TransportTypes[GetDefaultTransport()]).get();
200  }
201 
202  template<typename... Args>
203  FairMQMessagePtr NewMessage(Args&&... args) const
204  {
205  return Transport()->CreateMessage(std::forward<Args>(args)...);
206  }
207 
208  template<typename... Args>
209  FairMQMessagePtr NewMessageFor(const std::string& channel, int index, Args&&... args) const
210  {
211  return fChannels.at(channel).at(index).Transport()->CreateMessage(std::forward<Args>(args)...);
212  }
213 
214  template<typename T>
215  FairMQMessagePtr NewStaticMessage(const T& data) const
216  {
217  return Transport()->NewStaticMessage(data);
218  }
219 
220  template<typename T>
221  FairMQMessagePtr NewStaticMessageFor(const std::string& channel, int index, const T& data) const
222  {
223  return fChannels.at(channel).at(index).NewStaticMessage(data);
224  }
225 
226  template<typename T>
227  FairMQMessagePtr NewSimpleMessage(const T& data) const
228  {
229  return Transport()->NewSimpleMessage(data);
230  }
231 
232  template<typename T>
233  FairMQMessagePtr NewSimpleMessageFor(const std::string& channel, int index, const T& data) const
234  {
235  return fChannels.at(channel).at(index).NewSimpleMessage(data);
236  }
237 
238  FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size)
239  {
240  return Transport()->CreateUnmanagedRegion(size);
241  }
242 
243  FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel, int index, const size_t size, FairMQRegionCallback callback = nullptr)
244  {
245  return fChannels.at(channel).at(index).Transport()->CreateUnmanagedRegion(size, callback);
246  }
247 
248  template<typename ...Ts>
249  FairMQPollerPtr NewPoller(const Ts&... inputs)
250  {
251  std::vector<std::string> chans{inputs...};
252 
253  // if more than one channel provided, check compatibility
254  if (chans.size() > 1)
255  {
256  FairMQ::Transport type = fChannels.at(chans.at(0)).at(0).Transport()->GetType();
257 
258  for (unsigned int i = 1; i < chans.size(); ++i)
259  {
260  if (type != fChannels.at(chans.at(i)).at(0).Transport()->GetType())
261  {
262  LOG(error) << "poller failed: different transports within same poller are not yet supported. Going to ERROR state.";
263  ChangeState(ERROR_FOUND);
264  }
265  }
266  }
267 
268  return fChannels.at(chans.at(0)).at(0).Transport()->CreatePoller(fChannels, chans);
269  }
270 
271  FairMQPollerPtr NewPoller(const std::vector<const FairMQChannel*>& channels)
272  {
273  // if more than one channel provided, check compatibility
274  if (channels.size() > 1)
275  {
276  FairMQ::Transport type = channels.at(0)->Transport()->GetType();
277 
278  for (unsigned int i = 1; i < channels.size(); ++i)
279  {
280  if (type != channels.at(i)->Transport()->GetType())
281  {
282  LOG(error) << "poller failed: different transports within same poller are not yet supported. Going to ERROR state.";
283  ChangeState(ERROR_FOUND);
284  }
285  }
286  }
287 
288  return channels.at(0)->Transport()->CreatePoller(channels);
289  }
290 
293 
296  std::shared_ptr<FairMQTransportFactory> AddTransport(const std::string& transport);
299  void SetTransport(const std::string& transport = "zeromq");
300 
301  void SetConfig(FairMQProgOptions& config);
302  const FairMQProgOptions* GetConfig() const
303  {
304  return fConfig;
305  }
306 
310  static bool SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs);
311 
312  template<typename T>
313  void OnData(const std::string& channelName, bool (T::* memberFunction)(FairMQMessagePtr& msg, int index))
314  {
315  fDataCallbacks = true;
316  fMsgInputs.insert(std::make_pair(channelName, [this, memberFunction](FairMQMessagePtr& msg, int index)
317  {
318  return (static_cast<T*>(this)->*memberFunction)(msg, index);
319  }));
320 
321  if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end())
322  {
323  fInputChannelKeys.push_back(channelName);
324  }
325  }
326 
327  void OnData(const std::string& channelName, InputMsgCallback callback)
328  {
329  fDataCallbacks = true;
330  fMsgInputs.insert(make_pair(channelName, callback));
331 
332  if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end())
333  {
334  fInputChannelKeys.push_back(channelName);
335  }
336  }
337 
338  template<typename T>
339  void OnData(const std::string& channelName, bool (T::* memberFunction)(FairMQParts& parts, int index))
340  {
341  fDataCallbacks = true;
342  fMultipartInputs.insert(std::make_pair(channelName, [this, memberFunction](FairMQParts& parts, int index)
343  {
344  return (static_cast<T*>(this)->*memberFunction)(parts, index);
345  }));
346 
347  if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end())
348  {
349  fInputChannelKeys.push_back(channelName);
350  }
351  }
352 
353  void OnData(const std::string& channelName, InputMultipartCallback callback)
354  {
355  fDataCallbacks = true;
356  fMultipartInputs.insert(make_pair(channelName, callback));
357 
358  if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end())
359  {
360  fInputChannelKeys.push_back(channelName);
361  }
362  }
363 
364  const FairMQChannel& GetChannel(const std::string& channelName, const int index = 0) const;
365 
366  virtual void RegisterChannelEndpoints() {}
367 
368  bool RegisterChannelEndpoint(const std::string& channelName, uint16_t minNumSubChannels = 1, uint16_t maxNumSubChannels = 1)
369  {
370  bool ok = fChannelRegistry.insert(std::make_pair(channelName, std::make_pair(minNumSubChannels, maxNumSubChannels))).second;
371  if (!ok)
372  {
373  LOG(warn) << "Registering channel: name already registered: \"" << channelName << "\"";
374  }
375  return ok;
376  }
377 
378  void PrintRegisteredChannels()
379  {
380  if (fChannelRegistry.size() < 1)
381  {
382  std::cout << "no channels registered." << std::endl;
383  }
384  else
385  {
386  for (const auto& c : fChannelRegistry)
387  {
388  std::cout << c.first << ":" << c.second.first << ":" << c.second.second << std::endl;
389  }
390  }
391  }
392 
393  void SetId(const std::string& id) { fId = id; }
394  std::string GetId() { return fId; }
395 
396  const fair::mq::tools::Version GetVersion() const { return fVersion; }
397 
398  void SetNumIoThreads(int numIoThreads) { fNumIoThreads = numIoThreads; }
399  int GetNumIoThreads() const { return fNumIoThreads; }
400 
401  void SetPortRangeMin(int portRangeMin) { fPortRangeMin = portRangeMin; }
402  int GetPortRangeMin() const { return fPortRangeMin; }
403 
404  void SetPortRangeMax(int portRangeMax) { fPortRangeMax = portRangeMax; }
405  int GetPortRangeMax() const { return fPortRangeMax; }
406 
407  void SetNetworkInterface(const std::string& networkInterface) { fNetworkInterface = networkInterface; }
408  std::string GetNetworkInterface() const { return fNetworkInterface; }
409 
410  void SetDefaultTransport(const std::string& defaultTransport) { fDefaultTransport = defaultTransport; }
411  std::string GetDefaultTransport() const { return fDefaultTransport; }
412 
413  void SetInitializationTimeoutInS(int initializationTimeoutInS) { fInitializationTimeoutInS = initializationTimeoutInS; }
414  int GetInitializationTimeoutInS() const { return fInitializationTimeoutInS; }
415 
416  protected:
417  std::shared_ptr<FairMQTransportFactory> fTransportFactory;
418  std::unordered_map<FairMQ::Transport, std::shared_ptr<FairMQTransportFactory>> fTransports;
419 
420  public:
421  std::unordered_map<std::string, std::vector<FairMQChannel>> fChannels;
423 
424  protected:
425  std::string fId;
426 
428 
431  virtual void Init();
432 
435  virtual void InitTask();
436 
439  virtual void Run();
440 
443  virtual void PreRun();
444 
447  virtual bool ConditionalRun();
448 
451  virtual void PostRun();
452 
455  virtual void Pause();
456 
459  virtual void ResetTask();
460 
463  virtual void Reset();
464 
465  private:
466  // condition variable to notify parent thread about end of initial validation.
467  bool fInitialValidationFinished;
468  std::condition_variable fInitialValidationCondition;
469  std::mutex fInitialValidationMutex;
470 
471  int fPortRangeMin;
472  int fPortRangeMax;
473 
474  std::string fNetworkInterface;
475  std::string fDefaultTransport;
476 
477  int fInitializationTimeoutInS;
478 
480  void InitWrapper();
482  void InitTaskWrapper();
484  void RunWrapper();
486  void PauseWrapper();
488  void ResetTaskWrapper();
490  void ResetWrapper();
491 
493  void Unblock();
494 
496  void Exit();
497 
499  void AttachChannels(std::vector<FairMQChannel*>& chans);
500 
504  bool ConnectEndpoint(FairMQSocket& socket, std::string& endpoint);
505  bool BindEndpoint(FairMQSocket& socket, std::string& endpoint);
509  bool AttachChannel(FairMQChannel& ch);
510 
511  void HandleSingleChannelInput();
512  void HandleMultipleChannelInput();
513  void HandleMultipleTransportInput();
514  void PollForTransport(const FairMQTransportFactory* factory, const std::vector<std::string>& channelKeys);
515 
516  bool HandleMsgInput(const std::string& chName, const InputMsgCallback& callback, int i) const;
517  bool HandleMultipartInput(const std::string& chName, const InputMultipartCallback& callback, int i) const;
518 
519  void CreateOwnConfig();
520 
521  bool fDataCallbacks;
522  std::unordered_map<std::string, InputMsgCallback> fMsgInputs;
523  std::unordered_map<std::string, InputMultipartCallback> fMultipartInputs;
524  std::unordered_map<FairMQ::Transport, std::vector<std::string>> fMultitransportInputs;
525  std::unordered_map<std::string, std::pair<uint16_t, uint16_t>> fChannelRegistry;
526  std::vector<std::string> fInputChannelKeys;
527  std::mutex fMultitransportMutex;
528  std::atomic<bool> fMultitransportProceed;
529 
530  bool fExternalConfig;
531 
532  const fair::mq::tools::Version fVersion;
533  float fRate;
534  size_t fLastTime;
535 };
536 
537 #endif /* FAIRMQDEVICE_H_ */
virtual void Pause()
Definition: FairMQDevice.cxx:753
std::shared_ptr< FairMQTransportFactory > AddTransport(const std::string &transport)
Definition: FairMQDevice.cxx:763
virtual void Run()
Definition: FairMQDevice.cxx:729
int64_t ReceiveAsync(FairMQParts &parts, const std::string &chan, const int i=0) const
Definition: FairMQDevice.h:191
int ReceiveAsync(FairMQMessagePtr &msg, const std::string &chan, const int i=0) const
Definition: FairMQDevice.h:137
virtual bool ConditionalRun()
Definition: FairMQDevice.cxx:737
FairMQProgOptions * fConfig
Program options configuration.
Definition: FairMQDevice.h:422
Definition: FairMQTransportFactory.h:27
auto Transport() const -> const FairMQTransportFactory *
Getter for default transport factory.
Definition: FairMQDevice.h:197
std::unordered_map< FairMQ::Transport, std::shared_ptr< FairMQTransportFactory > > fTransports
Container for transports.
Definition: FairMQDevice.h:418
Definition: FairMQChannel.h:24
Definition: FairMQProgOptions.h:41
virtual void PreRun()
Definition: FairMQDevice.cxx:733
virtual void ResetTask()
Definition: FairMQDevice.cxx:974
std::string fId
Device ID.
Definition: FairMQDevice.h:425
void CatchSignals()
Catches interrupt signals (SIGINT, SIGTERM)
virtual ~FairMQDevice()
Default destructor.
Definition: FairMQDevice.cxx:1014
void SortChannel(const std::string &name, const bool reindex=true)
Definition: FairMQDevice.cxx:420
void PrintChannel(const std::string &name)
Definition: FairMQDevice.cxx:441
int64_t Receive(FairMQParts &parts, const std::string &chan, const int i, int rcvTimeoutInMs) const
Definition: FairMQDevice.h:169
virtual void PostRun()
Definition: FairMQDevice.cxx:742
Definition: FairMQSocket.h:18
std::unordered_map< std::string, std::vector< FairMQChannel > > fChannels
Device channels.
Definition: FairMQDevice.h:421
Definition: FairMQStateMachine.h:568
int Receive(FairMQMessagePtr &msg, const std::string &chan, const int i, int rcvTimeoutInMs) const
Definition: FairMQDevice.h:115
void SetTransport(const std::string &transport="zeromq")
Definition: FairMQDevice.cxx:810
int fNumIoThreads
Number of ZeroMQ I/O threads.
Definition: FairMQDevice.h:427
int Send(FairMQMessagePtr &msg, const std::string &chan, const int i, int sndTimeoutInMs) const
Definition: FairMQDevice.h:104
FairMQParts is a lightweight convenience wrapper around a vector of unique pointers to FairMQMessage...
Definition: FairMQParts.h:20
std::shared_ptr< FairMQTransportFactory > fTransportFactory
Transport factory.
Definition: FairMQDevice.h:417
virtual void Reset()
Definition: FairMQDevice.cxx:987
static bool SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs)
Definition: FairMQDevice.cxx:415
int64_t SendAsync(FairMQParts &parts, const std::string &chan, const int i=0) const
Definition: FairMQDevice.h:180
Definition: FairMQDevice.h:44
int SendAsync(FairMQMessagePtr &msg, const std::string &chan, const int i=0) const
Definition: FairMQDevice.h:126
virtual void Init()
Definition: FairMQDevice.cxx:215
Definition: FairMQMessage.h:19
void WaitForInitialValidation()
Waits for the first initialization run to finish.
Definition: FairMQDevice.cxx:209
virtual void InitTask()
Definition: FairMQDevice.cxx:411
FairMQDevice operator=(const FairMQDevice &)=delete
Assignment operator (disabled)
virtual void LogSocketRates()
Outputs the socket transfer rates.
Definition: FairMQDevice.cxx:851
FairMQDevice()
Default constructor.
Definition: FairMQDevice.cxx:32
Definition: Version.h:22
int64_t Send(FairMQParts &parts, const std::string &chan, const int i, int sndTimeoutInMs) const
Definition: FairMQDevice.h:158