9 #ifndef FAIRMQCHANNEL_H_ 10 #define FAIRMQCHANNEL_H_ 12 #include <FairMQTransportFactory.h> 13 #include <FairMQUnmanagedRegion.h> 14 #include <FairMQSocket.h> 15 #include <fairmq/Transports.h> 16 #include <FairMQLogger.h> 17 #include <FairMQParts.h> 18 #include <fairmq/Properties.h> 19 #include <FairMQMessage.h> 46 FairMQChannel(
const std::string& type,
const std::string& method,
const std::string& address);
52 FairMQChannel(
const std::string& name,
const std::string& type, std::shared_ptr<FairMQTransportFactory> factory);
60 FairMQChannel(
const std::string& name,
const std::string& type,
const std::string& method,
const std::string& address, std::shared_ptr<FairMQTransportFactory> factory);
62 FairMQChannel(
const std::string& name,
int index,
const fair::mq::Properties& properties);
89 bool Bind(
const std::string& address)
93 return fSocket->Bind(address);
96 bool Connect(
const std::string& address)
100 return fSocket->Connect(address);
105 std::string
GetChannelName() const __attribute__((deprecated("Use GetName()"))) {
return GetName(); }
106 std::string GetName()
const ;
110 std::string
GetChannelPrefix() const __attribute__((deprecated("Use GetPrefix()"))) {
return GetPrefix(); }
111 std::string GetPrefix()
const;
115 std::string
GetChannelIndex() const __attribute__((deprecated("Use GetIndex()"))) {
return GetIndex(); }
116 std::string GetIndex()
const;
228 void UpdateChannelName(
const std::string& name) __attribute__((deprecated(
"Use UpdateName()"))) { UpdateName(name); }
229 void UpdateName(
const std::string& name);
245 bool ConnectEndpoint(
const std::string& endpoint);
247 bool BindEndpoint(std::string& endpoint);
256 int Send(FairMQMessagePtr& msg,
int sndTimeoutInMs = -1)
258 CheckSendCompatibility(msg);
259 return fSocket->Send(msg, sndTimeoutInMs);
266 int Receive(FairMQMessagePtr& msg,
int rcvTimeoutInMs = -1)
268 CheckReceiveCompatibility(msg);
269 return fSocket->Receive(msg, rcvTimeoutInMs);
276 int64_t
Send(std::vector<FairMQMessagePtr>& msgVec,
int sndTimeoutInMs = -1)
278 CheckSendCompatibility(msgVec);
279 return fSocket->Send(msgVec, sndTimeoutInMs);
286 int64_t
Receive(std::vector<FairMQMessagePtr>& msgVec,
int rcvTimeoutInMs = -1)
288 CheckReceiveCompatibility(msgVec);
289 return fSocket->Receive(msgVec, rcvTimeoutInMs);
298 return Send(parts.fParts, sndTimeoutInMs);
307 return Receive(parts.fParts, rcvTimeoutInMs);
310 unsigned long GetBytesTx()
const {
return fSocket->GetBytesTx(); }
311 unsigned long GetBytesRx()
const {
return fSocket->GetBytesRx(); }
312 unsigned long GetMessagesTx()
const {
return fSocket->GetMessagesTx(); }
313 unsigned long GetMessagesRx()
const {
return fSocket->GetMessagesRx(); }
317 return fTransportFactory.get();
320 template<
typename... Args>
321 FairMQMessagePtr NewMessage(Args&&... args)
323 return Transport()->CreateMessage(std::forward<Args>(args)...);
327 FairMQMessagePtr NewSimpleMessage(
const T& data)
329 return Transport()->NewSimpleMessage(data);
333 FairMQMessagePtr NewStaticMessage(
const T& data)
335 return Transport()->NewStaticMessage(data);
338 FairMQUnmanagedRegionPtr NewUnmanagedRegion(
const size_t size, FairMQRegionCallback callback =
nullptr,
const std::string& path =
"",
int flags = 0)
340 return Transport()->CreateUnmanagedRegion(size, callback, path, flags);
343 FairMQUnmanagedRegionPtr NewUnmanagedRegion(
const size_t size,
const int64_t userFlags, FairMQRegionCallback callback =
nullptr,
const std::string& path =
"",
int flags = 0)
345 return Transport()->CreateUnmanagedRegion(size, userFlags, callback, path, flags);
348 static constexpr fair::mq::Transport DefaultTransportType = fair::mq::Transport::DEFAULT;
349 static constexpr
const char* DefaultTransportName =
"default";
350 static constexpr
const char* DefaultName =
"";
351 static constexpr
const char* DefaultType =
"unspecified";
352 static constexpr
const char* DefaultMethod =
"unspecified";
353 static constexpr
const char* DefaultAddress =
"unspecified";
354 static constexpr
int DefaultSndBufSize = 1000;
355 static constexpr
int DefaultRcvBufSize = 1000;
356 static constexpr
int DefaultSndKernelSize = 0;
357 static constexpr
int DefaultRcvKernelSize = 0;
358 static constexpr
int DefaultLinger = 500;
359 static constexpr
int DefaultRateLogging = 1;
360 static constexpr
int DefaultPortRangeMin = 22000;
361 static constexpr
int DefaultPortRangeMax = 23000;
362 static constexpr
bool DefaultAutoBind =
true;
365 std::shared_ptr<FairMQTransportFactory> fTransportFactory;
366 fair::mq::Transport fTransportType;
367 std::unique_ptr<FairMQSocket> fSocket;
372 std::string fAddress;
389 mutable std::mutex fMtx;
391 void CheckSendCompatibility(FairMQMessagePtr& msg)
393 if (fTransportType != msg->GetType()) {
394 FairMQMessagePtr msgWrapper(NewMessage(
397 [](
void* ,
void* _msg) {
delete static_cast<FairMQMessage*
>(_msg); },
401 msg = move(msgWrapper);
405 void CheckSendCompatibility(std::vector<FairMQMessagePtr>& msgVec)
407 for (
auto& msg : msgVec) {
408 if (fTransportType != msg->GetType()) {
410 FairMQMessagePtr msgWrapper(NewMessage(
413 [](
void* ,
void* _msg) {
delete static_cast<FairMQMessage*
>(_msg); },
417 msg = move(msgWrapper);
422 void CheckReceiveCompatibility(FairMQMessagePtr& msg)
424 if (fTransportType != msg->GetType()) {
425 FairMQMessagePtr newMsg(NewMessage());
430 void CheckReceiveCompatibility(std::vector<FairMQMessagePtr>& msgVec)
432 for (
auto& msg : msgVec) {
433 if (fTransportType != msg->GetType()) {
435 FairMQMessagePtr newMsg(NewMessage());
441 void InitTransport(std::shared_ptr<FairMQTransportFactory> factory)
443 fTransportFactory = factory;
444 fTransportType = factory->GetType();
447 auto SetModified(
const bool modified) -> void;
int GetPortRangeMax() const
Definition: FairMQChannel.cxx:309
int GetSndBufSize() const
Definition: FairMQChannel.cxx:246
std::string GetTransportName() const
Definition: FairMQChannel.cxx:227
int Send(FairMQMessagePtr &msg, int sndTimeoutInMs=-1)
Definition: FairMQChannel.h:256
void UpdateAutoBind(const bool autobind)
Definition: FairMQChannel.cxx:459
bool GetAutoBind() const
Definition: FairMQChannel.cxx:318
std::string GetType() const
Definition: FairMQChannel.cxx:200
int Receive(FairMQMessagePtr &msg, int rcvTimeoutInMs=-1)
Definition: FairMQChannel.h:266
fair::mq::Transport GetTransportType() const
Definition: FairMQChannel.cxx:236
void UpdateChannelName(const std::string &name) __attribute__((deprecated("Use UpdateName()")))
Definition: FairMQChannel.h:228
void UpdatePortRangeMin(const int minPort)
Definition: FairMQChannel.cxx:437
int64_t Send(std::vector< FairMQMessagePtr > &msgVec, int sndTimeoutInMs=-1)
Definition: FairMQChannel.h:276
int GetRateLogging() const
Definition: FairMQChannel.cxx:291
std::string GetAddress() const
Definition: FairMQChannel.cxx:218
int GetRcvKernelSize() const
Definition: FairMQChannel.cxx:273
Definition: FairMQTransportFactory.h:30
int GetPortRangeMin() const
Definition: FairMQChannel.cxx:300
void UpdateRcvBufSize(const int rcvBufSize)
Definition: FairMQChannel.cxx:382
FairMQChannel & operator=(const FairMQChannel &)
Assignment operator.
Definition: FairMQChannel.cxx:134
Definition: FairMQChannel.h:30
int GetSndKernelSize() const
Definition: FairMQChannel.cxx:264
bool IsValid() const
Definition: FairMQChannel.cxx:490
bool ValidateChannel() __attribute__((deprecated("Use Validate()")))
Definition: FairMQChannel.h:237
void UpdateRcvKernelSize(const int rcvKernelSize)
Definition: FairMQChannel.cxx:404
void UpdateAddress(const std::string &address)
Definition: FairMQChannel.cxx:349
void UpdateTransport(const std::string &transport)
Definition: FairMQChannel.cxx:360
int64_t Receive(FairMQParts &parts, int rcvTimeoutInMs=-1)
Definition: FairMQChannel.h:305
int64_t Receive(std::vector< FairMQMessagePtr > &msgVec, int rcvTimeoutInMs=-1)
Definition: FairMQChannel.h:286
Definition: FairMQSocket.h:19
std::string GetChannelIndex() const __attribute__((deprecated("Use GetIndex()")))
Definition: FairMQChannel.h:115
void UpdateRateLogging(const int rateLogging)
Definition: FairMQChannel.cxx:426
int GetLinger() const
Definition: FairMQChannel.cxx:282
void UpdateSndBufSize(const int sndBufSize)
Definition: FairMQChannel.cxx:371
bool Validate()
Definition: FairMQChannel.cxx:499
void UpdateMethod(const std::string &method)
Definition: FairMQChannel.cxx:338
std::string GetMethod() const
Definition: FairMQChannel.cxx:209
FairMQParts is a lightweight convenience wrapper around a vector of unique pointers to FairMQMessage...
Definition: FairMQParts.h:20
int GetRcvBufSize() const
Definition: FairMQChannel.cxx:255
void UpdateLinger(const int duration)
Definition: FairMQChannel.cxx:415
void ResetChannel()
Resets the channel (requires validation to be used again).
Definition: FairMQChannel.cxx:716
Definition: FairMQDevice.h:53
void UpdatePortRangeMax(const int maxPort)
Definition: FairMQChannel.cxx:448
void UpdateType(const std::string &type)
Definition: FairMQChannel.cxx:327
Definition: FairMQMessage.h:20
void UpdateSndKernelSize(const int sndKernelSize)
Definition: FairMQChannel.cxx:393
FairMQChannel()
Default constructor.
Definition: FairMQChannel.cxx:47
std::string GetChannelName() const __attribute__((deprecated("Use GetName()")))
Definition: FairMQChannel.h:105
std::string GetChannelPrefix() const __attribute__((deprecated("Use GetPrefix()")))
Definition: FairMQChannel.h:110
virtual ~FairMQChannel()
Destructor.
Definition: FairMQChannel.h:80
int64_t Send(FairMQParts &parts, int sndTimeoutInMs=-1)
Definition: FairMQChannel.h:296
Definition: FairMQChannel.h:85