Configuration and DDS example/tools updates

- Update DDS example command UI and extract it from example.
 - Unify address handling via DDS properties for dynamic deployment.
 - Update DDS docs with the new approach.
 - Allow `--config-key` to be used to access common config in JSON.
 - Allow common channel properties to be specified for all sockets.
 - Update MQ examples and Tuto3 with new config options.
 - Add start scripts to MQ examples for easier use.
This commit is contained in:
Alexey Rybalchenko
2016-03-31 14:41:05 +02:00
parent 151d3b5de8
commit b9883d3b13
21 changed files with 1082 additions and 701 deletions

View File

@@ -26,7 +26,9 @@
#include "FairMQSocket.h"
#include "FairMQDevice.h"
#include "FairMQLogger.h"
#include "FairMQTools.h"
#include "FairMQProgOptions.h"
#include "FairMQTransportFactoryZMQ.h"
#ifdef NANOMSG_FOUND
#include "FairMQTransportFactoryNN.h"
@@ -44,13 +46,15 @@ static void CallSignalHandler(int signal)
FairMQDevice::FairMQDevice()
: fChannels()
, fId()
, fMaxInitializationTime(120)
, fMaxInitializationAttempts(120)
, fNumIoThreads(1)
, fPortRangeMin(22000)
, fPortRangeMax(32000)
, fLogIntervalInMs(1000)
, fCmdSocket(nullptr)
, fTransportFactory(nullptr)
, fConfig(nullptr)
, fNetworkInterface()
, fInitialValidationFinished(false)
, fInitialValidationCondition()
, fInitialValidationMutex()
@@ -88,6 +92,58 @@ void FairMQDevice::SignalHandler(int signal)
exit(EXIT_FAILURE);
}
void FairMQDevice::ConnectChannels(list<FairMQChannel*>& chans)
{
auto itr = chans.begin();
while (itr != chans.end())
{
if ((*itr)->ValidateChannel())
{
if (ConnectChannel(**itr))
{
(*itr)->InitCommandInterface(fTransportFactory, fNumIoThreads);
chans.erase(itr++);
}
else
{
LOG(ERROR) << "failed to connect channel " << (*itr)->fChannelName;
++itr;
}
}
else
{
++itr;
}
}
}
void FairMQDevice::BindChannels(list<FairMQChannel*>& chans)
{
auto itr = chans.begin();
while (itr != chans.end())
{
if ((*itr)->ValidateChannel())
{
if (BindChannel(**itr))
{
(*itr)->InitCommandInterface(fTransportFactory, fNumIoThreads);
chans.erase(itr++);
}
else
{
LOG(ERROR) << "failed to bind channel " << (*itr)->fChannelName;
++itr;
}
}
else
{
++itr;
}
}
}
void FairMQDevice::InitWrapper()
{
if (!fTransportFactory)
@@ -102,58 +158,65 @@ void FairMQDevice::InitWrapper()
fCmdSocket->Bind("inproc://commands");
}
// List to store the uninitialized channels.
list<FairMQChannel*> uninitializedChannels;
// Containers to store the uninitialized channels.
list<FairMQChannel*> uninitializedBindingChannels;
list<FairMQChannel*> uninitializedConnectingChannels;
// Fill the uninitialized channel containers
for (auto mi = fChannels.begin(); mi != fChannels.end(); ++mi)
{
for (auto vi = (mi->second).begin(); vi != (mi->second).end(); ++vi)
{
// set channel name: name + vector index
stringstream ss;
ss << mi->first << "[" << vi - (mi->second).begin() << "]";
vi->fChannelName = ss.str();
// fill the uninitialized list
uninitializedChannels.push_back(&(*vi));
if (vi->fMethod == "bind")
{
// set channel name: name + vector index
stringstream ss;
ss << mi->first << "[" << vi - (mi->second).begin() << "]";
vi->fChannelName = ss.str();
// if binding address is not specified, set it up to try getting it from the configured network interface
if (vi->fAddress == "unspecified" || vi->fAddress == "")
{
vi->fAddress = "tcp://" + FairMQ::tools::getInterfaceIP(fNetworkInterface) + ":1";
}
// fill the uninitialized list
uninitializedBindingChannels.push_back(&(*vi));
}
else if (vi->fMethod == "connect")
{
// set channel name: name + vector index
stringstream ss;
ss << mi->first << "[" << vi - (mi->second).begin() << "]";
vi->fChannelName = ss.str();
// fill the uninitialized list
uninitializedConnectingChannels.push_back(&(*vi));
}
else
{
LOG(ERROR) << "Cannot update configuration. Socket method (bind/connect) not specified.";
exit(EXIT_FAILURE);
}
}
}
// Bind channels. Here one run is enough, because bind settings should be available locally
// If necessary this could be handled in the same way as the connecting channels
BindChannels(uninitializedBindingChannels);
// notify parent thread about completion of first validation.
{
lock_guard<mutex> lock(fInitialValidationMutex);
fInitialValidationFinished = true;
fInitialValidationCondition.notify_one();
}
// go over the list of channels until all are initialized (and removed from the uninitialized list)
int numAttempts = 0;
int maxAttempts = fMaxInitializationTime;
do
int maxAttempts = fMaxInitializationAttempts;
while (!uninitializedConnectingChannels.empty())
{
auto itr = uninitializedChannels.begin();
while (itr != uninitializedChannels.end())
ConnectChannels(uninitializedConnectingChannels);
if (++numAttempts > maxAttempts)
{
if ((*itr)->ValidateChannel())
{
if (InitChannel(*(*itr)))
{
(*itr)->InitCommandInterface(fTransportFactory, fNumIoThreads);
uninitializedChannels.erase(itr++);
}
else
{
LOG(ERROR) << "failed to initialize channel " << (*itr)->fChannelName;
++itr;
}
}
else
{
++itr;
}
}
// notify parent thread about completion of first validation.
boost::lock_guard<boost::mutex> lock(fInitialValidationMutex);
fInitialValidationFinished = true;
fInitialValidationCondition.notify_one();
++numAttempts;
if (numAttempts > maxAttempts)
{
LOG(ERROR) << "could not initialize all channels after " << maxAttempts << " attempts";
LOG(ERROR) << "could not connect all channels after " << maxAttempts << " attempts";
// TODO: goto ERROR state;
exit(EXIT_FAILURE);
}
@@ -162,7 +225,7 @@ void FairMQDevice::InitWrapper()
{
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
}
} while (!uninitializedChannels.empty());
}
Init();
@@ -176,18 +239,15 @@ void FairMQDevice::InitWrapper()
void FairMQDevice::WaitForInitialValidation()
{
boost::unique_lock<boost::mutex> lock(fInitialValidationMutex);
while (!fInitialValidationFinished)
{
fInitialValidationCondition.wait(lock);
}
unique_lock<mutex> lock(fInitialValidationMutex);
fInitialValidationCondition.wait(lock, [&] () { return fInitialValidationFinished; });
}
void FairMQDevice::Init()
{
}
bool FairMQDevice::InitChannel(FairMQChannel& ch)
bool FairMQDevice::BindChannel(FairMQChannel& ch)
{
LOG(DEBUG) << "Initializing channel " << ch.fChannelName << " (" << ch.fType << ")";
// initialize the socket
@@ -196,51 +256,105 @@ bool FairMQDevice::InitChannel(FairMQChannel& ch)
ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize));
ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize));
// TODO: make it work with ipc
// number of attempts when choosing a random port
int maxAttempts = 1000;
int numAttempts = 0;
if (ch.fMethod == "bind")
// initialize random generator
boost::random::mt19937 gen(getpid());
boost::random::uniform_int_distribution<> randomPort(fPortRangeMin, fPortRangeMax);
LOG(DEBUG) << "Binding channel " << ch.fChannelName << " on " << ch.fAddress;
// try to bind to the saved port. In case of failure, try random one.
while (!ch.fSocket->Bind(ch.fAddress))
{
// number of attempts when choosing a random port
int maxAttempts = 1000;
int numAttempts = 0;
LOG(DEBUG) << "Could not bind to configured (TCP) port, trying random port in range " << fPortRangeMin << "-" << fPortRangeMax;
++numAttempts;
// initialize random generator
boost::random::mt19937 gen(getpid());
boost::random::uniform_int_distribution<> randomPort(fPortRangeMin, fPortRangeMax);
if (numAttempts > maxAttempts)
{
LOG(ERROR) << "could not bind to any (TCP) port in the given range after " << maxAttempts << " attempts";
return false;
}
size_t pos = ch.fAddress.rfind(":");
stringstream newPort;
newPort << static_cast<int>(randomPort(gen));
ch.fAddress = ch.fAddress.substr(0, pos + 1) + newPort.str();
LOG(DEBUG) << "Binding channel " << ch.fChannelName << " on " << ch.fAddress;
// try to bind to the saved port. In case of failure, try random one.
if (!ch.fSocket->Bind(ch.fAddress))
{
LOG(DEBUG) << "Could not bind to configured port, trying random port in range " << fPortRangeMin << "-" << fPortRangeMax;
do {
++numAttempts;
if (numAttempts > maxAttempts)
{
LOG(ERROR) << "could not bind to any port in the given range after " << maxAttempts << " attempts";
return false;
}
size_t pos = ch.fAddress.rfind(":");
stringstream newPort;
newPort << static_cast<int>(randomPort(gen));
ch.fAddress = ch.fAddress.substr(0, pos + 1) + newPort.str();
LOG(DEBUG) << "Binding channel " << ch.fChannelName << " on " << ch.fAddress;
} while (!ch.fSocket->Bind(ch.fAddress));
}
}
else
{
LOG(DEBUG) << "Connecting channel " << ch.fChannelName << " to " << ch.fAddress;
ch.fSocket->Connect(ch.fAddress);
}
return true;
}
bool FairMQDevice::ConnectChannel(FairMQChannel& ch)
{
LOG(DEBUG) << "Initializing channel " << ch.fChannelName << " (" << ch.fType << ")";
// initialize the socket
ch.fSocket = fTransportFactory->CreateSocket(ch.fType, ch.fChannelName, fNumIoThreads, fId);
// set high water marks
ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize));
ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize));
// connect
LOG(DEBUG) << "Connecting channel " << ch.fChannelName << " to " << ch.fAddress;
ch.fSocket->Connect(ch.fAddress);
return true;
}
// bool FairMQDevice::InitChannel(FairMQChannel& ch)
// {
// LOG(DEBUG) << "Initializing channel " << ch.fChannelName << " (" << ch.fType << ")";
// // initialize the socket
// ch.fSocket = fTransportFactory->CreateSocket(ch.fType, ch.fChannelName, fNumIoThreads, fId);
// // set high water marks
// ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize));
// ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize));
// if (ch.fMethod == "bind")
// {
// // number of attempts when choosing a random port
// int maxAttempts = 1000;
// int numAttempts = 0;
// // initialize random generator
// boost::random::mt19937 gen(getpid());
// boost::random::uniform_int_distribution<> randomPort(fPortRangeMin, fPortRangeMax);
// LOG(DEBUG) << "Binding channel " << ch.fChannelName << " on " << ch.fAddress;
// // try to bind to the saved port. In case of failure, try random one.
// if (!ch.fSocket->Bind(ch.fAddress))
// {
// LOG(DEBUG) << "Could not bind to configured port, trying random port in range " << fPortRangeMin << "-" << fPortRangeMax;
// do {
// ++numAttempts;
// if (numAttempts > maxAttempts)
// {
// LOG(ERROR) << "could not bind to any port in the given range after " << maxAttempts << " attempts";
// return false;
// }
// size_t pos = ch.fAddress.rfind(":");
// stringstream newPort;
// newPort << static_cast<int>(randomPort(gen));
// ch.fAddress = ch.fAddress.substr(0, pos + 1) + newPort.str();
// LOG(DEBUG) << "Binding channel " << ch.fChannelName << " on " << ch.fAddress;
// } while (!ch.fSocket->Bind(ch.fAddress));
// }
// }
// else
// {
// LOG(DEBUG) << "Connecting channel " << ch.fChannelName << " to " << ch.fAddress;
// ch.fSocket->Connect(ch.fAddress);
// }
// return true;
// }
void FairMQDevice::InitTaskWrapper()
{
InitTask();
@@ -365,6 +479,9 @@ void FairMQDevice::SetProperty(const int key, const string& value)
case Id:
fId = value;
break;
case NetworkInterface:
fNetworkInterface = value;
break;
default:
FairMQConfigurable::SetProperty(key, value);
break;
@@ -379,8 +496,8 @@ void FairMQDevice::SetProperty(const int key, const int value)
case NumIoThreads:
fNumIoThreads = value;
break;
case MaxInitializationTime:
fMaxInitializationTime = value;
case MaxInitializationAttempts:
fMaxInitializationAttempts = value;
break;
case PortRangeMin:
fPortRangeMin = value;
@@ -404,6 +521,8 @@ string FairMQDevice::GetProperty(const int key, const string& default_ /*= ""*/)
{
case Id:
return fId;
case NetworkInterface:
return fNetworkInterface;
default:
return FairMQConfigurable::GetProperty(key, default_);
}
@@ -417,14 +536,16 @@ string FairMQDevice::GetPropertyDescription(const int key)
return "Id: Device ID";
case NumIoThreads:
return "NumIoThreads: Number of I/O Threads (size of the 0MQ thread pool to handle I/O operations. If your application is using only the inproc transport for messaging you may set this to zero, otherwise set it to at least one.)";
case MaxInitializationTime:
return "MaxInitializationTime: Timeout for retrying validation and initialization of the channels.";
case MaxInitializationAttempts:
return "MaxInitializationAttempts: Maximum number of validation and initialization attempts of the channels.";
case PortRangeMin:
return "PortRangeMin: Minumum value for the port range (when binding to dynamic port).";
case PortRangeMax:
return "PortRangeMax: Maximum value for the port range (when binding to dynamic port).";
case LogIntervalInMs:
return "LogIntervalInMs: Time between socket rates logging outputs.";
case NetworkInterface:
return "NetworkInterface: Network interface to use for dynamic binding.";
default:
return FairMQConfigurable::GetPropertyDescription(key);
}
@@ -447,8 +568,8 @@ int FairMQDevice::GetProperty(const int key, const int default_ /*= 0*/)
{
case NumIoThreads:
return fNumIoThreads;
case MaxInitializationTime:
return fMaxInitializationTime;
case MaxInitializationAttempts:
return fMaxInitializationAttempts;
case PortRangeMin:
return fPortRangeMin;
case PortRangeMax:
@@ -491,6 +612,16 @@ void FairMQDevice::SetTransport(const string& transport)
}
}
void FairMQDevice::SetConfig(FairMQProgOptions& config)
{
fConfig = &config;
fChannels = config.GetFairMQMap();
SetTransport(config.GetValue<string>("transport"));
fId = config.GetValue<string>("id");
fNetworkInterface = config.GetValue<string>("network-interface");
fNumIoThreads = config.GetValue<int>("io-threads");
}
void FairMQDevice::LogSocketRates()
{
timestamp_t t0;