mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 00:31:14 +00:00
Increase maximum number of zeromq sockets and improve output on errors.
FairMQSampler: Modified: comment out an unused loop. FairMQ: Modified: increase maximum number of ZeroMQ sockets (from default 1024). FairMQ: Modified: reduce amount of output when finding port from a range. FairMQ: Modified: stop the Device when socket creation fails and output an error.
This commit is contained in:
parent
8a82afe184
commit
61f24eb73a
|
@ -117,14 +117,16 @@ void FairMQDevice::Bind()
|
||||||
{
|
{
|
||||||
if (!fPayloadOutputs->at(i)->Bind(fOutputAddress.at(i)))
|
if (!fPayloadOutputs->at(i)->Bind(fOutputAddress.at(i)))
|
||||||
{
|
{
|
||||||
|
LOG(DEBUG) << "binding output #" << i << " on " << fOutputAddress.at(i) << "failed, trying to find port in range";
|
||||||
|
LOG(INFO) << "port range: " << fPortRangeMin << " - " << fPortRangeMax;
|
||||||
do {
|
do {
|
||||||
LOG(WARN) << "could not bind at " << fOutputAddress.at(i) << ", trying another port in range";
|
|
||||||
++numAttempts;
|
++numAttempts;
|
||||||
|
|
||||||
size_t pos = fOutputAddress.at(i).rfind(":");
|
|
||||||
stringstream ss;
|
stringstream ss;
|
||||||
ss << (int)randomPort(gen);
|
ss << (int)randomPort(gen);
|
||||||
string portString = ss.str();
|
string portString = ss.str();
|
||||||
|
|
||||||
|
size_t pos = fOutputAddress.at(i).rfind(":");
|
||||||
fOutputAddress.at(i) = fOutputAddress.at(i).substr(0, pos + 1) + portString;
|
fOutputAddress.at(i) = fOutputAddress.at(i).substr(0, pos + 1) + portString;
|
||||||
if (numAttempts > maxAttempts)
|
if (numAttempts > maxAttempts)
|
||||||
{
|
{
|
||||||
|
@ -144,18 +146,20 @@ void FairMQDevice::Bind()
|
||||||
{
|
{
|
||||||
if (!fPayloadInputs->at(i)->Bind(fInputAddress.at(i)))
|
if (!fPayloadInputs->at(i)->Bind(fInputAddress.at(i)))
|
||||||
{
|
{
|
||||||
|
LOG(DEBUG) << "binding input #" << i << " on " << fInputAddress.at(i) << "failed, trying to find port in range";
|
||||||
|
LOG(INFO) << "port range: " << fPortRangeMin << " - " << fPortRangeMax;
|
||||||
do {
|
do {
|
||||||
LOG(WARN) << "could not bind at " << fInputAddress.at(i) << ", trying another port in range";
|
|
||||||
++numAttempts;
|
++numAttempts;
|
||||||
|
|
||||||
size_t pos = fInputAddress.at(i).rfind(":");
|
|
||||||
stringstream ss;
|
stringstream ss;
|
||||||
ss << (int)randomPort(gen);
|
ss << (int)randomPort(gen);
|
||||||
string portString = ss.str();
|
string portString = ss.str();
|
||||||
|
|
||||||
|
size_t pos = fInputAddress.at(i).rfind(":");
|
||||||
fInputAddress.at(i) = fInputAddress.at(i).substr(0, pos + 1) + portString;
|
fInputAddress.at(i) = fInputAddress.at(i).substr(0, pos + 1) + portString;
|
||||||
if (numAttempts > maxAttempts)
|
if (numAttempts > maxAttempts)
|
||||||
{
|
{
|
||||||
LOG(ERROR) << "could not bind output " << i << " to any port in the given range";
|
LOG(ERROR) << "could not bind input " << i << " to any port in the given range";
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} while (!fPayloadInputs->at(i)->Bind(fInputAddress.at(i)));
|
} while (!fPayloadInputs->at(i)->Bind(fInputAddress.at(i)));
|
||||||
|
|
|
@ -42,17 +42,26 @@ FairMQSocketNN::FairMQSocketNN(const string& type, int num, int numIoThreads)
|
||||||
// http://250bpm.com/blog:14
|
// http://250bpm.com/blog:14
|
||||||
// http://www.freelists.org/post/nanomsg/a-stupid-load-balancing-question,1
|
// http://www.freelists.org/post/nanomsg/a-stupid-load-balancing-question,1
|
||||||
fSocket = nn_socket(AF_SP_RAW, GetConstant(type));
|
fSocket = nn_socket(AF_SP_RAW, GetConstant(type));
|
||||||
|
if (fSocket == -1)
|
||||||
|
{
|
||||||
|
LOG(ERROR) << "failed creating socket #" << fId << ", reason: " << nn_strerror(errno);
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
fSocket = nn_socket(AF_SP, GetConstant(type));
|
fSocket = nn_socket(AF_SP, GetConstant(type));
|
||||||
|
if (fSocket == -1)
|
||||||
|
{
|
||||||
|
LOG(ERROR) << "failed creating socket #" << fId << ", reason: " << nn_strerror(errno);
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
if (type == "sub")
|
if (type == "sub")
|
||||||
{
|
{
|
||||||
nn_setsockopt(fSocket, NN_SUB, NN_SUB_SUBSCRIBE, NULL, 0);
|
nn_setsockopt(fSocket, NN_SUB, NN_SUB_SUBSCRIBE, NULL, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
LOG(INFO) << "created socket #" << fId;
|
LOG(INFO) << "created socket #" << fId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -12,9 +12,10 @@
|
||||||
* @author D. Klein, A. Rybalchenko
|
* @author D. Klein, A. Rybalchenko
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include <sstream>
|
||||||
|
|
||||||
#include "FairMQLogger.h"
|
#include "FairMQLogger.h"
|
||||||
#include "FairMQContextZMQ.h"
|
#include "FairMQContextZMQ.h"
|
||||||
#include <sstream>
|
|
||||||
|
|
||||||
FairMQContextZMQ::FairMQContextZMQ(int numIoThreads)
|
FairMQContextZMQ::FairMQContextZMQ(int numIoThreads)
|
||||||
: fContext()
|
: fContext()
|
||||||
|
@ -23,6 +24,7 @@ FairMQContextZMQ::FairMQContextZMQ(int numIoThreads)
|
||||||
if (fContext == NULL)
|
if (fContext == NULL)
|
||||||
{
|
{
|
||||||
LOG(ERROR) << "failed creating context, reason: " << zmq_strerror(errno);
|
LOG(ERROR) << "failed creating context, reason: " << zmq_strerror(errno);
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
}
|
}
|
||||||
|
|
||||||
int rc = zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads);
|
int rc = zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads);
|
||||||
|
@ -30,6 +32,13 @@ FairMQContextZMQ::FairMQContextZMQ(int numIoThreads)
|
||||||
{
|
{
|
||||||
LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno);
|
LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set the maximum number of allowed sockets on the context.
|
||||||
|
rc = zmq_ctx_set(fContext, ZMQ_MAX_SOCKETS, 10000);
|
||||||
|
if (rc != 0)
|
||||||
|
{
|
||||||
|
LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQContextZMQ::~FairMQContextZMQ()
|
FairMQContextZMQ::~FairMQContextZMQ()
|
||||||
|
|
|
@ -40,6 +40,12 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, int num, int numIoThreads)
|
||||||
|
|
||||||
fSocket = zmq_socket(fContext->GetContext(), GetConstant(type));
|
fSocket = zmq_socket(fContext->GetContext(), GetConstant(type));
|
||||||
|
|
||||||
|
if (fSocket == NULL)
|
||||||
|
{
|
||||||
|
LOG(ERROR) << "failed creating socket #" << fId << ", reason: " << zmq_strerror(errno);
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
|
||||||
rc = zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.length());
|
rc = zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.length());
|
||||||
if (rc != 0)
|
if (rc != 0)
|
||||||
{
|
{
|
||||||
|
@ -79,6 +85,10 @@ bool FairMQSocketZMQ::Bind(const string& address)
|
||||||
int rc = zmq_bind(fSocket, address.c_str());
|
int rc = zmq_bind(fSocket, address.c_str());
|
||||||
if (rc != 0)
|
if (rc != 0)
|
||||||
{
|
{
|
||||||
|
if (errno == EADDRINUSE) {
|
||||||
|
// do not print error in this case, this is handled by FairMQDevice in case no connection could be established after trying a number of random ports from a range.
|
||||||
|
return false;
|
||||||
|
}
|
||||||
LOG(ERROR) << "failed binding socket #" << fId << ", reason: " << zmq_strerror(errno);
|
LOG(ERROR) << "failed binding socket #" << fId << ", reason: " << zmq_strerror(errno);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user