FairMQ  1.2.1
C++ Message Passing Framework
FairMQChannel.h
1 /********************************************************************************
2  * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef FAIRMQCHANNEL_H_
10 #define FAIRMQCHANNEL_H_
11 
12 #include <string>
13 #include <memory> // unique_ptr
14 #include <vector>
15 #include <atomic>
16 #include <mutex>
17 
18 #include <FairMQTransportFactory.h>
19 #include <FairMQSocket.h>
20 #include <fairmq/Transports.h>
21 #include <FairMQLogger.h>
22 #include <FairMQParts.h>
23 
25 {
// Grants FairMQDevice access to the private members declared in this class.
26  friend class FairMQDevice;
27 
28  public:
/// Default constructor.
30  FairMQChannel();
31 
/// Constructs a channel from three strings: a type, a method and an address.
/// NOTE(review): the accepted values for each string are not visible in this header;
/// presumably they initialize fType, fMethod and fAddress — confirm in FairMQChannel.cxx.
/// @param type channel type string
/// @param method channel method string (Bind()/Connect() below use "bind" and "connect")
/// @param address channel address string
36  FairMQChannel(const std::string& type, const std::string& method, const std::string& address);
37 
/// Constructs a channel from a name, a type and a transport factory.
/// The factory is received as a std::shared_ptr by value, so the caller keeps its own reference.
/// NOTE(review): presumably the factory ends up in fTransportFactory — confirm in FairMQChannel.cxx.
/// @param name channel name string
/// @param type channel type string
/// @param factory shared transport factory
42  FairMQChannel(const std::string& name, const std::string& type, std::shared_ptr<FairMQTransportFactory> factory);
43 
46 
49 
/// Default destructor. Declared virtual, so deleting through a base-typed pointer is well defined.
51  virtual ~FairMQChannel();
52 
/// Returns a const reference to a FairMQSocket.
/// NOTE(review): presumably the socket owned through fSocket; if so, the reference is only
/// valid while that socket exists — confirm in FairMQChannel.cxx.
53  FairMQSocket const & GetSocket() const;
54 
/// Binds the channel's socket to the given address.
/// Records "bind" as the channel method and `address` as the channel address,
/// then forwards the address unchanged to fSocket->Bind().
/// @param address address string handed to the socket
/// @return the bool returned by fSocket->Bind(address)
55  auto Bind(const std::string& address) -> bool
56  {
// fMethod and fAddress are overwritten before the attempt, so they change even if Bind() returns false.
57  fMethod = "bind";
58  fAddress = address;
// fSocket is dereferenced without a null check; the caller must ensure the socket exists.
59  return fSocket->Bind(address);
60  }
61 
/// Connects the channel's socket to the given address.
/// Records "connect" as the channel method and `address` as the channel address,
/// then forwards the address unchanged to fSocket->Connect().
/// Unlike Bind(), this function returns void, so no success indicator reaches the caller.
/// @param address address string handed to the socket
62  auto Connect(const std::string& address) -> void
63  {
// fMethod and fAddress are overwritten before the connection attempt is made.
64  fMethod = "connect";
65  fAddress = address;
// Returning the call expression from a void function only compiles when fSocket->Connect() itself
// returns void. fSocket is dereferenced without a null check.
66  return fSocket->Connect(address);
67  }
68 
// Read accessors for the channel configuration. All are const member functions that return
// their value by copy; the definitions live in FairMQChannel.cxx.

/// Returns the channel name (presumably fName — confirm).
71  std::string GetChannelName() const;
72 
/// Returns the channel prefix.
/// NOTE(review): how the prefix is derived from the name is not visible in this header.
75  std::string GetChannelPrefix() const;
76 
/// Returns the channel index as a string.
/// NOTE(review): how the index is derived from the name is not visible in this header.
79  std::string GetChannelIndex() const;
80 
/// Returns the channel type string (presumably fType — confirm).
83  std::string GetType() const;
84 
/// Returns the channel method string (presumably fMethod, which Bind()/Connect() set to "bind"/"connect").
87  std::string GetMethod() const;
88 
/// Returns the channel address string (presumably fAddress — confirm).
91  std::string GetAddress() const;
92 
/// Returns the transport name string (presumably fTransport — confirm).
95  std::string GetTransport() const;
96 
/// Returns the send buffer size (presumably fSndBufSize; units not visible here).
99  int GetSndBufSize() const;
100 
/// Returns the receive buffer size (presumably fRcvBufSize; units not visible here).
103  int GetRcvBufSize() const;
104 
/// Returns the send kernel buffer size (presumably fSndKernelSize; units not visible here).
107  int GetSndKernelSize() const;
108 
/// Returns the receive kernel buffer size (presumably fRcvKernelSize; units not visible here).
111  int GetRcvKernelSize() const;
112 
/// Returns the rate logging setting (presumably fRateLogging; meaning of the value not visible here).
115  int GetRateLogging() const;
116 
// Mutators for the channel configuration. Each takes the new value and returns nothing;
// the definitions live in FairMQChannel.cxx.
// NOTE(review): whether an update invalidates the channel (fIsValid / fModified) or takes
// fChannelMutex is not visible in this header — confirm before relying on either.

/// Sets the channel type string.
119  void UpdateType(const std::string& type);
120 
/// Sets the channel method string.
123  void UpdateMethod(const std::string& method);
124 
/// Sets the channel address string.
127  void UpdateAddress(const std::string& address);
128 
/// Sets the transport name string.
131  void UpdateTransport(const std::string& transport);
132 
/// Sets the send buffer size.
135  void UpdateSndBufSize(const int sndBufSize);
136 
/// Sets the receive buffer size.
139  void UpdateRcvBufSize(const int rcvBufSize);
140 
/// Sets the send kernel buffer size.
143  void UpdateSndKernelSize(const int sndKernelSize);
144 
/// Sets the receive kernel buffer size.
147  void UpdateRcvKernelSize(const int rcvKernelSize);
148 
/// Sets the rate logging value.
151  void UpdateRateLogging(const int rateLogging);
152 
/// Sets the channel name.
155  void UpdateChannelName(const std::string& name);
156 
/// Reports whether the channel is currently considered valid
/// (presumably the value of the atomic fIsValid flag — confirm in FairMQChannel.cxx).
159  bool IsValid() const;
160 
/// Validates the channel and returns a bool result. Non-const, so it may modify the channel.
/// NOTE(review): the validation rules are not visible in this header.
163  bool ValidateChannel();
164 
/// Resets the channel (requires validation to be used again).
166  void ResetChannel();
167 
// Single-message transfer API. Every overload takes the message as a non-const reference to
// std::unique_ptr<FairMQMessage>, is a const member function, and returns an int.
// NOTE(review): the meaning of the returned int (byte count vs. error code) and whether the
// unique_ptr is reset or refilled are not visible in this header — confirm in FairMQChannel.cxx.

/// Sends `msg` without a timeout argument.
168  int Send(std::unique_ptr<FairMQMessage>& msg) const;
/// Receives into `msg` without a timeout argument.
169  int Receive(std::unique_ptr<FairMQMessage>& msg) const;
170 
/// Sends `msg` with a timeout; per the parameter name the value is presumably in milliseconds.
179  int Send(std::unique_ptr<FairMQMessage>& msg, int sndTimeoutInMs) const;
180 
/// Receives into `msg` with a timeout; per the parameter name the value is presumably in milliseconds.
188  int Receive(std::unique_ptr<FairMQMessage>& msg, int rcvTimeoutInMs) const;
189 
/// Asynchronous variant of Send (presumably non-blocking — confirm).
198  int SendAsync(std::unique_ptr<FairMQMessage>& msg) const;
199 
/// Asynchronous variant of Receive (presumably non-blocking — confirm).
205  int ReceiveAsync(std::unique_ptr<FairMQMessage>& msg) const;
206 
// Multi-message transfer API. Every overload takes a non-const reference to a vector of
// std::unique_ptr<FairMQMessage>, is a const member function, and returns an int64_t.
// The FairMQParts overloads of this class forward to these functions.
// NOTE(review): the meaning of the returned int64_t and the state of `msgVec` after the call
// are not visible in this header — confirm in FairMQChannel.cxx.

/// Sends the messages in `msgVec` without a timeout argument.
207  int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const;
/// Receives messages into `msgVec` without a timeout argument.
208  int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const;
209 
/// Sends `msgVec` with a timeout; per the parameter name the value is presumably in milliseconds.
215  int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, int sndTimeoutInMs) const;
216 
/// Receives into `msgVec` with a timeout; per the parameter name the value is presumably in milliseconds.
222  int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, int rcvTimeoutInMs) const;
223 
/// Asynchronous variant of the vector Send (presumably non-blocking — confirm).
231  int64_t SendAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const;
232 
/// Asynchronous variant of the vector Receive (presumably non-blocking — confirm).
238  int64_t ReceiveAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const;
239 
/// Convenience overload: sends a FairMQParts by forwarding its fParts member to Send().
/// @param parts container whose fParts member is passed on
/// @return whatever the selected Send overload returns
240  int64_t Send(FairMQParts& parts) const
241  {
// Overload resolution picks the Send() matching the type of parts.fParts
// (presumably the vector-of-unique_ptr overload — confirm against FairMQParts.h).
242  return Send(parts.fParts);
243  }
244 
/// Convenience overload: receives into a FairMQParts by forwarding its fParts member to Receive().
/// @param parts container whose fParts member is passed on
/// @return whatever the selected Receive overload returns
245  int64_t Receive(FairMQParts& parts) const
246  {
// Overload resolution picks the Receive() matching the type of parts.fParts
// (presumably the vector-of-unique_ptr overload — confirm against FairMQParts.h).
247  return Receive(parts.fParts);
248  }
249 
/// Convenience overload: sends a FairMQParts with a timeout by forwarding its fParts member
/// and the timeout to the two-argument Send().
/// @param parts container whose fParts member is passed on
/// @param sndTimeoutInMs timeout passed through unchanged (milliseconds, per the parameter name)
/// @return whatever the selected Send overload returns
250  int64_t Send(FairMQParts& parts, int sndTimeoutInMs) const
251  {
252  return Send(parts.fParts, sndTimeoutInMs);
253  }
254 
/// Convenience overload: receives into a FairMQParts with a timeout by forwarding its fParts
/// member and the timeout to the two-argument Receive().
/// @param parts container whose fParts member is passed on
/// @param rcvTimeoutInMs timeout passed through unchanged (milliseconds, per the parameter name)
/// @return whatever the selected Receive overload returns
255  int64_t Receive(FairMQParts& parts, int rcvTimeoutInMs) const
256  {
257  return Receive(parts.fParts, rcvTimeoutInMs);
258  }
259 
/// Convenience overload: forwards the fParts member of a FairMQParts to SendAsync().
/// @param parts container whose fParts member is passed on
/// @return whatever the selected SendAsync overload returns
260  int64_t SendAsync(FairMQParts& parts) const
261  {
262  return SendAsync(parts.fParts);
263  }
264 
/// Convenience overload: forwards the fParts member of a FairMQParts to ReceiveAsync().
/// @param parts container whose fParts member is passed on
/// @return whatever the selected ReceiveAsync overload returns
265  int64_t ReceiveAsync(FairMQParts& parts) const
266  {
267  return ReceiveAsync(parts.fParts);
268  }
269 
// Traffic counters, each returned as unsigned long from a const member function.
// NOTE(review): judging by the names these are bytes/messages transmitted (Tx) and received (Rx);
// where the counts are kept and whether they are cumulative is not visible in this header.
270  unsigned long GetBytesTx() const;
271  unsigned long GetBytesRx() const;
272  unsigned long GetMessagesTx() const;
273  unsigned long GetMessagesRx() const;
274 
/// Returns a non-owning pointer to the transport factory held in fTransportFactory.
/// The pointer comes from std::shared_ptr::get(), so it is nullptr when no factory is set
/// and must not be deleted by the caller.
/// @return const pointer to the factory, or nullptr if fTransportFactory is empty
275  auto Transport() const -> const FairMQTransportFactory*
276  {
277  return fTransportFactory.get();
// No ';' after the closing brace: a member function definition needs none, and the extra
// empty declaration is reported by -Wextra-semi.
278  }
279 
/// Creates a message through the channel's transport factory.
/// All arguments are perfectly forwarded to CreateMessage() on the pointer returned by Transport().
/// @tparam Args argument types accepted by one of the factory's CreateMessage overloads
/// @param args arguments forwarded unchanged
/// @return the FairMQMessagePtr produced by CreateMessage()
280  template<typename... Args>
281  FairMQMessagePtr NewMessage(Args&&... args) const
282  {
// Transport() returns fTransportFactory.get(), which is nullptr when no factory is set;
// the pointer is dereferenced here without a check.
283  return Transport()->CreateMessage(std::forward<Args>(args)...);
284  }
285 
/// Creates a message from `data` by delegating to NewSimpleMessage() on the transport factory.
/// NOTE(review): whether the factory copies `data` into the message is not visible here — confirm
/// in FairMQTransportFactory.h.
/// @tparam T type of the payload
/// @param data payload passed by const reference
/// @return the FairMQMessagePtr produced by the factory
286  template<typename T>
287  FairMQMessagePtr NewSimpleMessage(const T& data) const
288  {
// Transport() may return nullptr when no factory is set; it is dereferenced without a check.
289  return Transport()->NewSimpleMessage(data);
290  }
291 
/// Creates a message from `data` by delegating to NewStaticMessage() on the transport factory.
/// NOTE(review): the name suggests the message refers to `data` rather than copying it, which
/// would require `data` to outlive the message — confirm in FairMQTransportFactory.h.
/// @tparam T type of the payload
/// @param data payload passed by const reference
/// @return the FairMQMessagePtr produced by the factory
292  template<typename T>
293  FairMQMessagePtr NewStaticMessage(const T& data) const
294  {
// Transport() may return nullptr when no factory is set; it is dereferenced without a check.
295  return Transport()->NewStaticMessage(data);
296  }
297 
298  private:
// Socket exclusively owned by this channel; Bind() and Connect() call through it.
299  std::unique_ptr<FairMQSocket> fSocket;
300 
// Configuration values; presumably the ones exposed by the Get*/Update* member functions of the same name.
301  std::string fType;
// Set to "bind" by Bind() and to "connect" by Connect().
302  std::string fMethod;
// Set by Bind()/Connect() to the address they were given.
303  std::string fAddress;
304  std::string fTransport;
305  int fSndBufSize;
306  int fRcvBufSize;
307  int fSndKernelSize;
308  int fRcvKernelSize;
309  int fRateLogging;
310 
311  std::string fName;
// Atomic, so the flag itself can be read and written from different threads without a data race.
312  std::atomic<bool> fIsValid;
313 
// NOTE(review): presumably the enum counterpart of the fTransport string — confirm.
314  FairMQ::Transport fTransportType;
// Shared ownership of the transport factory; Transport() exposes it as a raw const pointer.
315  std::shared_ptr<FairMQTransportFactory> fTransportFactory;
316 
// Private helpers; definitions live outside this header.
// NOTE(review): CheckCompatibility presumably checks that the message(s) match this channel's
// transport — the criteria are not visible here.
317  bool CheckCompatibility(std::unique_ptr<FairMQMessage>& msg) const;
318  bool CheckCompatibility(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const;
319 
// Takes a transport factory by shared_ptr value (presumably stores it in fTransportFactory — confirm).
320  void InitTransport(std::shared_ptr<FairMQTransportFactory> factory);
321 
// std::mutex is neither copyable nor movable, so a per-instance mutex member would make the
// implicitly generated copy operations unavailable; a static member avoids that.
322  // use static mutex to make the class easily copyable
323  // implication: same mutex is used for all instances of the class
324  // this does not hurt much, because mutex is used only during initialization with very low contention
325  // possible TODO: improve this
326  static std::mutex fChannelMutex;
327 
// Internal state flags.
// NOTE(review): their exact meaning is not visible in this header; fModified is presumably
// maintained through SetModified() and fReset through ResetChannel() — confirm in FairMQChannel.cxx.
328  bool fMultipart;
329  bool fModified;
// Setter for the modified state; takes the new value and returns nothing.
330  auto SetModified(const bool modified) -> void;
331  bool fReset;
332 };
333 
334 #endif /* FAIRMQCHANNEL_H_ */
int GetSndBufSize() const
Definition: FairMQChannel.cxx:211
std::string GetType() const
Definition: FairMQChannel.cxx:155
int ReceiveAsync(std::unique_ptr< FairMQMessage > &msg) const
int SendAsync(std::unique_ptr< FairMQMessage > &msg) const
virtual ~FairMQChannel()
Default destructor.
Definition: FairMQChannel.cxx:743
int GetRateLogging() const
Definition: FairMQChannel.cxx:267
std::string GetAddress() const
Definition: FairMQChannel.cxx:183
int GetRcvKernelSize() const
Definition: FairMQChannel.cxx:253
Definition: FairMQTransportFactory.h:27
auto Transport() const -> const FairMQTransportFactory *
Getter for default transport factory.
Definition: FairMQDevice.h:197
std::string GetChannelPrefix() const
Definition: FairMQChannel.cxx:140
void UpdateRcvBufSize(const int rcvBufSize)
Definition: FairMQChannel.cxx:361
FairMQChannel & operator=(const FairMQChannel &)
Assignment operator.
Definition: FairMQChannel.cxx:109
Definition: FairMQChannel.h:24
int GetSndKernelSize() const
Definition: FairMQChannel.cxx:239
bool IsValid() const
Definition: FairMQChannel.cxx:455
void UpdateRcvKernelSize(const int rcvKernelSize)
Definition: FairMQChannel.cxx:393
void UpdateAddress(const std::string &address)
Definition: FairMQChannel.cxx:313
void UpdateTransport(const std::string &transport)
Definition: FairMQChannel.cxx:329
std::string GetChannelName() const
Definition: FairMQChannel.cxx:135
Definition: FairMQSocket.h:18
void UpdateRateLogging(const int rateLogging)
Definition: FairMQChannel.cxx:409
std::string GetTransport() const
Definition: FairMQChannel.cxx:197
std::string GetChannelIndex() const
Definition: FairMQChannel.cxx:147
void UpdateSndBufSize(const int sndBufSize)
Definition: FairMQChannel.cxx:345
void UpdateMethod(const std::string &method)
Definition: FairMQChannel.cxx:297
void UpdateChannelName(const std::string &name)
Definition: FairMQChannel.cxx:439
std::string GetMethod() const
Definition: FairMQChannel.cxx:169
FairMQParts is a lightweight convenience wrapper around a vector of unique pointers to FairMQMessage...
Definition: FairMQParts.h:20
std::shared_ptr< FairMQTransportFactory > fTransportFactory
Transport factory.
Definition: FairMQDevice.h:417
int GetRcvBufSize() const
Definition: FairMQChannel.cxx:225
void ResetChannel()
Resets the channel (requires validation to be used again).
Definition: FairMQChannel.cxx:663
bool ValidateChannel()
Definition: FairMQChannel.cxx:469
Definition: FairMQDevice.h:44
void UpdateType(const std::string &type)
Definition: FairMQChannel.cxx:281
void UpdateSndKernelSize(const int sndKernelSize)
Definition: FairMQChannel.cxx:377
FairMQChannel()
Default constructor.
Definition: FairMQChannel.cxx:26