FairMQSocket: add versions of Send/Receive methods with int flags instead of string, which is more flexible and performant.

Int flags are mapped to their ZeroMQ/nanomsg versions behind the transport interface.
Methods with string flags are kept for backwards compatibility.
This commit is contained in:
Alexey Rybalchenko
2015-01-25 21:40:00 +01:00
committed by Mohammad Al-Turany
parent a9b7e8866c
commit 6518b7cd41
5 changed files with 103 additions and 5 deletions

View File

@@ -20,7 +20,8 @@
boost::shared_ptr<FairMQContextZMQ> FairMQSocketZMQ::fContext = boost::shared_ptr<FairMQContextZMQ>(new FairMQContextZMQ(1));
FairMQSocketZMQ::FairMQSocketZMQ(const string& type, int num, int numIoThreads)
: fSocket(NULL)
: FairMQSocket(ZMQ_SNDMORE, ZMQ_RCVMORE, ZMQ_DONTWAIT)
, fSocket(NULL)
, fId()
, fBytesTx(0)
, fBytesRx(0)
@@ -115,6 +116,28 @@ int FairMQSocketZMQ::Send(FairMQMessage* msg, const string& flag)
return nbytes;
}
int FairMQSocketZMQ::Send(FairMQMessage* msg, const int flags)
{
int nbytes = zmq_msg_send(static_cast<zmq_msg_t*>(msg->GetMessage()), fSocket, flags);
if (nbytes >= 0)
{
fBytesTx += nbytes;
++fMessagesTx;
return nbytes;
}
if (zmq_errno() == EAGAIN)
{
return 0;
}
if (zmq_errno() == ETERM)
{
LOG(INFO) << "terminating socket #" << fId;
return -1;
}
LOG(ERROR) << "failed sending on socket #" << fId << ", reason: " << zmq_strerror(errno);
return nbytes;
}
int FairMQSocketZMQ::Receive(FairMQMessage* msg, const string& flag)
{
int nbytes = zmq_msg_recv(static_cast<zmq_msg_t*>(msg->GetMessage()), fSocket, GetConstant(flag));
@@ -137,6 +160,28 @@ int FairMQSocketZMQ::Receive(FairMQMessage* msg, const string& flag)
return nbytes;
}
int FairMQSocketZMQ::Receive(FairMQMessage* msg, const int flags)
{
int nbytes = zmq_msg_recv(static_cast<zmq_msg_t*>(msg->GetMessage()), fSocket, flags);
if (nbytes >= 0)
{
fBytesRx += nbytes;
++fMessagesRx;
return nbytes;
}
if (zmq_errno() == EAGAIN)
{
return 0;
}
if (zmq_errno() == ETERM)
{
LOG(INFO) << "terminating socket #" << fId;
return -1;
}
LOG(ERROR) << "failed receiving on socket #" << fId << ", reason: " << zmq_strerror(errno);
return nbytes;
}
void FairMQSocketZMQ::Close()
{
if (fSocket == NULL)
@@ -250,6 +295,8 @@ int FairMQSocketZMQ::GetConstant(const string& constant)
return ZMQ_LINGER;
if (constant == "no-block")
return ZMQ_DONTWAIT;
if (constant == "snd-more no-block")
return ZMQ_DONTWAIT|ZMQ_SNDMORE;
return -1;
}

View File

@@ -33,7 +33,9 @@ class FairMQSocketZMQ : public FairMQSocket
virtual void Connect(const string& address);
virtual int Send(FairMQMessage* msg, const string& flag="");
virtual int Send(FairMQMessage* msg, const int flags);
virtual int Receive(FairMQMessage* msg, const string& flag="");
virtual int Receive(FairMQMessage* msg, const int flags);
virtual void* GetSocket();
virtual int GetSocket(int nothing);