From c2d7c49cf5f241332d3bf8b8f1bc34754400aa10 Mon Sep 17 00:00:00 2001 From: mkrzewic Date: Fri, 11 Nov 2016 16:35:23 +0100 Subject: [PATCH] Support multiple endpoints per socket Sent messages will be scheduled among the endpoints according to socket type: PUB will send the same data to all endpoints simultaneously, PUSH will do round robin transfer. Incoming data is fair queued between endpoints. This is a feature of at least zeromq and nanomsg. _____________ To use: in the device configuration, instead of specifying just one address, specify a comma separated list e.g. tcp://localhost:123,ipc:///tmp/socket the connection method (bind/connect) applies to all endpoints in this case. ______________ Mixing binding and connecting endpoints is supported: prefix "@" means "bind", "+" (or ">") means connect, e.g. +tcp://localhost:123,@ipc:///tmp/socket,ipc:///tmp/asd (in case of missing prefix, the default channel method is used for that endpoint). --- fairmq/FairMQChannel.cxx | 89 +++++++++++++++++++-------- fairmq/FairMQChannel.h | 5 ++ fairmq/FairMQDevice.cxx | 129 +++++++++++++++++++++++++++++---------- fairmq/FairMQDevice.h | 10 +++ fairmq/FairMQSocket.cxx | 50 +++++++++++++++ fairmq/FairMQSocket.h | 1 + 6 files changed, 226 insertions(+), 58 deletions(-) diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index ee313d7c..505f243d 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -411,39 +411,52 @@ bool FairMQChannel::ValidateChannel() } else { - // check if address is a tcp or ipc address - if (fAddress.compare(0, 6, "tcp://") == 0) + //TODO: maybe cache fEndpoints as a class member? not really needed as tokenizing is + //fast, and only happens during (re-)configure + std::vector fEndpoints; + Tokenize(fEndpoints, fAddress); + for (const auto endpoint : fEndpoints) { - // check if TCP address contains port delimiter - string addressString = fAddress.substr(6); - if (addressString.find(":") == string::npos) + std::string address; + if (endpoint[0]=='@'||endpoint[0]=='+'||endpoint[0]=='>') { + address = endpoint.substr(1); + } else { + address = endpoint; + } + // check if address is a tcp or ipc address + if (address.compare(0, 6, "tcp://") == 0) { + // check if TCP address contains port delimiter + string addressString = address.substr(6); + if (addressString.find(":") == string::npos) + { + ss << "INVALID"; + LOG(DEBUG) << ss.str(); + LOG(ERROR) << "invalid channel address: \"" << address << "\" (missing port?)"; + return false; + } + } + else if (address.compare(0, 6, "ipc://") == 0) + { + // check if IPC address is not empty + string addressString = address.substr(6); + if (addressString == "") + { + ss << "INVALID"; + LOG(DEBUG) << ss.str(); + LOG(ERROR) << "invalid channel address: \"" << address << "\" (empty IPC address?)"; + return false; + } + } + else + { + // if neither TCP or IPC is specified, return invalid ss << "INVALID"; LOG(DEBUG) << ss.str(); - LOG(ERROR) << "invalid channel address: \"" << fAddress << "\" (missing port?)"; + LOG(ERROR) << "invalid channel address: \"" << address << "\" (missing protocol specifier?)"; return false; } } - else if (fAddress.compare(0, 6, "ipc://") == 0) - { - // check if IPC address is not empty - string addressString = fAddress.substr(6); - if (addressString == "") - { - ss << "INVALID"; - LOG(DEBUG) << ss.str(); - LOG(ERROR) << "invalid channel address: \"" << fAddress << "\" (empty IPC address?)"; - return false; - } - } - else - { - // if neither TCP or IPC is specified, return invalid - ss << "INVALID"; - LOG(DEBUG) << ss.str(); - LOG(ERROR) << "invalid channel address: \"" << fAddress << "\" (missing protocol specifier?)"; - return false; - } } // validate socket buffer size for sending @@ -770,3 +783,27 @@ FairMQChannel::~FairMQChannel() delete fCmdSocket; delete fPoller; } + +void FairMQChannel::Tokenize(std::vector& output, + const std::string& input, + const std::string delimiters) +{ + using namespace std; + size_t start = 0; + size_t end = input.find_first_of(delimiters); + if (end == string::npos) + { + output.push_back(input.substr(start, input.length())); + } + else do + { + output.push_back(input.substr(start, end-start)); + start = ++end; + end = input.find_first_of(delimiters, start); + if (end == string::npos) + { + output.push_back(input.substr(start, input.length())); + } + } while (end != string::npos); +} + diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 2bca463a..433a17ff 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -248,6 +248,11 @@ class FairMQChannel int Receive(FairMQMessage* msg, const std::string& flag = "", int rcvTimeoutInMs = -1) const; int Receive(FairMQMessage* msg, const int flags, int rcvTimeoutInMs = -1) const; + // TODO: this might go to some base utility library + static void Tokenize(std::vector& output, + const std::string& input, + const std::string delimiters = ","); + private: std::string fType; std::string fMethod; diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 019cc489..5776e1b4 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -118,7 +118,7 @@ void FairMQDevice::ConnectChannels(list& chans) { if ((*itr)->ValidateChannel()) { - if (ConnectChannel(**itr)) + if (AttachChannel(**itr)) { (*itr)->InitCommandInterface(fTransportFactory, fNumIoThreads); chans.erase(itr++); @@ -144,7 +144,7 @@ void FairMQDevice::BindChannels(list& chans) { if ((*itr)->ValidateChannel()) { - if (BindChannel(**itr)) + if (AttachChannel(**itr)) { (*itr)->InitCommandInterface(fTransportFactory, fNumIoThreads); chans.erase(itr++); @@ -273,37 +273,9 @@ bool FairMQDevice::BindChannel(FairMQChannel& ch) ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize)); ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize)); - // number of attempts when choosing a random port - int maxAttempts = 1000; - int numAttempts = 0; - - // initialize random generator - std::default_random_engine generator(std::chrono::system_clock::now().time_since_epoch().count()); - std::uniform_int_distribution randomPort(fPortRangeMin, fPortRangeMax); - LOG(DEBUG) << "Binding channel " << ch.fChannelName << " on " << ch.fAddress; - // try to bind to the saved port. In case of failure, try random one. - while (!ch.fSocket->Bind(ch.fAddress)) - { - LOG(DEBUG) << "Could not bind to configured (TCP) port, trying random port in range " << fPortRangeMin << "-" << fPortRangeMax; - ++numAttempts; - - if (numAttempts > maxAttempts) - { - LOG(ERROR) << "could not bind to any (TCP) port in the given range after " << maxAttempts << " attempts"; - return false; - } - - size_t pos = ch.fAddress.rfind(":"); - stringstream newPort; - newPort << static_cast(randomPort(generator)); - ch.fAddress = ch.fAddress.substr(0, pos + 1) + newPort.str(); - - LOG(DEBUG) << "Binding channel " << ch.fChannelName << " on " << ch.fAddress; - } - - return true; + return BindEndpoint(*ch.fSocket, ch.fAddress); } bool FairMQDevice::ConnectChannel(FairMQChannel& ch) @@ -316,7 +288,100 @@ bool FairMQDevice::ConnectChannel(FairMQChannel& ch) ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize)); // connect LOG(DEBUG) << "Connecting channel " << ch.fChannelName << " to " << ch.fAddress; - ch.fSocket->Connect(ch.fAddress); + ConnectEndpoint(*ch.fSocket, ch.fAddress); + return true; +} + +bool FairMQDevice::AttachChannel(FairMQChannel& ch) +{ + std::vector endpoints; + FairMQChannel::Tokenize(endpoints, ch.fAddress); + for (auto& endpoint : endpoints) + { + //(re-)init socket + if (!ch.fSocket) { + ch.fSocket = fTransportFactory->CreateSocket(ch.fType, ch.fChannelName, fNumIoThreads, fId); + } + + // set high water marks + ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize)); + ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize)); + + // attach + bool bind = (ch.fMethod=="bind"); + bool connectionModifier = false; + std::string address = endpoint; + + // check if the default fMethod is overridden by a modifier + if (endpoint[0]=='+' || endpoint[0]=='>') { + connectionModifier = true; + bind = false; + address = endpoint.substr(1); + } else if (endpoint[0]=='@') { + connectionModifier = true; + bind = true; + address = endpoint.substr(1); + } + + bool rc = true; + // make the connection + if (bind) { + rc = BindEndpoint(*ch.fSocket, address); + } else { + rc = ConnectEndpoint(*ch.fSocket, address); + } + + // bind might bind to an address different than requested, + // put the actual address back in the config + endpoint.clear(); + if (connectionModifier) endpoint.push_back(bind?'@':'+'); + endpoint += address; + + LOG(DEBUG) << "Attached channel " << ch.fChannelName << " to " << endpoint + << (bind?" (bind) ":" (connect) "); + + // after the book keeping is done, exit in case of errors + if (!rc) return rc; + } + return true; +} + +bool FairMQDevice::ConnectEndpoint(FairMQSocket& socket, std::string& endpoint) +{ + socket.Connect(endpoint); + return true; +} + +bool FairMQDevice::BindEndpoint(FairMQSocket& socket, std::string& endpoint) +{ + // number of attempts when choosing a random port + int maxAttempts = 1000; + int numAttempts = 0; + + // initialize random generator + std::default_random_engine generator(std::chrono::system_clock::now().time_since_epoch().count()); + std::uniform_int_distribution randomPort(fPortRangeMin, fPortRangeMax); + + // try to bind to the saved port. In case of failure, try random one. + while (!socket.Bind(endpoint)) + { + LOG(DEBUG) << "Could not bind to configured (TCP) port, trying random port in range " + << fPortRangeMin << "-" << fPortRangeMax; + ++numAttempts; + + if (numAttempts > maxAttempts) + { + LOG(ERROR) << "could not bind to any (TCP) port in the given range after " + << maxAttempts << " attempts"; + return false; + } + + size_t pos = endpoint.rfind(":"); + stringstream newPort; + newPort << static_cast(randomPort(generator)); + // TODO: thread safety? (this comes in as a reference and DOES get changed in this case). + endpoint = endpoint.substr(0, pos + 1) + newPort.str(); + } return true; } diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 22fef7bc..00c31cde 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -400,6 +400,16 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// Connects a single channel (used in InitWrapper) bool ConnectChannel(FairMQChannel& ch); + /// Sets up and connects/binds a socket to an endpoint + /// return a string with the actual endpoint if it happens + //to stray from default. + bool ConnectEndpoint(FairMQSocket& socket, std::string& endpoint); + bool BindEndpoint(FairMQSocket& socket, std::string& endpoint); + /// Attaches the channel to all listed endpoints + /// the list is comma separated; the default method (bind/connect) is used. + /// to override default: prepend "@" to bind, "+" or ">" to connect endpoint. + bool AttachChannel(FairMQChannel& ch); + /// Signal handler void SignalHandler(int signal); bool fCatchingSignals; diff --git a/fairmq/FairMQSocket.cxx b/fairmq/FairMQSocket.cxx index 370e6134..a32fdfd5 100644 --- a/fairmq/FairMQSocket.cxx +++ b/fairmq/FairMQSocket.cxx @@ -11,3 +11,53 @@ * @since 2012-12-05 * @author D. Klein, A. Rybalchenko */ + +#include "FairMQSocket.h" + +bool FairMQSocket::Attach(const std::string& config, bool serverish) +{ + if (config.empty()) + return false; + if (config.size()<2) + return false; + + const char* endpoints = config.c_str(); + + // We hold each individual endpoint here + char endpoint [256]; + while (*endpoints) { + const char *delimiter = strchr (endpoints, ','); + if (!delimiter) + delimiter = endpoints + strlen (endpoints); + if (delimiter - endpoints > 255) + return false; + memcpy (endpoint, endpoints, delimiter - endpoints); + endpoint [delimiter - endpoints] = 0; + + bool rc; + if (endpoint [0] == '@') { + rc = Bind(endpoint + 1); + } + else if (endpoint [0] == '>' || endpoint [0] == '-' || endpoint [0] == '+' ) { + Connect(endpoint + 1); + } + else if (serverish) { + rc = Bind(endpoint); + } + else { + Connect(endpoint); + } + + if (!rc) { + return false; + } + + if (*delimiter == 0) { + break; + } + + endpoints = delimiter + 1; + } + + return true; +} diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h index 6d89a4a6..d0eb9301 100644 --- a/fairmq/FairMQSocket.h +++ b/fairmq/FairMQSocket.h @@ -37,6 +37,7 @@ class FairMQSocket virtual bool Bind(const std::string& address) = 0; virtual void Connect(const std::string& address) = 0; + virtual bool Attach(const std::string& address, bool serverish = false); virtual int Send(FairMQMessage* msg, const std::string& flag = "") = 0; virtual int Send(FairMQMessage* msg, const int flags = 0) = 0;