FairMQ  1.3.8
C++ Message Passing Framework
FairMQChannel.h
1 /********************************************************************************
2  * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef FAIRMQCHANNEL_H_
10 #define FAIRMQCHANNEL_H_
11 
12 #include <string>
13 #include <memory> // unique_ptr, shared_ptr
14 #include <vector>
15 #include <atomic>
16 #include <mutex>
17 #include <stdexcept>
18 #include <utility> // std::move
19 
20 #include <FairMQTransportFactory.h>
21 #include <FairMQSocket.h>
22 #include <fairmq/Transports.h>
23 #include <FairMQLogger.h>
24 #include <FairMQParts.h>
25 #include <FairMQMessage.h>
26 
28 {
29  friend class FairMQDevice;
30 
31  public:
33  FairMQChannel();
34 
39  FairMQChannel(const std::string& type, const std::string& method, const std::string& address);
40 
45  FairMQChannel(const std::string& name, const std::string& type, std::shared_ptr<FairMQTransportFactory> factory);
46 
53  FairMQChannel(const std::string& name, const std::string& type, const std::string& method, const std::string& address, std::shared_ptr<FairMQTransportFactory> factory);
54 
57 
60 
62  virtual ~FairMQChannel() {}
63 
/// Exception type derived from std::runtime_error (by its name, meant for channel
/// configuration failures). It adds no state and inherits the base constructors.
struct ChannelConfigurationError : std::runtime_error
{
    using std::runtime_error::runtime_error;
};
65 
66  FairMQSocket& GetSocket() const;
67 
68  bool Bind(const std::string& address)
69  {
70  fMethod = "bind";
71  fAddress = address;
72  return fSocket->Bind(address);
73  }
74 
75  bool Connect(const std::string& address)
76  {
77  fMethod = "connect";
78  fAddress = address;
79  return fSocket->Connect(address);
80  }
81 
84  std::string GetChannelName() const { return GetName(); } // TODO: deprecate this in favor of following
85  std::string GetName() const;
86 
89  std::string GetChannelPrefix() const { return GetPrefix(); } // TODO: deprecate this in favor of following
90  std::string GetPrefix() const;
91 
94  std::string GetChannelIndex() const { return GetIndex(); } // TODO: deprecate this in favor of following
95  std::string GetIndex() const;
96 
99  std::string GetType() const;
100 
103  std::string GetMethod() const;
104 
107  std::string GetAddress() const;
108 
111  std::string GetTransportName() const;
112 
115  int GetSndBufSize() const;
116 
119  int GetRcvBufSize() const;
120 
123  int GetSndKernelSize() const;
124 
127  int GetRcvKernelSize() const;
128 
131  int GetLinger() const;
132 
135  int GetRateLogging() const;
136 
139  int GetPortRangeMin() const;
140 
143  int GetPortRangeMax() const;
144 
147  bool GetAutoBind() const;
148 
151  void UpdateType(const std::string& type);
152 
155  void UpdateMethod(const std::string& method);
156 
159  void UpdateAddress(const std::string& address);
160 
163  void UpdateTransport(const std::string& transport);
164 
167  void UpdateSndBufSize(const int sndBufSize);
168 
171  void UpdateRcvBufSize(const int rcvBufSize);
172 
175  void UpdateSndKernelSize(const int sndKernelSize);
176 
179  void UpdateRcvKernelSize(const int rcvKernelSize);
180 
183  void UpdateLinger(const int duration);
184 
187  void UpdateRateLogging(const int rateLogging);
188 
191  void UpdatePortRangeMin(const int minPort);
192 
195  void UpdatePortRangeMax(const int maxPort);
196 
199  void UpdateAutoBind(const bool autobind);
200 
203  void UpdateChannelName(const std::string& name) { UpdateName(name); } // TODO: deprecate this in favor of following
204  void UpdateName(const std::string& name);
205 
208  bool IsValid() const;
209 
212  bool ValidateChannel() // TODO: deprecate this
213  {
214  return Validate();
215  }
216 
219  bool Validate();
220 
221  void Init();
222 
223  bool ConnectEndpoint(const std::string& endpoint);
224 
225  bool BindEndpoint(std::string& endpoint);
226 
228  void ResetChannel();
229 
234  int Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1)
235  {
236  CheckSendCompatibility(msg);
237  return fSocket->Send(msg, sndTimeoutInMs);
238  }
239 
244  int Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1)
245  {
246  CheckReceiveCompatibility(msg);
247  return fSocket->Receive(msg, rcvTimeoutInMs);
248  }
249 
250  int SendAsync(FairMQMessagePtr& msg) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msg, timeout);")))
251  {
252  CheckSendCompatibility(msg);
253  return fSocket->Send(msg, 0);
254  }
255  int ReceiveAsync(FairMQMessagePtr& msg) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msg, timeout);")))
256  {
257  CheckReceiveCompatibility(msg);
258  return fSocket->Receive(msg, 0);
259  }
260 
265  int64_t Send(std::vector<FairMQMessagePtr>& msgVec, int sndTimeoutInMs = -1)
266  {
267  CheckSendCompatibility(msgVec);
268  return fSocket->Send(msgVec, sndTimeoutInMs);
269  }
270 
275  int64_t Receive(std::vector<FairMQMessagePtr>& msgVec, int rcvTimeoutInMs = -1)
276  {
277  CheckReceiveCompatibility(msgVec);
278  return fSocket->Receive(msgVec, rcvTimeoutInMs);
279  }
280 
281  int64_t SendAsync(std::vector<FairMQMessagePtr>& msgVec) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msgVec, timeout);")))
282  {
283  CheckSendCompatibility(msgVec);
284  return fSocket->Send(msgVec, 0);
285  }
286  int64_t ReceiveAsync(std::vector<FairMQMessagePtr>& msgVec) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msgVec, timeout);")))
287  {
288  CheckReceiveCompatibility(msgVec);
289  return fSocket->Receive(msgVec, 0);
290  }
291 
296  int64_t Send(FairMQParts& parts, int sndTimeoutInMs = -1)
297  {
298  return Send(parts.fParts, sndTimeoutInMs);
299  }
300 
305  int64_t Receive(FairMQParts& parts, int rcvTimeoutInMs = -1)
306  {
307  return Receive(parts.fParts, rcvTimeoutInMs);
308  }
309 
310  int64_t SendAsync(FairMQParts& parts) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(parts, timeout);")))
311  {
312  return Send(parts.fParts, 0);
313  }
314 
315  int64_t ReceiveAsync(FairMQParts& parts) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(parts, timeout);")))
316  {
317  return Receive(parts.fParts, 0);
318  }
319 
320  unsigned long GetBytesTx() const { return fSocket->GetBytesTx(); }
321  unsigned long GetBytesRx() const { return fSocket->GetBytesRx(); }
322  unsigned long GetMessagesTx() const { return fSocket->GetMessagesTx(); }
323  unsigned long GetMessagesRx() const { return fSocket->GetMessagesRx(); }
324 
325  auto Transport() -> FairMQTransportFactory*
326  {
327  return fTransportFactory.get();
328  };
329 
330  template<typename... Args>
331  FairMQMessagePtr NewMessage(Args&&... args)
332  {
333  return Transport()->CreateMessage(std::forward<Args>(args)...);
334  }
335 
336  template<typename T>
337  FairMQMessagePtr NewSimpleMessage(const T& data)
338  {
339  return Transport()->NewSimpleMessage(data);
340  }
341 
342  template<typename T>
343  FairMQMessagePtr NewStaticMessage(const T& data)
344  {
345  return Transport()->NewStaticMessage(data);
346  }
347 
348  private:
349  std::shared_ptr<FairMQTransportFactory> fTransportFactory;
350  fair::mq::Transport fTransportType;
351  std::unique_ptr<FairMQSocket> fSocket;
352 
353  std::string fType;
354  std::string fMethod;
355  std::string fAddress;
356  int fSndBufSize;
357  int fRcvBufSize;
358  int fSndKernelSize;
359  int fRcvKernelSize;
360  int fLinger;
361  int fRateLogging;
362  int fPortRangeMin;
363  int fPortRangeMax;
364  bool fAutoBind;
365 
366  std::string fName;
367  std::atomic<bool> fIsValid;
368 
369  // use static mutex to make the class easily copyable
370  // implication: same mutex is used for all instances of the class
371  // this does not hurt much, because mutex is used only during initialization with very low contention
372  // possible TODO: improve this
373  static std::mutex fChannelMutex;
374 
375  bool fMultipart;
376  bool fModified;
377  bool fReset;
378 
379  void CheckSendCompatibility(FairMQMessagePtr& msg)
380  {
381  if (fTransportType != msg->GetType()) {
382  // LOG(debug) << "Channel type does not match message type. Creating wrapper";
383  FairMQMessagePtr msgWrapper(NewMessage(
384  msg->GetData(),
385  msg->GetSize(),
386  [](void* /*data*/, void* _msg) { delete static_cast<FairMQMessage*>(_msg); },
387  msg.get()
388  ));
389  msg.release();
390  msg = move(msgWrapper);
391  }
392  }
393 
394  void CheckSendCompatibility(std::vector<FairMQMessagePtr>& msgVec)
395  {
396  for (auto& msg : msgVec) {
397  if (fTransportType != msg->GetType()) {
398  // LOG(debug) << "Channel type does not match message type. Creating wrapper";
399  FairMQMessagePtr msgWrapper(NewMessage(
400  msg->GetData(),
401  msg->GetSize(),
402  [](void* /*data*/, void* _msg) { delete static_cast<FairMQMessage*>(_msg); },
403  msg.get()
404  ));
405  msg.release();
406  msg = move(msgWrapper);
407  }
408  }
409  }
410 
411  void CheckReceiveCompatibility(FairMQMessagePtr& msg)
412  {
413  if (fTransportType != msg->GetType()) {
414  // LOG(debug) << "Channel type does not match message type. Creating wrapper";
415  FairMQMessagePtr newMsg(NewMessage());
416  msg = move(newMsg);
417  }
418  }
419 
420  void CheckReceiveCompatibility(std::vector<FairMQMessagePtr>& msgVec)
421  {
422  for (auto& msg : msgVec) {
423  if (fTransportType != msg->GetType()) {
424  // LOG(debug) << "Channel type does not match message type. Creating wrapper";
425  FairMQMessagePtr newMsg(NewMessage());
426  msg = move(newMsg);
427  }
428  }
429  }
430 
431  void InitTransport(std::shared_ptr<FairMQTransportFactory> factory)
432  {
433  fTransportFactory = factory;
434  fTransportType = factory->GetType();
435  }
436  auto SetModified(const bool modified) -> void;
437 };
438 
439 #endif /* FAIRMQCHANNEL_H_ */
std::string GetChannelIndex() const
Definition: FairMQChannel.h:94
int GetPortRangeMax() const
Definition: FairMQChannel.cxx:239
int GetSndBufSize() const
Definition: FairMQChannel.cxx:176
std::string GetTransportName() const
Definition: FairMQChannel.cxx:167
int Send(FairMQMessagePtr &msg, int sndTimeoutInMs=-1)
Definition: FairMQChannel.h:234
void UpdateAutoBind(const bool autobind)
Definition: FairMQChannel.cxx:389
bool GetAutoBind() const
Definition: FairMQChannel.cxx:248
std::string GetType() const
Definition: FairMQChannel.cxx:140
int Receive(FairMQMessagePtr &msg, int rcvTimeoutInMs=-1)
Definition: FairMQChannel.h:244
void UpdatePortRangeMin(const int minPort)
Definition: FairMQChannel.cxx:367
std::string GetChannelPrefix() const
Definition: FairMQChannel.h:89
int64_t Send(std::vector< FairMQMessagePtr > &msgVec, int sndTimeoutInMs=-1)
Definition: FairMQChannel.h:265
int GetRateLogging() const
Definition: FairMQChannel.cxx:221
std::string GetAddress() const
Definition: FairMQChannel.cxx:158
int GetRcvKernelSize() const
Definition: FairMQChannel.cxx:203
Definition: FairMQTransportFactory.h:28
int GetPortRangeMin() const
Definition: FairMQChannel.cxx:230
std::string GetChannelName() const
Definition: FairMQChannel.h:84
void UpdateRcvBufSize(const int rcvBufSize)
Definition: FairMQChannel.cxx:312
FairMQChannel & operator=(const FairMQChannel &)
Assignment operator.
Definition: FairMQChannel.cxx:85
Definition: FairMQChannel.h:27
int GetSndKernelSize() const
Definition: FairMQChannel.cxx:194
bool IsValid() const
Definition: FairMQChannel.cxx:420
void UpdateRcvKernelSize(const int rcvKernelSize)
Definition: FairMQChannel.cxx:334
void UpdateAddress(const std::string &address)
Definition: FairMQChannel.cxx:279
void UpdateTransport(const std::string &transport)
Definition: FairMQChannel.cxx:290
int64_t Receive(FairMQParts &parts, int rcvTimeoutInMs=-1)
Definition: FairMQChannel.h:305
int64_t Receive(std::vector< FairMQMessagePtr > &msgVec, int rcvTimeoutInMs=-1)
Definition: FairMQChannel.h:275
Definition: FairMQSocket.h:19
void UpdateRateLogging(const int rateLogging)
Definition: FairMQChannel.cxx:356
int GetLinger() const
Definition: FairMQChannel.cxx:212
void UpdateSndBufSize(const int sndBufSize)
Definition: FairMQChannel.cxx:301
bool Validate()
Definition: FairMQChannel.cxx:429
void UpdateMethod(const std::string &method)
Definition: FairMQChannel.cxx:268
void UpdateChannelName(const std::string &name)
Definition: FairMQChannel.h:203
std::string GetMethod() const
Definition: FairMQChannel.cxx:149
FairMQParts is a lightweight convenience wrapper around a vector of unique pointers to FairMQMessage...
Definition: FairMQParts.h:20
int GetRcvBufSize() const
Definition: FairMQChannel.cxx:185
void UpdateLinger(const int duration)
Definition: FairMQChannel.cxx:345
void ResetChannel()
Resets the channel (requires validation to be used again).
Definition: FairMQChannel.cxx:637
bool ValidateChannel()
Definition: FairMQChannel.h:212
Definition: FairMQDevice.h:46
void UpdatePortRangeMax(const int maxPort)
Definition: FairMQChannel.cxx:378
void UpdateType(const std::string &type)
Definition: FairMQChannel.cxx:257
Definition: FairMQMessage.h:20
void UpdateSndKernelSize(const int sndKernelSize)
Definition: FairMQChannel.cxx:323
FairMQChannel()
Default constructor.
Definition: FairMQChannel.cxx:27
virtual ~FairMQChannel()
Default destructor.
Definition: FairMQChannel.h:62
int64_t Send(FairMQParts &parts, int sndTimeoutInMs=-1)
Definition: FairMQChannel.h:296
Definition: FairMQChannel.h:64

privacy