FairMQ  1.4.14
C++ Message Queuing Library and Framework
FairMQDevice.h
1 /********************************************************************************
2  * Copyright (C) 2012-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef FAIRMQDEVICE_H_
10 #define FAIRMQDEVICE_H_
11 
12 #include <StateMachine.h>
13 #include <FairMQTransportFactory.h>
14 #include <fairmq/Transports.h>
15 #include <fairmq/StateQueue.h>
16 
17 #include <FairMQChannel.h>
18 #include <FairMQMessage.h>
19 #include <FairMQParts.h>
20 #include <FairMQUnmanagedRegion.h>
21 #include <FairMQLogger.h>
22 #include <fairmq/ProgOptions.h>
23 
24 #include <vector>
25 #include <memory> // unique_ptr
26 #include <algorithm> // find
27 #include <string>
28 #include <chrono>
29 #include <iostream>
30 #include <unordered_map>
31 #include <functional>
32 #include <stdexcept>
33 #include <mutex>
34 #include <atomic>
35 #include <cstddef>
36 #include <utility> // pair
37 
38 #include <fairmq/tools/Version.h>
39 
40 using FairMQChannelMap = std::unordered_map<std::string, std::vector<FairMQChannel>>;
41 
42 using InputMsgCallback = std::function<bool(FairMQMessagePtr&, int)>;
43 using InputMultipartCallback = std::function<bool(FairMQParts&, int)>;
44 
45 namespace fair
46 {
47 namespace mq
48 {
49 struct OngoingTransition : std::runtime_error { using std::runtime_error::runtime_error; };
50 }
51 }
52 
54 {
55  friend class FairMQChannel;
56 
57  public:
58  // backwards-compatibility enum for old state machine interface, todo: delete this
59  enum Event
60  {
61  INIT_DEVICE,
62  internal_DEVICE_READY,
63  INIT_TASK,
64  internal_READY,
65  RUN,
66  STOP,
67  RESET_TASK,
68  RESET_DEVICE,
69  internal_IDLE,
70  END,
71  ERROR_FOUND
72  };
73 
74  // backwards-compatibility enum for old state machine interface, todo: delete this
75  enum State
76  {
77  OK,
78  Error,
79  IDLE,
80  INITIALIZING_DEVICE,
81  DEVICE_READY,
82  INITIALIZING_TASK,
83  READY,
84  RUNNING,
85  RESETTING_TASK,
86  RESETTING_DEVICE,
87  EXITING
88  };
89 
91  FairMQDevice();
94 
97 
100 
101  private:
103 
104  public:
106  FairMQDevice(const FairMQDevice&) = delete;
108  FairMQDevice operator=(const FairMQDevice&) = delete;
110  virtual ~FairMQDevice();
111 
113  virtual void LogSocketRates();
114 
115  template<typename Serializer, typename DataType, typename... Args>
116  void Serialize(FairMQMessage& msg, DataType&& data, Args&&... args) const
117  {
118  Serializer().Serialize(msg, std::forward<DataType>(data), std::forward<Args>(args)...);
119  }
120 
121  template<typename Deserializer, typename DataType, typename... Args>
122  void Deserialize(FairMQMessage& msg, DataType&& data, Args&&... args) const
123  {
124  Deserializer().Deserialize(msg, std::forward<DataType>(data), std::forward<Args>(args)...);
125  }
126 
133  int Send(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
134  {
135  return GetChannel(channel, index).Send(msg, sndTimeoutInMs);
136  }
137 
144  int Receive(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
145  {
146  return GetChannel(channel, index).Receive(msg, rcvTimeoutInMs);
147  }
148 
155  int64_t Send(FairMQParts& parts, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
156  {
157  return GetChannel(channel, index).Send(parts.fParts, sndTimeoutInMs);
158  }
159 
166  int64_t Receive(FairMQParts& parts, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
167  {
168  return GetChannel(channel, index).Receive(parts.fParts, rcvTimeoutInMs);
169  }
170 
173  {
174  return fTransportFactory.get();
175  }
176 
177  // creates message with the default device transport
178  template<typename... Args>
179  FairMQMessagePtr NewMessage(Args&&... args)
180  {
181  return Transport()->CreateMessage(std::forward<Args>(args)...);
182  }
183 
184  // creates message with the transport of the specified channel
185  template<typename... Args>
186  FairMQMessagePtr NewMessageFor(const std::string& channel, int index, Args&&... args)
187  {
188  return GetChannel(channel, index).NewMessage(std::forward<Args>(args)...);
189  }
190 
191  // creates a message that will not be cleaned up after transfer, with the default device transport
192  template<typename T>
193  FairMQMessagePtr NewStaticMessage(const T& data)
194  {
195  return Transport()->NewStaticMessage(data);
196  }
197 
198  // creates a message that will not be cleaned up after transfer, with the transport of the specified channel
199  template<typename T>
200  FairMQMessagePtr NewStaticMessageFor(const std::string& channel, int index, const T& data)
201  {
202  return GetChannel(channel, index).NewStaticMessage(data);
203  }
204 
205  // creates a message with a copy of the provided data, with the default device transport
206  template<typename T>
207  FairMQMessagePtr NewSimpleMessage(const T& data)
208  {
209  return Transport()->NewSimpleMessage(data);
210  }
211 
212  // creates a message with a copy of the provided data, with the transport of the specified channel
213  template<typename T>
214  FairMQMessagePtr NewSimpleMessageFor(const std::string& channel, int index, const T& data)
215  {
216  return GetChannel(channel, index).NewSimpleMessage(data);
217  }
218 
219  // creates unamanaged region with the default device transport
220  FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size,
221  FairMQRegionCallback callback = nullptr,
222  const std::string& path = "",
223  int flags = 0)
224  {
225  return Transport()->CreateUnmanagedRegion(size, callback, path, flags);
226  }
227 
228  // creates unamanaged region with the default device transport
229  FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size,
230  const int64_t userFlags,
231  FairMQRegionCallback callback = nullptr,
232  const std::string& path = "",
233  int flags = 0)
234  {
235  return Transport()->CreateUnmanagedRegion(size, userFlags, callback, path, flags);
236  }
237 
238  // creates unmanaged region with the transport of the specified channel
239  FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel,
240  int index,
241  const size_t size,
242  FairMQRegionCallback callback = nullptr,
243  const std::string& path = "",
244  int flags = 0)
245  {
246  return GetChannel(channel, index).NewUnmanagedRegion(size, callback, path, flags);
247  }
248 
249  // creates unmanaged region with the transport of the specified channel
250  FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel,
251  int index,
252  const size_t size,
253  const int64_t userFlags,
254  FairMQRegionCallback callback = nullptr,
255  const std::string& path = "",
256  int flags = 0)
257  {
258  return GetChannel(channel, index).NewUnmanagedRegion(size, userFlags, callback, path, flags);
259  }
260 
261  template<typename ...Ts>
262  FairMQPollerPtr NewPoller(const Ts&... inputs)
263  {
264  std::vector<std::string> chans{inputs...};
265 
266  // if more than one channel provided, check compatibility
267  if (chans.size() > 1)
268  {
269  fair::mq::Transport type = GetChannel(chans.at(0), 0).Transport()->GetType();
270 
271  for (unsigned int i = 1; i < chans.size(); ++i)
272  {
273  if (type != GetChannel(chans.at(i), 0).Transport()->GetType())
274  {
275  LOG(error) << "poller failed: different transports within same poller are not yet supported. Going to ERROR state.";
276  throw std::runtime_error("poller failed: different transports within same poller are not yet supported.");
277  }
278  }
279  }
280 
281  return GetChannel(chans.at(0), 0).Transport()->CreatePoller(fChannels, chans);
282  }
283 
284  FairMQPollerPtr NewPoller(const std::vector<FairMQChannel*>& channels)
285  {
286  // if more than one channel provided, check compatibility
287  if (channels.size() > 1)
288  {
289  fair::mq::Transport type = channels.at(0)->Transport()->GetType();
290 
291  for (unsigned int i = 1; i < channels.size(); ++i)
292  {
293  if (type != channels.at(i)->Transport()->GetType())
294  {
295  LOG(error) << "poller failed: different transports within same poller are not yet supported. Going to ERROR state.";
296  throw std::runtime_error("poller failed: different transports within same poller are not yet supported.");
297  }
298  }
299  }
300 
301  return channels.at(0)->Transport()->CreatePoller(channels);
302  }
303 
306  std::shared_ptr<FairMQTransportFactory> AddTransport(const fair::mq::Transport transport);
307 
309  void SetConfig(fair::mq::ProgOptions& config);
312  {
313  return fConfig;
314  }
315 
316  // overload to easily bind member functions
317  template<typename T>
318  void OnData(const std::string& channelName, bool (T::* memberFunction)(FairMQMessagePtr& msg, int index))
319  {
320  fDataCallbacks = true;
321  fMsgInputs.insert(std::make_pair(channelName, [this, memberFunction](FairMQMessagePtr& msg, int index)
322  {
323  return (static_cast<T*>(this)->*memberFunction)(msg, index);
324  }));
325 
326  if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end())
327  {
328  fInputChannelKeys.push_back(channelName);
329  }
330  }
331 
332  void OnData(const std::string& channelName, InputMsgCallback callback)
333  {
334  fDataCallbacks = true;
335  fMsgInputs.insert(make_pair(channelName, callback));
336 
337  if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end())
338  {
339  fInputChannelKeys.push_back(channelName);
340  }
341  }
342 
343  // overload to easily bind member functions
344  template<typename T>
345  void OnData(const std::string& channelName, bool (T::* memberFunction)(FairMQParts& parts, int index))
346  {
347  fDataCallbacks = true;
348  fMultipartInputs.insert(std::make_pair(channelName, [this, memberFunction](FairMQParts& parts, int index)
349  {
350  return (static_cast<T*>(this)->*memberFunction)(parts, index);
351  }));
352 
353  if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end())
354  {
355  fInputChannelKeys.push_back(channelName);
356  }
357  }
358 
359  void OnData(const std::string& channelName, InputMultipartCallback callback)
360  {
361  fDataCallbacks = true;
362  fMultipartInputs.insert(make_pair(channelName, callback));
363 
364  if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end())
365  {
366  fInputChannelKeys.push_back(channelName);
367  }
368  }
369 
370  FairMQChannel& GetChannel(const std::string& channelName, const int index = 0)
371  try {
372  return fChannels.at(channelName).at(index);
373  } catch (const std::out_of_range& oor) {
374  LOG(error) << "requested channel has not been configured? check channel names/configuration.";
375  LOG(error) << "channel: " << channelName << ", index: " << index;
376  LOG(error) << "out of range: " << oor.what();
377  throw;
378  }
379 
380  virtual void RegisterChannelEndpoints() {}
381 
382  bool RegisterChannelEndpoint(const std::string& channelName, uint16_t minNumSubChannels = 1, uint16_t maxNumSubChannels = 1)
383  {
384  bool ok = fChannelRegistry.insert(std::make_pair(channelName, std::make_pair(minNumSubChannels, maxNumSubChannels))).second;
385  if (!ok) {
386  LOG(warn) << "Registering channel: name already registered: \"" << channelName << "\"";
387  }
388  return ok;
389  }
390 
391  void PrintRegisteredChannels()
392  {
393  if (fChannelRegistry.size() < 1) {
394  std::cout << "no channels registered." << std::endl;
395  } else {
396  for (const auto& c : fChannelRegistry) {
397  std::cout << c.first << ":" << c.second.first << ":" << c.second.second << std::endl;
398  }
399  }
400  }
401 
402  void SetId(const std::string& id) { fId = id; }
403  std::string GetId() { return fId; }
404 
405  const fair::mq::tools::Version GetVersion() const { return fVersion; }
406 
407  void SetNumIoThreads(int numIoThreads) { fConfig->SetProperty("io-threads", numIoThreads);}
408  int GetNumIoThreads() const { return fConfig->GetProperty<int>("io-threads", DefaultIOThreads); }
409 
410  void SetNetworkInterface(const std::string& networkInterface) { fConfig->SetProperty("network-interface", networkInterface); }
411  std::string GetNetworkInterface() const { return fConfig->GetProperty<std::string>("network-interface", DefaultNetworkInterface); }
412 
413  void SetDefaultTransport(const std::string& name) { fConfig->SetProperty("transport", name); }
414  std::string GetDefaultTransport() const { return fConfig->GetProperty<std::string>("transport", DefaultTransportName); }
415 
416  void SetInitTimeoutInS(int initTimeoutInS) { fConfig->SetProperty("init-timeout", initTimeoutInS); }
417  int GetInitTimeoutInS() const { return fConfig->GetProperty<int>("init-timeout", DefaultInitTimeout); }
418 
421  void SetTransport(const std::string& transport) { fConfig->SetProperty("transport", transport); }
423  std::string GetTransportName() const { return fConfig->GetProperty<std::string>("transport", DefaultTransportName); }
424 
425  void SetRawCmdLineArgs(const std::vector<std::string>& args) { fRawCmdLineArgs = args; }
426  std::vector<std::string> GetRawCmdLineArgs() const { return fRawCmdLineArgs; }
427 
428  void RunStateMachine()
429  {
430  fStateMachine.ProcessWork();
431  };
432 
436  template<typename Rep, typename Period>
437  bool WaitFor(std::chrono::duration<Rep, Period> const& duration)
438  {
439  return !fStateMachine.WaitForPendingStateFor(std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());
440  }
441 
442  protected:
443  std::shared_ptr<FairMQTransportFactory> fTransportFactory;
444  std::unordered_map<fair::mq::Transport, std::shared_ptr<FairMQTransportFactory>> fTransports;
445 
446  public:
447  std::unordered_map<std::string, std::vector<FairMQChannel>> fChannels;
448  std::unique_ptr<fair::mq::ProgOptions> fInternalConfig;
450 
451  void AddChannel(const std::string& name, FairMQChannel&& channel)
452  {
453  fConfig->AddChannel(name, channel);
454  }
455 
456  protected:
457  std::string fId;
458 
460  virtual void Init() {}
461 
462  virtual void Bind() {}
463 
464  virtual void Connect() {}
465 
467  virtual void InitTask() {}
468 
470  virtual void Run() {}
471 
473  virtual void PreRun() {}
474 
476  virtual bool ConditionalRun() { return false; }
477 
479  virtual void PostRun() {}
480 
481  virtual void Pause() __attribute__((deprecated("PAUSE state is removed. This method is never called. To pause Run, go to READY with STOP transition and back to RUNNING with RUN to resume."))) {}
482 
484  virtual void ResetTask() {}
485 
487  virtual void Reset() {}
488 
489  public:
490  bool ChangeState(const fair::mq::Transition transition) { return fStateMachine.ChangeState(transition); }
491  bool ChangeState(const std::string& transition) { return fStateMachine.ChangeState(fair::mq::GetTransition(transition)); }
492 
493  bool ChangeState(const int transition) __attribute__((deprecated("Use ChangeState(const fair::mq::Transition transition).")));
494 
495  void WaitForEndOfState(const fair::mq::Transition transition) __attribute__((deprecated("Use WaitForState(fair::mq::State expectedState).")));
496  void WaitForEndOfState(const std::string& transition) __attribute__((deprecated("Use WaitForState(fair::mq::State expectedState)."))) { WaitForState(transition); }
497 
498  fair::mq::State WaitForNextState() { return fStateQueue.WaitForNext(); }
499  void WaitForState(fair::mq::State state) { fStateQueue.WaitForState(state); }
500  void WaitForState(const std::string& state) { WaitForState(fair::mq::GetState(state)); }
501 
502  void TransitionTo(const fair::mq::State state);
503 
504  void SubscribeToStateChange(const std::string& key, std::function<void(const fair::mq::State)> callback) { fStateMachine.SubscribeToStateChange(key, callback); }
505  void UnsubscribeFromStateChange(const std::string& key) { fStateMachine.UnsubscribeFromStateChange(key); }
506 
507  void SubscribeToNewTransition(const std::string& key, std::function<void(const fair::mq::Transition)> callback) { fStateMachine.SubscribeToNewTransition(key, callback); }
508  void UnsubscribeFromNewTransition(const std::string& key) { fStateMachine.UnsubscribeFromNewTransition(key); }
509 
510  bool CheckCurrentState(const int /* state */) const __attribute__((deprecated("Use NewStatePending()."))) { return !fStateMachine.NewStatePending(); }
511  bool CheckCurrentState(const std::string& /* state */) const __attribute__((deprecated("Use NewStatePending()."))) { return !fStateMachine.NewStatePending(); }
512 
514  bool NewStatePending() const { return fStateMachine.NewStatePending(); }
515 
516  fair::mq::State GetCurrentState() const { return fStateMachine.GetCurrentState(); }
517  std::string GetCurrentStateName() const { return fStateMachine.GetCurrentStateName(); }
518 
519  static std::string GetStateName(const fair::mq::State state) { return fair::mq::GetStateName(state); }
520  static std::string GetTransitionName(const fair::mq::Transition transition) { return fair::mq::GetTransitionName(transition); }
521 
522  static constexpr const char* DefaultId = "";
523  static constexpr int DefaultIOThreads = 1;
524  static constexpr const char* DefaultTransportName = "zeromq";
525  static constexpr fair::mq::Transport DefaultTransportType = fair::mq::Transport::ZMQ;
526  static constexpr const char* DefaultNetworkInterface = "default";
527  static constexpr int DefaultInitTimeout = 120;
528  static constexpr uint64_t DefaultMaxRunTime = 0;
529  static constexpr float DefaultRate = 0.;
530  static constexpr const char* DefaultSession = "default";
531 
532  private:
533  fair::mq::Transport fDefaultTransportType;
534  fair::mq::StateMachine fStateMachine;
535 
537  void InitWrapper();
539  void BindWrapper();
541  void ConnectWrapper();
543  void InitTaskWrapper();
545  void RunWrapper();
547  void ResetTaskWrapper();
549  void ResetWrapper();
550 
552  void UnblockTransports();
553 
555  void Exit() {}
556 
558  void AttachChannels(std::vector<FairMQChannel*>& chans);
559  bool AttachChannel(FairMQChannel& ch);
560 
561  void HandleSingleChannelInput();
562  void HandleMultipleChannelInput();
563  void HandleMultipleTransportInput();
564  void PollForTransport(const FairMQTransportFactory* factory, const std::vector<std::string>& channelKeys);
565 
566  bool HandleMsgInput(const std::string& chName, const InputMsgCallback& callback, int i);
567  bool HandleMultipartInput(const std::string& chName, const InputMultipartCallback& callback, int i);
568 
569  std::vector<FairMQChannel*> fUninitializedBindingChannels;
570  std::vector<FairMQChannel*> fUninitializedConnectingChannels;
571 
572  bool fDataCallbacks;
573  std::unordered_map<std::string, InputMsgCallback> fMsgInputs;
574  std::unordered_map<std::string, InputMultipartCallback> fMultipartInputs;
575  std::unordered_map<fair::mq::Transport, std::vector<std::string>> fMultitransportInputs;
576  std::unordered_map<std::string, std::pair<uint16_t, uint16_t>> fChannelRegistry;
577  std::vector<std::string> fInputChannelKeys;
578  std::mutex fMultitransportMutex;
579  std::atomic<bool> fMultitransportProceed;
580 
581  const fair::mq::tools::Version fVersion;
582  float fRate;
583  uint64_t fMaxRunRuntimeInS;
584  int fInitializationTimeoutInS;
585  std::vector<std::string> fRawCmdLineArgs;
586 
587  fair::mq::StateQueue fStateQueue;
588 
589  std::mutex fTransitionMtx;
590  bool fTransitioning;
591 };
592 
593 #endif /* FAIRMQDEVICE_H_ */
std::string GetTransportName() const
Gets the default transport name.
Definition: FairMQDevice.h:423
virtual bool ConditionalRun()
Called during RUNNING state repeatedly until it returns false or device state changes.
Definition: FairMQDevice.h:476
std::unordered_map< fair::mq::Transport, std::shared_ptr< FairMQTransportFactory > > fTransports
Container for transports.
Definition: FairMQDevice.h:444
Definition: StateQueue.h:25
virtual void InitTask()
Task initialization (can be overloaded in child classes)
Definition: FairMQDevice.h:467
Definition: FairMQTransportFactory.h:30
void SetTransport(const std::string &transport)
Definition: FairMQDevice.h:421
int Send(FairMQMessagePtr &msg, const std::string &channel, const int index=0, int sndTimeoutInMs=-1)
Definition: FairMQDevice.h:133
bool WaitFor(std::chrono::duration< Rep, Period > const &duration)
Definition: FairMQDevice.h:437
FairMQChannel & operator=(const FairMQChannel &)
Assignment operator.
Definition: FairMQChannel.cxx:134
fair::mq::ProgOptions * fConfig
Pointer to config (internal or external)
Definition: FairMQDevice.h:449
virtual void Run()
Runs the device (to be overloaded in child classes)
Definition: FairMQDevice.h:470
Definition: FairMQChannel.h:30
int64_t Send(FairMQParts &parts, const std::string &channel, const int index=0, int sndTimeoutInMs=-1)
Definition: FairMQDevice.h:155
Definition: ProgOptions.h:36
virtual void Init()
Additional user initialization (can be overloaded in child classes). Prefer to use InitTask()...
Definition: FairMQDevice.h:460
std::string fId
Device ID.
Definition: FairMQDevice.h:457
int Receive(FairMQMessagePtr &msg, const std::string &channel, const int index=0, int rcvTimeoutInMs=-1)
Definition: FairMQDevice.h:144
Definition: FairMQDevice.h:49
std::unordered_map< std::string, std::vector< FairMQChannel > > fChannels
Device channels.
Definition: FairMQDevice.h:447
Definition: StateMachine.h:26
std::unique_ptr< fair::mq::ProgOptions > fInternalConfig
Internal program options configuration.
Definition: FairMQDevice.h:448
void AddChannel(const std::string &name, const FairMQChannel &channel)
Takes the provided channel and creates properties based on it.
Definition: ProgOptions.cxx:351
int64_t Receive(FairMQParts &parts, const std::string &channel, const int index=0, int rcvTimeoutInMs=-1)
Definition: FairMQDevice.h:166
fair::mq::ProgOptions * GetConfig() const
Get pointer to the config.
Definition: FairMQDevice.h:311
virtual void PreRun()
Called in the RUNNING state once before executing the Run()/ConditionalRun() method.
Definition: FairMQDevice.h:473
FairMQParts is a lightweight convenience wrapper around a vector of unique pointers to FairMQMessage...
Definition: FairMQParts.h:20
std::shared_ptr< FairMQTransportFactory > fTransportFactory
Default transport factory.
Definition: FairMQDevice.h:443
Definition: FairMQDevice.h:53
Tools for interfacing containers to the transport via polymorphic allocators.
Definition: DeviceRunner.h:23
virtual void PostRun()
Called in the RUNNING state once after executing the Run()/ConditionalRun() method.
Definition: FairMQDevice.h:479
Definition: FairMQMessage.h:20
auto Transport() const -> FairMQTransportFactory *
Getter for default transport factory.
Definition: FairMQDevice.h:172
Definition: Version.h:22

privacy