9 #ifndef FAIRMQCHANNEL_H_ 10 #define FAIRMQCHANNEL_H_ 20 #include <FairMQTransportFactory.h> 21 #include <FairMQSocket.h> 22 #include <fairmq/Transports.h> 23 #include <FairMQLogger.h> 24 #include <FairMQParts.h> 25 #include <FairMQMessage.h> 39 FairMQChannel(
const std::string& type,
const std::string& method,
const std::string& address);
45 FairMQChannel(
const std::string& name,
const std::string& type, std::shared_ptr<FairMQTransportFactory> factory);
53 FairMQChannel(
const std::string& name,
const std::string& type,
const std::string& method,
const std::string& address, std::shared_ptr<FairMQTransportFactory> factory);
68 bool Bind(
const std::string& address)
72 return fSocket->Bind(address);
75 bool Connect(
const std::string& address)
79 return fSocket->Connect(address);
85 std::string GetName()
const;
90 std::string GetPrefix()
const;
95 std::string GetIndex()
const;
204 void UpdateName(
const std::string& name);
223 bool ConnectEndpoint(
const std::string& endpoint);
225 bool BindEndpoint(std::string& endpoint);
234 int Send(FairMQMessagePtr& msg,
int sndTimeoutInMs = -1)
236 CheckSendCompatibility(msg);
237 return fSocket->Send(msg, sndTimeoutInMs);
244 int Receive(FairMQMessagePtr& msg,
int rcvTimeoutInMs = -1)
246 CheckReceiveCompatibility(msg);
247 return fSocket->Receive(msg, rcvTimeoutInMs);
250 int SendAsync(FairMQMessagePtr& msg) __attribute__((deprecated(
"For non-blocking Send, use timeout version with timeout of 0: Send(msg, timeout);")))
252 CheckSendCompatibility(msg);
253 return fSocket->Send(msg, 0);
255 int ReceiveAsync(FairMQMessagePtr& msg) __attribute__((deprecated(
"For non-blocking Receive, use timeout version with timeout of 0: Receive(msg, timeout);")))
257 CheckReceiveCompatibility(msg);
258 return fSocket->Receive(msg, 0);
265 int64_t
Send(std::vector<FairMQMessagePtr>& msgVec,
int sndTimeoutInMs = -1)
267 CheckSendCompatibility(msgVec);
268 return fSocket->Send(msgVec, sndTimeoutInMs);
275 int64_t
Receive(std::vector<FairMQMessagePtr>& msgVec,
int rcvTimeoutInMs = -1)
277 CheckReceiveCompatibility(msgVec);
278 return fSocket->Receive(msgVec, rcvTimeoutInMs);
281 int64_t SendAsync(std::vector<FairMQMessagePtr>& msgVec) __attribute__((deprecated(
"For non-blocking Send, use timeout version with timeout of 0: Send(msgVec, timeout);")))
283 CheckSendCompatibility(msgVec);
284 return fSocket->Send(msgVec, 0);
286 int64_t ReceiveAsync(std::vector<FairMQMessagePtr>& msgVec) __attribute__((deprecated(
"For non-blocking Receive, use timeout version with timeout of 0: Receive(msgVec, timeout);")))
288 CheckReceiveCompatibility(msgVec);
289 return fSocket->Receive(msgVec, 0);
298 return Send(parts.fParts, sndTimeoutInMs);
307 return Receive(parts.fParts, rcvTimeoutInMs);
310 int64_t SendAsync(
FairMQParts& parts) __attribute__((deprecated(
"For non-blocking Send, use timeout version with timeout of 0: Send(parts, timeout);")))
312 return Send(parts.fParts, 0);
315 int64_t ReceiveAsync(
FairMQParts& parts) __attribute__((deprecated(
"For non-blocking Receive, use timeout version with timeout of 0: Receive(parts, timeout);")))
317 return Receive(parts.fParts, 0);
320 unsigned long GetBytesTx()
const {
return fSocket->GetBytesTx(); }
321 unsigned long GetBytesRx()
const {
return fSocket->GetBytesRx(); }
322 unsigned long GetMessagesTx()
const {
return fSocket->GetMessagesTx(); }
323 unsigned long GetMessagesRx()
const {
return fSocket->GetMessagesRx(); }
327 return fTransportFactory.get();
330 template<
typename... Args>
331 FairMQMessagePtr NewMessage(Args&&... args)
333 return Transport()->CreateMessage(std::forward<Args>(args)...);
337 FairMQMessagePtr NewSimpleMessage(
const T& data)
339 return Transport()->NewSimpleMessage(data);
343 FairMQMessagePtr NewStaticMessage(
const T& data)
345 return Transport()->NewStaticMessage(data);
349 std::shared_ptr<FairMQTransportFactory> fTransportFactory;
350 fair::mq::Transport fTransportType;
351 std::unique_ptr<FairMQSocket> fSocket;
355 std::string fAddress;
367 std::atomic<bool> fIsValid;
373 static std::mutex fChannelMutex;
379 void CheckSendCompatibility(FairMQMessagePtr& msg)
381 if (fTransportType != msg->GetType()) {
383 FairMQMessagePtr msgWrapper(NewMessage(
386 [](
void* ,
void* _msg) {
delete static_cast<FairMQMessage*
>(_msg); },
390 msg = move(msgWrapper);
394 void CheckSendCompatibility(std::vector<FairMQMessagePtr>& msgVec)
396 for (
auto& msg : msgVec) {
397 if (fTransportType != msg->GetType()) {
399 FairMQMessagePtr msgWrapper(NewMessage(
402 [](
void* ,
void* _msg) {
delete static_cast<FairMQMessage*
>(_msg); },
406 msg = move(msgWrapper);
411 void CheckReceiveCompatibility(FairMQMessagePtr& msg)
413 if (fTransportType != msg->GetType()) {
415 FairMQMessagePtr newMsg(NewMessage());
420 void CheckReceiveCompatibility(std::vector<FairMQMessagePtr>& msgVec)
422 for (
auto& msg : msgVec) {
423 if (fTransportType != msg->GetType()) {
425 FairMQMessagePtr newMsg(NewMessage());
431 void InitTransport(std::shared_ptr<FairMQTransportFactory> factory)
433 fTransportFactory = factory;
434 fTransportType = factory->GetType();
436 auto SetModified(
const bool modified) -> void;
std::string GetChannelIndex() const
Definition: FairMQChannel.h:94
int GetPortRangeMax() const
Definition: FairMQChannel.cxx:239
int GetSndBufSize() const
Definition: FairMQChannel.cxx:176
std::string GetTransportName() const
Definition: FairMQChannel.cxx:167
int Send(FairMQMessagePtr &msg, int sndTimeoutInMs=-1)
Definition: FairMQChannel.h:234
void UpdateAutoBind(const bool autobind)
Definition: FairMQChannel.cxx:389
bool GetAutoBind() const
Definition: FairMQChannel.cxx:248
std::string GetType() const
Definition: FairMQChannel.cxx:140
int Receive(FairMQMessagePtr &msg, int rcvTimeoutInMs=-1)
Definition: FairMQChannel.h:244
void UpdatePortRangeMin(const int minPort)
Definition: FairMQChannel.cxx:367
std::string GetChannelPrefix() const
Definition: FairMQChannel.h:89
int64_t Send(std::vector< FairMQMessagePtr > &msgVec, int sndTimeoutInMs=-1)
Definition: FairMQChannel.h:265
int GetRateLogging() const
Definition: FairMQChannel.cxx:221
std::string GetAddress() const
Definition: FairMQChannel.cxx:158
int GetRcvKernelSize() const
Definition: FairMQChannel.cxx:203
Definition: FairMQTransportFactory.h:28
int GetPortRangeMin() const
Definition: FairMQChannel.cxx:230
std::string GetChannelName() const
Definition: FairMQChannel.h:84
void UpdateRcvBufSize(const int rcvBufSize)
Definition: FairMQChannel.cxx:312
FairMQChannel & operator=(const FairMQChannel &)
Assignment operator.
Definition: FairMQChannel.cxx:85
Definition: FairMQChannel.h:27
int GetSndKernelSize() const
Definition: FairMQChannel.cxx:194
bool IsValid() const
Definition: FairMQChannel.cxx:420
void UpdateRcvKernelSize(const int rcvKernelSize)
Definition: FairMQChannel.cxx:334
void UpdateAddress(const std::string &address)
Definition: FairMQChannel.cxx:279
void UpdateTransport(const std::string &transport)
Definition: FairMQChannel.cxx:290
int64_t Receive(FairMQParts &parts, int rcvTimeoutInMs=-1)
Definition: FairMQChannel.h:305
int64_t Receive(std::vector< FairMQMessagePtr > &msgVec, int rcvTimeoutInMs=-1)
Definition: FairMQChannel.h:275
Definition: FairMQSocket.h:19
void UpdateRateLogging(const int rateLogging)
Definition: FairMQChannel.cxx:356
int GetLinger() const
Definition: FairMQChannel.cxx:212
void UpdateSndBufSize(const int sndBufSize)
Definition: FairMQChannel.cxx:301
bool Validate()
Definition: FairMQChannel.cxx:429
void UpdateMethod(const std::string &method)
Definition: FairMQChannel.cxx:268
void UpdateChannelName(const std::string &name)
Definition: FairMQChannel.h:203
std::string GetMethod() const
Definition: FairMQChannel.cxx:149
FairMQParts is a lightweight convenience wrapper around a vector of unique pointers to FairMQMessage...
Definition: FairMQParts.h:20
int GetRcvBufSize() const
Definition: FairMQChannel.cxx:185
void UpdateLinger(const int duration)
Definition: FairMQChannel.cxx:345
void ResetChannel()
Resets the channel (requires validation to be used again).
Definition: FairMQChannel.cxx:637
bool ValidateChannel()
Definition: FairMQChannel.h:212
Definition: FairMQDevice.h:46
void UpdatePortRangeMax(const int maxPort)
Definition: FairMQChannel.cxx:378
void UpdateType(const std::string &type)
Definition: FairMQChannel.cxx:257
Definition: FairMQMessage.h:20
void UpdateSndKernelSize(const int sndKernelSize)
Definition: FairMQChannel.cxx:323
FairMQChannel()
Default constructor.
Definition: FairMQChannel.cxx:27
virtual ~FairMQChannel()
Default destructor.
Definition: FairMQChannel.h:62
int64_t Send(FairMQParts &parts, int sndTimeoutInMs=-1)
Definition: FairMQChannel.h:296
Definition: FairMQChannel.h:64