diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 183491d8..d9fd489c 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -117,14 +117,16 @@ void FairMQDevice::Bind() { if (!fPayloadOutputs->at(i)->Bind(fOutputAddress.at(i))) { + LOG(DEBUG) << "binding output #" << i << " on " << fOutputAddress.at(i) << "failed, trying to find port in range"; + LOG(INFO) << "port range: " << fPortRangeMin << " - " << fPortRangeMax; do { - LOG(WARN) << "could not bind at " << fOutputAddress.at(i) << ", trying another port in range"; ++numAttempts; - size_t pos = fOutputAddress.at(i).rfind(":"); stringstream ss; ss << (int)randomPort(gen); string portString = ss.str(); + + size_t pos = fOutputAddress.at(i).rfind(":"); fOutputAddress.at(i) = fOutputAddress.at(i).substr(0, pos + 1) + portString; if (numAttempts > maxAttempts) { @@ -144,18 +146,20 @@ void FairMQDevice::Bind() { if (!fPayloadInputs->at(i)->Bind(fInputAddress.at(i))) { + LOG(DEBUG) << "binding input #" << i << " on " << fInputAddress.at(i) << "failed, trying to find port in range"; + LOG(INFO) << "port range: " << fPortRangeMin << " - " << fPortRangeMax; do { - LOG(WARN) << "could not bind at " << fInputAddress.at(i) << ", trying another port in range"; ++numAttempts; - size_t pos = fInputAddress.at(i).rfind(":"); stringstream ss; ss << (int)randomPort(gen); string portString = ss.str(); + + size_t pos = fInputAddress.at(i).rfind(":"); fInputAddress.at(i) = fInputAddress.at(i).substr(0, pos + 1) + portString; if (numAttempts > maxAttempts) { - LOG(ERROR) << "could not bind output " << i << " to any port in the given range"; + LOG(ERROR) << "could not bind input " << i << " to any port in the given range"; break; } } while (!fPayloadInputs->at(i)->Bind(fInputAddress.at(i))); diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index 8a209e1c..acf5b491 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -42,17 +42,26 @@ FairMQSocketNN::FairMQSocketNN(const string& type, int num, int numIoThreads) // http://250bpm.com/blog:14 // http://www.freelists.org/post/nanomsg/a-stupid-load-balancing-question,1 fSocket = nn_socket(AF_SP_RAW, GetConstant(type)); + if (fSocket == -1) + { + LOG(ERROR) << "failed creating socket #" << fId << ", reason: " << nn_strerror(errno); + exit(EXIT_FAILURE); + } } else { fSocket = nn_socket(AF_SP, GetConstant(type)); + if (fSocket == -1) + { + LOG(ERROR) << "failed creating socket #" << fId << ", reason: " << nn_strerror(errno); + exit(EXIT_FAILURE); + } if (type == "sub") { nn_setsockopt(fSocket, NN_SUB, NN_SUB_SUBSCRIBE, NULL, 0); } } - LOG(INFO) << "created socket #" << fId; } diff --git a/fairmq/zeromq/FairMQContextZMQ.cxx b/fairmq/zeromq/FairMQContextZMQ.cxx index d18d851d..7095433b 100644 --- a/fairmq/zeromq/FairMQContextZMQ.cxx +++ b/fairmq/zeromq/FairMQContextZMQ.cxx @@ -12,9 +12,10 @@ * @author D. Klein, A. Rybalchenko */ +#include + #include "FairMQLogger.h" #include "FairMQContextZMQ.h" -#include FairMQContextZMQ::FairMQContextZMQ(int numIoThreads) : fContext() @@ -23,6 +24,7 @@ FairMQContextZMQ::FairMQContextZMQ(int numIoThreads) if (fContext == NULL) { LOG(ERROR) << "failed creating context, reason: " << zmq_strerror(errno); + exit(EXIT_FAILURE); } int rc = zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads); @@ -30,6 +32,13 @@ FairMQContextZMQ::FairMQContextZMQ(int numIoThreads) { LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno); } + + // Set the maximum number of allowed sockets on the context. + rc = zmq_ctx_set(fContext, ZMQ_MAX_SOCKETS, 10000); + if (rc != 0) + { + LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno); + } } FairMQContextZMQ::~FairMQContextZMQ() diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index 43598c72..fe629962 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -40,6 +40,12 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, int num, int numIoThreads) fSocket = zmq_socket(fContext->GetContext(), GetConstant(type)); + if (fSocket == NULL) + { + LOG(ERROR) << "failed creating socket #" << fId << ", reason: " << zmq_strerror(errno); + exit(EXIT_FAILURE); + } + rc = zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.length()); if (rc != 0) { @@ -79,6 +85,10 @@ bool FairMQSocketZMQ::Bind(const string& address) int rc = zmq_bind(fSocket, address.c_str()); if (rc != 0) { + if (errno == EADDRINUSE) { + // do not print error in this case, this is handled by FairMQDevice in case no connection could be established after trying a number of random ports from a range. + return false; + } LOG(ERROR) << "failed binding socket #" << fId << ", reason: " << zmq_strerror(errno); return false; }