add ROUTER/DEALER/PAIR sockets.

This commit is contained in:
Alexey Rybalchenko 2014-10-15 11:24:41 +02:00
parent 9317f06c10
commit 6968f57abc
3 changed files with 35 additions and 4 deletions

View File

@ -33,11 +33,22 @@ FairMQSocketNN::FairMQSocketNN(const string& type, int num, int numIoThreads)
LOG(INFO) << "number of I/O threads is not used in nanomsg"; LOG(INFO) << "number of I/O threads is not used in nanomsg";
} }
if (type == "router" || type == "dealer")
{
// Additional info about using the sockets ROUTER and DEALER with nanomsg can be found in:
// http://250bpm.com/blog:14
// http://www.freelists.org/post/nanomsg/a-stupid-load-balancing-question,1
fSocket = nn_socket(AF_SP_RAW, GetConstant(type));
}
else
{
fSocket = nn_socket(AF_SP, GetConstant(type)); fSocket = nn_socket(AF_SP, GetConstant(type));
if (type == "sub") if (type == "sub")
{ {
nn_setsockopt(fSocket, NN_SUB, NN_SUB_SUBSCRIBE, NULL, 0); nn_setsockopt(fSocket, NN_SUB, NN_SUB_SUBSCRIBE, NULL, 0);
} }
}
LOG(INFO) << "created socket #" << fId; LOG(INFO) << "created socket #" << fId;
} }
@ -172,7 +183,7 @@ int FairMQSocketNN::GetConstant(const string& constant)
if (constant == "pub") if (constant == "pub")
return NN_PUB; return NN_PUB;
if (constant == "xsub") if (constant == "xsub")
return NN_SUB; // TODO: is there XPUB, XSUB for nanomsg? return NN_SUB;
if (constant == "xpub") if (constant == "xpub")
return NN_PUB; return NN_PUB;
if (constant == "push") if (constant == "push")
@ -183,10 +194,20 @@ int FairMQSocketNN::GetConstant(const string& constant)
return NN_REQ; return NN_REQ;
if (constant == "rep") if (constant == "rep")
return NN_REP; return NN_REP;
if (constant == "dealer")
return NN_REQ;
if (constant == "router")
return NN_REP;
if (constant == "pair")
return NN_PAIR;
if (constant == "snd-hwm") if (constant == "snd-hwm")
return NN_SNDBUF; return NN_SNDBUF;
if (constant == "rcv-hwm") if (constant == "rcv-hwm")
return NN_RCVBUF; return NN_RCVBUF;
if (constant == "snd-more") { if (constant == "snd-more") {
LOG(ERROR) << "Multipart messages functionality currently not supported by nanomsg!"; LOG(ERROR) << "Multipart messages functionality currently not supported by nanomsg!";
return -1; return -1;
@ -195,6 +216,7 @@ int FairMQSocketNN::GetConstant(const string& constant)
LOG(ERROR) << "Multipart messages functionality currently not supported by nanomsg!"; LOG(ERROR) << "Multipart messages functionality currently not supported by nanomsg!";
return -1; return -1;
} }
if (constant == "linger") if (constant == "linger")
return NN_LINGER; return NN_LINGER;

View File

@ -19,6 +19,7 @@
#include <nanomsg/pipeline.h> #include <nanomsg/pipeline.h>
#include <nanomsg/pubsub.h> #include <nanomsg/pubsub.h>
#include <nanomsg/reqrep.h> #include <nanomsg/reqrep.h>
#include <nanomsg/pair.h>
#include "FairMQSocket.h" #include "FairMQSocket.h"

View File

@ -228,6 +228,13 @@ int FairMQSocketZMQ::GetConstant(const string& constant)
return ZMQ_REQ; return ZMQ_REQ;
if (constant == "rep") if (constant == "rep")
return ZMQ_REP; return ZMQ_REP;
if (constant == "dealer")
return ZMQ_DEALER;
if (constant == "router")
return ZMQ_ROUTER;
if (constant == "pair")
return ZMQ_PAIR;
if (constant == "snd-hwm") if (constant == "snd-hwm")
return ZMQ_SNDHWM; return ZMQ_SNDHWM;
if (constant == "rcv-hwm") if (constant == "rcv-hwm")
@ -236,6 +243,7 @@ int FairMQSocketZMQ::GetConstant(const string& constant)
return ZMQ_SNDMORE; return ZMQ_SNDMORE;
if (constant == "rcv-more") if (constant == "rcv-more")
return ZMQ_RCVMORE; return ZMQ_RCVMORE;
if (constant == "linger") if (constant == "linger")
return ZMQ_LINGER; return ZMQ_LINGER;