From b9883d3b13d28aae992cc3dfa59d560b2a338392 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Thu, 31 Mar 2016 14:41:05 +0200 Subject: [PATCH] Configuration and DDS example/tools updates - Update DDS example command UI and extract it from example. - Unify address handling via DDS properties for dynamic deployment. - Update DDS docs with the new approach. - Allow `--config-key` to be used to access common config in JSON. - Allow common channel properties to be specified for all sockets. - Update MQ examples and Tuto3 with new config options. - Add start scripts to MQ examples for easier use. --- fairmq/CMakeLists.txt | 6 +- fairmq/FairMQChannel.cxx | 44 +++ fairmq/FairMQChannel.h | 15 + fairmq/FairMQDevice.cxx | 311 +++++++++++++----- fairmq/FairMQDevice.h | 30 +- fairmq/deployment/CMakeLists.txt | 62 ++++ fairmq/deployment/FairMQDDSTools.h | 197 +++++++++++ fairmq/deployment/runDDSCommandUI.cxx | 113 +++++++ fairmq/logger/logger.cxx | 134 ++++---- fairmq/logger/logger.h | 81 ++--- fairmq/nanomsg/FairMQPollerNN.h | 1 + fairmq/nanomsg/FairMQSocketNN.cxx | 6 + fairmq/nanomsg/FairMQSocketNN.h | 6 - fairmq/options/FairMQParser.cxx | 275 +++++++++------- fairmq/options/FairMQParser.h | 7 +- fairmq/options/FairMQProgOptions.cxx | 94 ++++-- fairmq/run/runMerger.cxx | 131 ++------ fairmq/run/runProxy.cxx | 123 ++----- fairmq/run/runSplitter.cxx | 132 ++------ ...hmark.sh.in => startFairMQBenchmark.sh.in} | 0 fairmq/tools/FairMQTools.h | 15 + 21 files changed, 1082 insertions(+), 701 deletions(-) create mode 100644 fairmq/deployment/CMakeLists.txt create mode 100644 fairmq/deployment/FairMQDDSTools.h create mode 100644 fairmq/deployment/runDDSCommandUI.cxx rename fairmq/run/{startBenchmark.sh.in => startFairMQBenchmark.sh.in} (100%) diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 6dc42883..4b882c75 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -6,12 +6,16 @@ # copied verbatim in the file "LICENSE" # ################################################################################ -configure_file(${CMAKE_SOURCE_DIR}/fairmq/run/startBenchmark.sh.in ${CMAKE_BINARY_DIR}/bin/startBenchmark.sh) +configure_file(${CMAKE_SOURCE_DIR}/fairmq/run/startFairMQBenchmark.sh.in ${CMAKE_BINARY_DIR}/bin/startFairMQBenchmark.sh) configure_file(${CMAKE_SOURCE_DIR}/fairmq/run/benchmark.json ${CMAKE_BINARY_DIR}/bin/config/benchmark.json) add_subdirectory(logger) add_subdirectory(test) +If(DDS_FOUND) + add_subdirectory(deployment) +EndIf(DDS_FOUND) + Set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/fairmq ${CMAKE_SOURCE_DIR}/fairmq/devices diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index 7c1a29ab..85e878c0 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -28,6 +28,7 @@ FairMQChannel::FairMQChannel() , fType("unspecified") , fMethod("unspecified") , fAddress("unspecified") + , fProperty("") , fSndBufSize(1000) , fRcvBufSize(1000) , fSndKernelSize(0) @@ -50,6 +51,7 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str , fType(type) , fMethod(method) , fAddress(address) + , fProperty("") , fSndBufSize(1000) , fRcvBufSize(1000) , fSndKernelSize(0) @@ -72,6 +74,7 @@ FairMQChannel::FairMQChannel(const FairMQChannel& chan) , fType(chan.fType) , fMethod(chan.fMethod) , fAddress(chan.fAddress) + , fProperty(chan.fProperty) , fSndBufSize(chan.fSndBufSize) , fRcvBufSize(chan.fRcvBufSize) , fSndKernelSize(chan.fSndKernelSize) @@ -93,6 +96,7 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan) fType = chan.fType; fMethod = chan.fMethod; fAddress = chan.fAddress; + fProperty = chan.fProperty; fSndBufSize = chan.fSndBufSize; fRcvBufSize = chan.fRcvBufSize; fSndKernelSize = chan.fSndKernelSize; @@ -112,6 +116,17 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan) return *this; } +string FairMQChannel::GetChannelName() const +{ + return fChannelName; +} + +string FairMQChannel::GetChannelPrefix() const +{ + string prefix = fChannelName; + return prefix.erase(fChannelName.rfind("[")); +} + string FairMQChannel::GetType() const { try @@ -154,6 +169,20 @@ string FairMQChannel::GetAddress() const } } +string FairMQChannel::GetProperty() const +{ + try + { + boost::unique_lock scoped_lock(fChannelMutex); + return fProperty; + } + catch (boost::exception& e) + { + LOG(ERROR) << "Exception caught in FairMQChannel::GetProperty: " << boost::diagnostic_information(e); + exit(EXIT_FAILURE); + } +} + int FairMQChannel::GetSndBufSize() const { try @@ -269,6 +298,21 @@ void FairMQChannel::UpdateAddress(const string& address) } } +void FairMQChannel::UpdateProperty(const string& property) +{ + try + { + boost::unique_lock scoped_lock(fChannelMutex); + fIsValid = false; + fProperty = property; + } + catch (boost::exception& e) + { + LOG(ERROR) << "Exception caught in FairMQChannel::UpdateProperty: " << boost::diagnostic_information(e); + exit(EXIT_FAILURE); + } +} + void FairMQChannel::UpdateSndBufSize(const int sndBufSize) { try diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 265024db..cb496336 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -50,6 +50,14 @@ class FairMQChannel /// Default destructor virtual ~FairMQChannel(); + /// Get channel name + /// @return Returns full channel name (e.g. "data[0]") + std::string GetChannelName() const; + + /// Get channel prefix + /// @return Returns channel prefix (e.g. "data") + std::string GetChannelPrefix() const; + /// Get socket type /// @return Returns socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/) std::string GetType() const; @@ -59,6 +67,9 @@ class FairMQChannel /// Get socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") /// @return Returns socket type (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") std::string GetAddress() const; + /// Get channel property (custom property) + /// @return Returns property value + std::string GetProperty() const; /// Get socket send buffer size (in number of messages) /// @return Returns socket send buffer size (in number of messages) int GetSndBufSize() const; @@ -84,6 +95,9 @@ class FairMQChannel /// Set socket address /// @param address Socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") void UpdateAddress(const std::string& address); + /// Set custom channel property + /// @param property Channel property + void UpdateProperty(const std::string& property); /// Set socket send buffer size /// @param sndBufSize Socket send buffer size (in number of messages) void UpdateSndBufSize(const int sndBufSize); @@ -257,6 +271,7 @@ class FairMQChannel std::string fType; std::string fMethod; std::string fAddress; + std::string fProperty; int fSndBufSize; int fRcvBufSize; int fSndKernelSize; diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index d8af1dd8..e9e88bea 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -26,7 +26,9 @@ #include "FairMQSocket.h" #include "FairMQDevice.h" #include "FairMQLogger.h" +#include "FairMQTools.h" +#include "FairMQProgOptions.h" #include "FairMQTransportFactoryZMQ.h" #ifdef NANOMSG_FOUND #include "FairMQTransportFactoryNN.h" @@ -44,13 +46,15 @@ static void CallSignalHandler(int signal) FairMQDevice::FairMQDevice() : fChannels() , fId() - , fMaxInitializationTime(120) + , fMaxInitializationAttempts(120) , fNumIoThreads(1) , fPortRangeMin(22000) , fPortRangeMax(32000) , fLogIntervalInMs(1000) , fCmdSocket(nullptr) , fTransportFactory(nullptr) + , fConfig(nullptr) + , fNetworkInterface() , fInitialValidationFinished(false) , fInitialValidationCondition() , fInitialValidationMutex() @@ -88,6 +92,58 @@ void FairMQDevice::SignalHandler(int signal) exit(EXIT_FAILURE); } +void FairMQDevice::ConnectChannels(list& chans) +{ + auto itr = chans.begin(); + + while (itr != chans.end()) + { + if ((*itr)->ValidateChannel()) + { + if (ConnectChannel(**itr)) + { + (*itr)->InitCommandInterface(fTransportFactory, fNumIoThreads); + chans.erase(itr++); + } + else + { + LOG(ERROR) << "failed to connect channel " << (*itr)->fChannelName; + ++itr; + } + } + else + { + ++itr; + } + } +} + +void FairMQDevice::BindChannels(list& chans) +{ + auto itr = chans.begin(); + + while (itr != chans.end()) + { + if ((*itr)->ValidateChannel()) + { + if (BindChannel(**itr)) + { + (*itr)->InitCommandInterface(fTransportFactory, fNumIoThreads); + chans.erase(itr++); + } + else + { + LOG(ERROR) << "failed to bind channel " << (*itr)->fChannelName; + ++itr; + } + } + else + { + ++itr; + } + } +} + void FairMQDevice::InitWrapper() { if (!fTransportFactory) @@ -102,58 +158,65 @@ void FairMQDevice::InitWrapper() fCmdSocket->Bind("inproc://commands"); } - // List to store the uninitialized channels. - list uninitializedChannels; + // Containers to store the uninitialized channels. + list uninitializedBindingChannels; + list uninitializedConnectingChannels; + + // Fill the uninitialized channel containers for (auto mi = fChannels.begin(); mi != fChannels.end(); ++mi) { for (auto vi = (mi->second).begin(); vi != (mi->second).end(); ++vi) { - // set channel name: name + vector index - stringstream ss; - ss << mi->first << "[" << vi - (mi->second).begin() << "]"; - vi->fChannelName = ss.str(); - // fill the uninitialized list - uninitializedChannels.push_back(&(*vi)); + if (vi->fMethod == "bind") + { + // set channel name: name + vector index + stringstream ss; + ss << mi->first << "[" << vi - (mi->second).begin() << "]"; + vi->fChannelName = ss.str(); + // if binding address is not specified, set it up to try getting it from the configured network interface + if (vi->fAddress == "unspecified" || vi->fAddress == "") + { + vi->fAddress = "tcp://" + FairMQ::tools::getInterfaceIP(fNetworkInterface) + ":1"; + } + // fill the uninitialized list + uninitializedBindingChannels.push_back(&(*vi)); + } + else if (vi->fMethod == "connect") + { + // set channel name: name + vector index + stringstream ss; + ss << mi->first << "[" << vi - (mi->second).begin() << "]"; + vi->fChannelName = ss.str(); + // fill the uninitialized list + uninitializedConnectingChannels.push_back(&(*vi)); + } + else + { + LOG(ERROR) << "Cannot update configuration. Socket method (bind/connect) not specified."; + exit(EXIT_FAILURE); + } } } + // Bind channels. Here one run is enough, because bind settings should be available locally + // If necessary this could be handled in the same way as the connecting channels + BindChannels(uninitializedBindingChannels); + // notify parent thread about completion of first validation. + { + lock_guard lock(fInitialValidationMutex); + fInitialValidationFinished = true; + fInitialValidationCondition.notify_one(); + } + // go over the list of channels until all are initialized (and removed from the uninitialized list) int numAttempts = 0; - int maxAttempts = fMaxInitializationTime; - do + int maxAttempts = fMaxInitializationAttempts; + while (!uninitializedConnectingChannels.empty()) { - auto itr = uninitializedChannels.begin(); - - while (itr != uninitializedChannels.end()) + ConnectChannels(uninitializedConnectingChannels); + if (++numAttempts > maxAttempts) { - if ((*itr)->ValidateChannel()) - { - if (InitChannel(*(*itr))) - { - (*itr)->InitCommandInterface(fTransportFactory, fNumIoThreads); - uninitializedChannels.erase(itr++); - } - else - { - LOG(ERROR) << "failed to initialize channel " << (*itr)->fChannelName; - ++itr; - } - } - else - { - ++itr; - } - } - - // notify parent thread about completion of first validation. - boost::lock_guard lock(fInitialValidationMutex); - fInitialValidationFinished = true; - fInitialValidationCondition.notify_one(); - - ++numAttempts; - if (numAttempts > maxAttempts) - { - LOG(ERROR) << "could not initialize all channels after " << maxAttempts << " attempts"; + LOG(ERROR) << "could not connect all channels after " << maxAttempts << " attempts"; // TODO: goto ERROR state; exit(EXIT_FAILURE); } @@ -162,7 +225,7 @@ void FairMQDevice::InitWrapper() { boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); } - } while (!uninitializedChannels.empty()); + } Init(); @@ -176,18 +239,15 @@ void FairMQDevice::InitWrapper() void FairMQDevice::WaitForInitialValidation() { - boost::unique_lock lock(fInitialValidationMutex); - while (!fInitialValidationFinished) - { - fInitialValidationCondition.wait(lock); - } + unique_lock lock(fInitialValidationMutex); + fInitialValidationCondition.wait(lock, [&] () { return fInitialValidationFinished; }); } void FairMQDevice::Init() { } -bool FairMQDevice::InitChannel(FairMQChannel& ch) +bool FairMQDevice::BindChannel(FairMQChannel& ch) { LOG(DEBUG) << "Initializing channel " << ch.fChannelName << " (" << ch.fType << ")"; // initialize the socket @@ -196,51 +256,105 @@ bool FairMQDevice::InitChannel(FairMQChannel& ch) ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize)); ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize)); - // TODO: make it work with ipc + // number of attempts when choosing a random port + int maxAttempts = 1000; + int numAttempts = 0; - if (ch.fMethod == "bind") + // initialize random generator + boost::random::mt19937 gen(getpid()); + boost::random::uniform_int_distribution<> randomPort(fPortRangeMin, fPortRangeMax); + + LOG(DEBUG) << "Binding channel " << ch.fChannelName << " on " << ch.fAddress; + + // try to bind to the saved port. In case of failure, try random one. + while (!ch.fSocket->Bind(ch.fAddress)) { - // number of attempts when choosing a random port - int maxAttempts = 1000; - int numAttempts = 0; + LOG(DEBUG) << "Could not bind to configured (TCP) port, trying random port in range " << fPortRangeMin << "-" << fPortRangeMax; + ++numAttempts; - // initialize random generator - boost::random::mt19937 gen(getpid()); - boost::random::uniform_int_distribution<> randomPort(fPortRangeMin, fPortRangeMax); + if (numAttempts > maxAttempts) + { + LOG(ERROR) << "could not bind to any (TCP) port in the given range after " << maxAttempts << " attempts"; + return false; + } + + size_t pos = ch.fAddress.rfind(":"); + stringstream newPort; + newPort << static_cast(randomPort(gen)); + ch.fAddress = ch.fAddress.substr(0, pos + 1) + newPort.str(); LOG(DEBUG) << "Binding channel " << ch.fChannelName << " on " << ch.fAddress; - - // try to bind to the saved port. In case of failure, try random one. - if (!ch.fSocket->Bind(ch.fAddress)) - { - LOG(DEBUG) << "Could not bind to configured port, trying random port in range " << fPortRangeMin << "-" << fPortRangeMax; - do { - ++numAttempts; - - if (numAttempts > maxAttempts) - { - LOG(ERROR) << "could not bind to any port in the given range after " << maxAttempts << " attempts"; - return false; - } - - size_t pos = ch.fAddress.rfind(":"); - stringstream newPort; - newPort << static_cast(randomPort(gen)); - ch.fAddress = ch.fAddress.substr(0, pos + 1) + newPort.str(); - - LOG(DEBUG) << "Binding channel " << ch.fChannelName << " on " << ch.fAddress; - } while (!ch.fSocket->Bind(ch.fAddress)); - } - } - else - { - LOG(DEBUG) << "Connecting channel " << ch.fChannelName << " to " << ch.fAddress; - ch.fSocket->Connect(ch.fAddress); } return true; } +bool FairMQDevice::ConnectChannel(FairMQChannel& ch) +{ + LOG(DEBUG) << "Initializing channel " << ch.fChannelName << " (" << ch.fType << ")"; + // initialize the socket + ch.fSocket = fTransportFactory->CreateSocket(ch.fType, ch.fChannelName, fNumIoThreads, fId); + // set high water marks + ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize)); + ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize)); + // connect + LOG(DEBUG) << "Connecting channel " << ch.fChannelName << " to " << ch.fAddress; + ch.fSocket->Connect(ch.fAddress); + return true; +} + +// bool FairMQDevice::InitChannel(FairMQChannel& ch) +// { +// LOG(DEBUG) << "Initializing channel " << ch.fChannelName << " (" << ch.fType << ")"; +// // initialize the socket +// ch.fSocket = fTransportFactory->CreateSocket(ch.fType, ch.fChannelName, fNumIoThreads, fId); +// // set high water marks +// ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize)); +// ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize)); + +// if (ch.fMethod == "bind") +// { +// // number of attempts when choosing a random port +// int maxAttempts = 1000; +// int numAttempts = 0; + +// // initialize random generator +// boost::random::mt19937 gen(getpid()); +// boost::random::uniform_int_distribution<> randomPort(fPortRangeMin, fPortRangeMax); + +// LOG(DEBUG) << "Binding channel " << ch.fChannelName << " on " << ch.fAddress; + +// // try to bind to the saved port. In case of failure, try random one. +// if (!ch.fSocket->Bind(ch.fAddress)) +// { +// LOG(DEBUG) << "Could not bind to configured port, trying random port in range " << fPortRangeMin << "-" << fPortRangeMax; +// do { +// ++numAttempts; + +// if (numAttempts > maxAttempts) +// { +// LOG(ERROR) << "could not bind to any port in the given range after " << maxAttempts << " attempts"; +// return false; +// } + +// size_t pos = ch.fAddress.rfind(":"); +// stringstream newPort; +// newPort << static_cast(randomPort(gen)); +// ch.fAddress = ch.fAddress.substr(0, pos + 1) + newPort.str(); + +// LOG(DEBUG) << "Binding channel " << ch.fChannelName << " on " << ch.fAddress; +// } while (!ch.fSocket->Bind(ch.fAddress)); +// } +// } +// else +// { +// LOG(DEBUG) << "Connecting channel " << ch.fChannelName << " to " << ch.fAddress; +// ch.fSocket->Connect(ch.fAddress); +// } + +// return true; +// } + void FairMQDevice::InitTaskWrapper() { InitTask(); @@ -365,6 +479,9 @@ void FairMQDevice::SetProperty(const int key, const string& value) case Id: fId = value; break; + case NetworkInterface: + fNetworkInterface = value; + break; default: FairMQConfigurable::SetProperty(key, value); break; @@ -379,8 +496,8 @@ void FairMQDevice::SetProperty(const int key, const int value) case NumIoThreads: fNumIoThreads = value; break; - case MaxInitializationTime: - fMaxInitializationTime = value; + case MaxInitializationAttempts: + fMaxInitializationAttempts = value; break; case PortRangeMin: fPortRangeMin = value; @@ -404,6 +521,8 @@ string FairMQDevice::GetProperty(const int key, const string& default_ /*= ""*/) { case Id: return fId; + case NetworkInterface: + return fNetworkInterface; default: return FairMQConfigurable::GetProperty(key, default_); } @@ -417,14 +536,16 @@ string FairMQDevice::GetPropertyDescription(const int key) return "Id: Device ID"; case NumIoThreads: return "NumIoThreads: Number of I/O Threads (size of the 0MQ thread pool to handle I/O operations. If your application is using only the inproc transport for messaging you may set this to zero, otherwise set it to at least one.)"; - case MaxInitializationTime: - return "MaxInitializationTime: Timeout for retrying validation and initialization of the channels."; + case MaxInitializationAttempts: + return "MaxInitializationAttempts: Maximum number of validation and initialization attempts of the channels."; case PortRangeMin: return "PortRangeMin: Minumum value for the port range (when binding to dynamic port)."; case PortRangeMax: return "PortRangeMax: Maximum value for the port range (when binding to dynamic port)."; case LogIntervalInMs: return "LogIntervalInMs: Time between socket rates logging outputs."; + case NetworkInterface: + return "NetworkInterface: Network interface to use for dynamic binding."; default: return FairMQConfigurable::GetPropertyDescription(key); } @@ -447,8 +568,8 @@ int FairMQDevice::GetProperty(const int key, const int default_ /*= 0*/) { case NumIoThreads: return fNumIoThreads; - case MaxInitializationTime: - return fMaxInitializationTime; + case MaxInitializationAttempts: + return fMaxInitializationAttempts; case PortRangeMin: return fPortRangeMin; case PortRangeMax: @@ -491,6 +612,16 @@ void FairMQDevice::SetTransport(const string& transport) } } +void FairMQDevice::SetConfig(FairMQProgOptions& config) +{ + fConfig = &config; + fChannels = config.GetFairMQMap(); + SetTransport(config.GetValue("transport")); + fId = config.GetValue("id"); + fNetworkInterface = config.GetValue("network-interface"); + fNumIoThreads = config.GetValue("io-threads"); +} + void FairMQDevice::LogSocketRates() { timestamp_t t0; diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 6b429771..838797f3 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -21,14 +21,20 @@ #include #include +#include +#include + #include "FairMQConfigurable.h" #include "FairMQStateMachine.h" #include "FairMQTransportFactory.h" + #include "FairMQSocket.h" #include "FairMQChannel.h" #include "FairMQMessage.h" #include "FairMQParts.h" +class FairMQProgOptions; + class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable { friend class FairMQChannel; @@ -37,11 +43,12 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable enum { Id = FairMQConfigurable::Last, ///< Device ID - MaxInitializationTime, ///< Timeout for the initialization + MaxInitializationAttempts, ///< Timeout for the initialization NumIoThreads, ///< Number of ZeroMQ I/O threads PortRangeMin, ///< Minimum value for the port range (if dynamic) PortRangeMax, ///< Maximum value for the port range (if dynamic) LogIntervalInMs, ///< Interval for logging the socket transfer rates + NetworkInterface, ///< Network interface to use for dynamic binding Last }; @@ -241,17 +248,21 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// @param transport Transport string ("zeromq"/"nanomsg") void SetTransport(const std::string& transport = "zeromq"); + void SetConfig(FairMQProgOptions& config); + /// Implements the sort algorithm used in SortChannel() /// @param lhs Right hand side value for comparison /// @param rhs Left hand side value for comparison static bool SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs); + // TODO: make this const? std::unordered_map> fChannels; ///< Device channels protected: std::string fId; ///< Device ID + std::string fNetworkInterface; ///< Network interface to use for dynamic binding - int fMaxInitializationTime; ///< Timeout for the initialization + int fMaxInitializationAttempts; ///< Timeout for the initialization int fNumIoThreads; ///< Number of ZeroMQ I/O threads @@ -263,6 +274,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable FairMQSocket* fCmdSocket; ///< Socket used for the internal unblocking mechanism FairMQTransportFactory* fTransportFactory; ///< Transport factory + FairMQProgOptions* fConfig; ///< Program options configuration /// Additional user initialization (can be overloaded in child classes). Prefer to use InitTask(). virtual void Init(); @@ -285,8 +297,8 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable private: // condition variable to notify parent thread about end of initial validation. bool fInitialValidationFinished; - boost::condition_variable fInitialValidationCondition; - boost::mutex fInitialValidationMutex; + std::condition_variable fInitialValidationCondition; + std::mutex fInitialValidationMutex; /// Handles the initialization and the Init() method void InitWrapper(); @@ -306,8 +318,14 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// Unblocks blocking channel send/receive calls void Unblock(); - /// Initializes a single channel (used in InitWrapper) - bool InitChannel(FairMQChannel&); + /// Binds channel in the list + void BindChannels(std::list& chans); + /// Connects channel in the list + void ConnectChannels(std::list& chans); + /// Binds a single channel (used in InitWrapper) + bool BindChannel(FairMQChannel& ch); + /// Connects a single channel (used in InitWrapper) + bool ConnectChannel(FairMQChannel& ch); /// Signal handler void SignalHandler(int signal); diff --git a/fairmq/deployment/CMakeLists.txt b/fairmq/deployment/CMakeLists.txt new file mode 100644 index 00000000..6a149602 --- /dev/null +++ b/fairmq/deployment/CMakeLists.txt @@ -0,0 +1,62 @@ + ################################################################################ + # Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # + # # + # This software is distributed under the terms of the # + # GNU Lesser General Public Licence version 3 (LGPL) version 3, # + # copied verbatim in the file "LICENSE" # + ################################################################################ + +Set(INCLUDE_DIRECTORIES + ${CMAKE_SOURCE_DIR}/fairmq + ${CMAKE_SOURCE_DIR}/fairmq/zeromq + ${CMAKE_SOURCE_DIR}/fairmq/nanomsg + ${CMAKE_SOURCE_DIR}/fairmq/devices + ${CMAKE_SOURCE_DIR}/fairmq/tools + ${CMAKE_SOURCE_DIR}/fairmq/options + ${CMAKE_SOURCE_DIR}/fairmq/deployment + ${CMAKE_CURRENT_BINARY_DIR} +) + +Set(SYSTEM_INCLUDE_DIRECTORIES + ${SYSTEM_INCLUDE_DIRECTORIES} + ${Boost_INCLUDE_DIR} + ${DDS_INCLUDE_DIR} + ${ZMQ_INCLUDE_DIR} +) + +Include_Directories(${INCLUDE_DIRECTORIES}) +Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES}) + +Set(LINK_DIRECTORIES + ${LINK_DIRECTORIES} + ${Boost_LIBRARY_DIRS} + ${DDS_LIBRARY_DIR} +) + +Link_Directories(${LINK_DIRECTORIES}) + +Install(FILES FairMQDDSTools.h DESTINATION include) + +Set(Exe_Names + ${Exe_Names} + fairmq-dds-command-ui +) + +Set(Exe_Source + ${Exe_Source} + runDDSCommandUI.cxx +) + +list(LENGTH Exe_Names _length) +math(EXPR _length ${_length}-1) + +ForEach(_file RANGE 0 ${_length}) + list(GET Exe_Names ${_file} _name) + list(GET Exe_Source ${_file} _src) + set(EXE_NAME ${_name}) + set(SRCS ${_src}) + set(DEPENDENCIES FairMQ dds_intercom_lib) + GENERATE_EXECUTABLE() +EndForEach(_file RANGE 0 ${_length}) + + diff --git a/fairmq/deployment/FairMQDDSTools.h b/fairmq/deployment/FairMQDDSTools.h new file mode 100644 index 00000000..2246358b --- /dev/null +++ b/fairmq/deployment/FairMQDDSTools.h @@ -0,0 +1,197 @@ +#ifndef FAIRMQDDSTOOLS_H_ +#define FAIRMQDDSTOOLS_H_ + +#include "FairMQLogger.h" +#include "FairMQDevice.h" +#include "FairMQChannel.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "dds_intercom.h" // DDS + +using namespace std; +using namespace dds::intercom_api; + +// container to hold channel config and corresponding dds key values +struct DDSConfig +{ + // container of sub channels, e.g. 'i' in data[i] + vector subChannels; + // dds values for the channel + CKeyValue::valuesMap_t ddsValues; +}; + +/// Handles channels addresses of the device with configuration from DDS +/// Addresses of binding channels are published via DDS using channels names as keys +/// Addresses of connecting channels are collected from DDS using channels names as keys +/// \param device Reference to FairMQDevice whose channels to handle +void HandleConfigViaDDS(FairMQDevice& device) +{ + // container for binding channels + vector bindingChans; + // container for connecting channels + map connectingChans; + + // fill the containers + for (auto& mi : device.fChannels) + { + if ((mi.second).at(0).GetMethod() == "bind") + { + for (auto& vi : mi.second) + { + bindingChans.push_back(&vi); + } + } + else if ((mi.second).at(0).GetMethod() == "connect") + { + // try some trickery with forwarding emplacing values into map + connectingChans.emplace(piecewise_construct, forward_as_tuple(mi.first), forward_as_tuple()); + for (auto& vi : mi.second) + { + connectingChans.at(mi.first).subChannels.push_back(&vi); + } + } + else + { + LOG(ERROR) << "Cannot update address configuration. Socket method (bind/connect) not specified."; + return; + } + } + + // Wait for the binding channels to bind + device.WaitForInitialValidation(); + + // DDS key value store + CKeyValue ddsKV; + + // publish bound addresses via DDS at keys corresponding to the channel prefixes, e.g. 'data' in data[i] + for (const auto& i : bindingChans) + { + // LOG(INFO) << "Publishing " << i->GetChannelPrefix() << " address to DDS under '" << i->GetProperty() << "' property name."; + ddsKV.putValue(i->GetProperty(), i->GetAddress()); + } + + // receive connect addresses from DDS via keys corresponding to channel prefixes, e.g. 'data' in data[i] + if (connectingChans.size() > 0) + { + mutex keyMutex; + condition_variable keyCV; + + LOG(DEBUG) << "Subscribing for DDS properties."; + ddsKV.subscribe([&] (const string& /*key*/, const string& /*value*/) + { + keyCV.notify_all(); + }); + + // scope based locking + { + unique_lock lock(keyMutex); + keyCV.wait_for(lock, chrono::milliseconds(1000), [&] () + { + // receive new properties + for (auto& mi : connectingChans) + { + for (auto& vi : mi.second.subChannels) + { + // LOG(INFO) << "Waiting for " << vi->GetChannelPrefix() << " address from DDS."; + ddsKV.getValues(vi->GetProperty(), &(mi.second.ddsValues)); + } + } + + // update channels and remove them from unfinished container + for (auto mi = connectingChans.begin(); mi != connectingChans.end(); /* no increment */) + { + if (mi->second.subChannels.size() == mi->second.ddsValues.size()) + { + auto it = mi->second.ddsValues.begin(); + for (int i = 0; i < mi->second.subChannels.size(); ++i) + { + mi->second.subChannels.at(i)->UpdateAddress(it->second); + ++it; + } + // when multiple subChannels are used, their order on every device should be the same, irregardless of arrival order from DDS. + device.SortChannel(mi->first); + connectingChans.erase(mi++); + } + else + { + ++mi; + } + } + + if (connectingChans.empty()) + { + LOG(DEBUG) << "Successfully received all required DDS properties!"; + } + return connectingChans.empty(); + }); + } + } +} + +/// Controls device state via DDS custom commands interface +/// \param device Reference to FairMQDevice whose state to control +void runDDSStateHandler(FairMQDevice& device) +{ + mutex mtx; + condition_variable stopCondition; + + string id = device.GetProperty(FairMQDevice::Id, ""); + string pid(to_string(getpid())); + + try + { + const set events = { "INIT_DEVICE", "INIT_TASK", "PAUSE", "RUN", "STOP", "RESET_TASK", "RESET_DEVICE" }; + + CCustomCmd ddsCustomCmd; + + ddsCustomCmd.subscribe([&](const string& cmd, const string& cond, uint64_t senderId) + { + LOG(INFO) << "Received command: " << cmd; + + if (cmd == "check-state") + { + ddsCustomCmd.send(id + ": " + device.GetCurrentStateName() + " (pid: " + pid + ")", to_string(senderId)); + } + else if (events.find(cmd) != events.end()) + { + ddsCustomCmd.send(id + ": attempting to " + cmd, to_string(senderId)); + device.ChangeState(cmd); + } + else if (cmd == "END") + { + ddsCustomCmd.send(id + ": attempting to " + cmd, to_string(senderId)); + device.ChangeState(cmd); + ddsCustomCmd.send(id + ": " + device.GetCurrentStateName(), to_string(senderId)); + if (device.GetCurrentStateName() == "EXITING") + { + unique_lock lock(mtx); + stopCondition.notify_one(); + } + } + else + { + LOG(WARN) << "Unknown command: " << cmd; + LOG(WARN) << "Origin: " << senderId; + LOG(WARN) << "Destination: " << cond; + } + }); + + LOG(INFO) << "Listening for commands from DDS..."; + unique_lock lock(mtx); + stopCondition.wait(lock); + } + catch (exception& e) + { + cerr << "Error: " << e.what() << endl; + return; + } +} + +#endif /* FAIRMQDDSTOOLS_H_ */ diff --git a/fairmq/deployment/runDDSCommandUI.cxx b/fairmq/deployment/runDDSCommandUI.cxx new file mode 100644 index 00000000..c3f56bdc --- /dev/null +++ b/fairmq/deployment/runDDSCommandUI.cxx @@ -0,0 +1,113 @@ +#include "dds_intercom.h" + +#include // raw mode console input + +#include +#include +#include +#include +#include + +using namespace std; +using namespace dds::intercom_api; + +void PrintControlsHelp() +{ + cout << "Use keys to control the devices:" << endl; + cout << "[c] check states, [h] help, [p] pause, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device" << endl; + cout << "To quit press Ctrl+C" << endl; +} + +int main(int argc, char* argv[]) +{ + try + { + CCustomCmd ddsCustomCmd; + + // subscribe to receive messages from DDS + ddsCustomCmd.subscribe([](const string& msg, const string& condition, uint64_t senderId) + { + cout << "Received: \"" << msg << "\"" << endl; + }); + + char c; + + // setup reading from cin (enable raw mode) + struct termios t; + tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure + t.c_lflag &= ~ICANON; // disable canonical input + tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings + + PrintControlsHelp(); + + while (cin >> c) + { + int result = 0; // result of the dds send + + switch (c) + { + case 'c': + cout << " > checking state of the devices" << endl; + result = ddsCustomCmd.send("check-state", ""); + break; + case 'i': + cout << " > init devices" << endl; + result = ddsCustomCmd.send("INIT_DEVICE", ""); + break; + case 'j': + cout << " > init tasks" << endl; + result = ddsCustomCmd.send("INIT_TASK", ""); + break; + case 'p': + cout << " > pause devices" << endl; + result = ddsCustomCmd.send("PAUSE", ""); + break; + case 'r': + cout << " > run tasks" << endl; + result = ddsCustomCmd.send("RUN", ""); + break; + case 's': + cout << " > stop devices" << endl; + result = ddsCustomCmd.send("STOP", ""); + break; + case 't': + cout << " > reset tasks" << endl; + result = ddsCustomCmd.send("RESET_TASK", ""); + break; + case 'd': + cout << " > reset devices" << endl; + result = ddsCustomCmd.send("RESET_DEVICE", ""); + break; + case 'h': + cout << " > help" << endl; + PrintControlsHelp(); + break; + case 'q': + cout << " > end" << endl; + result = ddsCustomCmd.send("END", ""); + break; + default: + cout << "Invalid input: [" << c << "]" << endl; + PrintControlsHelp(); + break; + } + + if (result == 1) + { + cerr << "Error sending custom command" << endl; + } + } + + // disable raw mode + tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure + t.c_lflag |= ICANON; // re-enable canonical input + tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings + } + catch (exception& e) + { + cerr << "Error: " << e.what() << endl; + return EXIT_FAILURE; + } + + return EXIT_SUCCESS; +} diff --git a/fairmq/logger/logger.cxx b/fairmq/logger/logger.cxx index 7a696878..631d8cb6 100644 --- a/fairmq/logger/logger.cxx +++ b/fairmq/logger/logger.cxx @@ -27,17 +27,13 @@ typedef boost::null_deleter empty_deleter_t; #include #include - - namespace logging = boost::log; namespace src = boost::log::sources; namespace expr = boost::log::expressions; namespace sinks = boost::log::sinks; namespace attrs = boost::log::attributes; - - -BOOST_LOG_GLOBAL_LOGGER_INIT(global_logger, src::severity_logger_mt) +BOOST_LOG_GLOBAL_LOGGER_INIT(global_logger, src::severity_logger_mt) { src::severity_logger_mt global_logger; global_logger.add_attribute("TimeStamp", attrs::local_clock()); @@ -50,68 +46,72 @@ void init_log_console(bool color_format) // add a text sink typedef sinks::synchronous_sink text_sink; logging::core::get()->remove_all_sinks(); - + // CONSOLE - all severity except error boost::shared_ptr sink = boost::make_shared(); // add "console" output stream to our sink - sink->locked_backend()->add_stream(boost::shared_ptr(&std::cout, empty_deleter_t())); - + sink->locked_backend()->add_stream(boost::shared_ptr(&std::clog, empty_deleter_t())); + // specify the format of the log message - if(color_format) + if (color_format) + { sink->set_formatter(&init_log_formatter); + } else + { sink->set_formatter(&init_log_formatter); - + } + sink->set_filter(severity != SEVERITY_ERROR && severity < SEVERITY_NOLOG); // add sink to the core logging::core::get()->add_sink(sink); - - + // CONSOLE - only severity error boost::shared_ptr sink_error = boost::make_shared(); sink_error->locked_backend()->add_stream(boost::shared_ptr(&std::cerr, empty_deleter_t())); - - if(color_format) + + if (color_format) + { sink_error->set_formatter(&init_log_formatter); + } else + { sink_error->set_formatter(&init_log_formatter); - + } + sink_error->set_filter(severity == SEVERITY_ERROR); logging::core::get()->add_sink(sink_error); } void reinit_logger(bool color_format) { - LOG(NOLOG)<<""; + LOG(NOLOG) << ""; logging::core::get()->remove_all_sinks(); init_log_console(color_format); } - void init_log_file(const std::string& filename, custom_severity_level threshold, log_op::operation op, const std::string& id) { // add a text sink std::string formatted_filename(filename); - formatted_filename+=id; - formatted_filename+="_%Y-%m-%d_%H-%M-%S.%N.log"; - boost::shared_ptr< sinks::text_file_backend > backend = - boost::make_shared< sinks::text_file_backend > - ( - boost::log::keywords::file_name = formatted_filename, - boost::log::keywords::rotation_size = 10 * 1024 * 1024, - // rotate at midnight every day - boost::log::keywords::time_based_rotation = sinks::file::rotation_at_time_point(0, 0, 0), - // log collector, - // -- maximum total size of the stored log files is 1GB. - // -- minimum free space on the drive is 2GB - boost::log::keywords::max_size = 1000 * 1024 * 1024, - boost::log::keywords::min_free_space = 2000 * 1024 * 1024, - boost::log::keywords::auto_flush = true - //keywords::time_based_rotation = &is_it_time_to_rotate + formatted_filename += id; + formatted_filename += "_%Y-%m-%d_%H-%M-%S.%N.log"; + boost::shared_ptr backend = boost::make_shared( + boost::log::keywords::file_name = formatted_filename, + boost::log::keywords::rotation_size = 10 * 1024 * 1024, + // rotate at midnight every day + boost::log::keywords::time_based_rotation = sinks::file::rotation_at_time_point(0, 0, 0), + // log collector, + // -- maximum total size of the stored log files is 1GB. + // -- minimum free space on the drive is 2GB + boost::log::keywords::max_size = 1000 * 1024 * 1024, + boost::log::keywords::min_free_space = 2000 * 1024 * 1024, + boost::log::keywords::auto_flush = true + //keywords::time_based_rotation = &is_it_time_to_rotate ); - typedef sinks::synchronous_sink< sinks::text_file_backend > sink_t; - boost::shared_ptr< sink_t > sink(new sink_t(backend)); - + typedef sinks::synchronous_sink sink_t; + boost::shared_ptr sink(new sink_t(backend)); + // specify the format of the log message sink->set_formatter(&init_log_formatter); @@ -120,95 +120,78 @@ void init_log_file(const std::string& filename, custom_severity_level threshold, case log_op::operation::EQUAL : sink->set_filter(severity == threshold); break; - case log_op::operation::GREATER_THAN : sink->set_filter(severity > threshold); break; - case log_op::operation::GREATER_EQ_THAN : sink->set_filter(severity >= threshold); break; - case log_op::operation::LESS_THAN : sink->set_filter(severity < threshold); break; - case log_op::operation::LESS_EQ_THAN : sink->set_filter(severity <= threshold); break; - default: break; } + logging::core::get()->add_sink(sink); } // temporary : to be replaced with c++11 lambda -void set_global_log_level(log_op::operation op, custom_severity_level threshold ) +void set_global_log_level(log_op::operation op, custom_severity_level threshold) { switch (threshold) { case custom_severity_level::TRACE : set_global_log_level_operation(op,custom_severity_level::TRACE); break; - case custom_severity_level::DEBUG : set_global_log_level_operation(op,custom_severity_level::DEBUG); break; - case custom_severity_level::RESULTS : set_global_log_level_operation(op,custom_severity_level::RESULTS); break; - case custom_severity_level::INFO : set_global_log_level_operation(op,custom_severity_level::INFO); break; - case custom_severity_level::WARN : set_global_log_level_operation(op,custom_severity_level::WARN); break; - case custom_severity_level::STATE : set_global_log_level_operation(op,custom_severity_level::STATE); break; - case custom_severity_level::ERROR : set_global_log_level_operation(op,custom_severity_level::ERROR); break; - case custom_severity_level::NOLOG : set_global_log_level_operation(op,custom_severity_level::NOLOG); break; - default: break; } } -void set_global_log_level_operation(log_op::operation op, custom_severity_level threshold ) +void set_global_log_level_operation(log_op::operation op, custom_severity_level threshold) { switch (op) { case log_op::operation::EQUAL : boost::log::core::get()->set_filter(severity == threshold); break; - case log_op::operation::GREATER_THAN : boost::log::core::get()->set_filter(severity > threshold); break; - case log_op::operation::GREATER_EQ_THAN : boost::log::core::get()->set_filter(severity >= threshold); break; - case log_op::operation::LESS_THAN : boost::log::core::get()->set_filter(severity < threshold); break; - case log_op::operation::LESS_EQ_THAN : boost::log::core::get()->set_filter(severity <= threshold); break; - default: break; } @@ -218,52 +201,45 @@ void init_new_file(const std::string& filename, custom_severity_level threshold, { // add a file text sink with filters but without any formatting std::string formatted_filename(filename); - formatted_filename+=".%N.txt"; - boost::shared_ptr< sinks::text_file_backend > backend = - boost::make_shared< sinks::text_file_backend > - ( - boost::log::keywords::file_name = formatted_filename, - boost::log::keywords::rotation_size = 10 * 1024 * 1024, - // rotate at midnight every day - boost::log::keywords::time_based_rotation = sinks::file::rotation_at_time_point(0, 0, 0), - // log collector, - // -- maximum total size of the stored log files is 1GB. - // -- minimum free space on the drive is 2GB - boost::log::keywords::max_size = 1000 * 1024 * 1024, - boost::log::keywords::min_free_space = 2000 * 1024 * 1024, - boost::log::keywords::auto_flush = true - //keywords::time_based_rotation = &is_it_time_to_rotate + formatted_filename += ".%N.txt"; + boost::shared_ptr backend = boost::make_shared( + boost::log::keywords::file_name = formatted_filename, + boost::log::keywords::rotation_size = 10 * 1024 * 1024, + // rotate at midnight every day + boost::log::keywords::time_based_rotation = sinks::file::rotation_at_time_point(0, 0, 0), + // log collector, + // -- maximum total size of the stored log files is 1GB. + // -- minimum free space on the drive is 2GB + boost::log::keywords::max_size = 1000 * 1024 * 1024, + boost::log::keywords::min_free_space = 2000 * 1024 * 1024, + boost::log::keywords::auto_flush = true + //keywords::time_based_rotation = &is_it_time_to_rotate ); typedef sinks::synchronous_sink< sinks::text_file_backend > sink_t; boost::shared_ptr< sink_t > sink(new sink_t(backend)); - //sink->set_formatter(&init_file_formatter); - + // sink->set_formatter(&init_file_formatter); + switch (op) { case log_op::operation::EQUAL : sink->set_filter(severity == threshold); break; - case log_op::operation::GREATER_THAN : sink->set_filter(severity > threshold); break; - case log_op::operation::GREATER_EQ_THAN : sink->set_filter(severity >= threshold); break; - case log_op::operation::LESS_THAN : sink->set_filter(severity < threshold); break; - case log_op::operation::LESS_EQ_THAN : sink->set_filter(severity <= threshold); break; - default: break; } + logging::core::get()->add_sink(sink); } - diff --git a/fairmq/logger/logger.h b/fairmq/logger/logger.h index ad090ff2..d9cb437f 100644 --- a/fairmq/logger/logger.h +++ b/fairmq/logger/logger.h @@ -12,8 +12,7 @@ * Created on August 21, 2015, 6:12 PM */ #ifndef LOGGER_H -#define LOGGER_H - +#define LOGGER_H #define BOOST_LOG_DYN_LINK 1 // necessary when linking the boost_log library dynamically #define FUSION_MAX_VECTOR_SIZE 20 @@ -39,10 +38,7 @@ // fairmq #include "logger_def.h" - - - - + // Note : the following types and values must be defined in the included logger_def.h : // 1- custom_severity_level // 2- SEVERITY_THRESHOLD @@ -62,26 +58,22 @@ namespace log_op }; } - // declaration of the init function for the global logger - -void init_log_console(bool color_format=true); +void init_log_console(bool color_format = true); void reinit_logger(bool color_format); -void init_log_file( const std::string& filename, - custom_severity_level threshold=SEVERITY_THRESHOLD, - log_op::operation=log_op::GREATER_EQ_THAN, - const std::string& id="" - ); +void init_log_file(const std::string& filename, + custom_severity_level threshold = SEVERITY_THRESHOLD, + log_op::operation = log_op::GREATER_EQ_THAN, + const std::string& id = "" + ); -void init_new_file( const std::string& filename, - custom_severity_level threshold, - log_op::operation op - ); +void init_new_file(const std::string& filename, + custom_severity_level threshold, + log_op::operation op + ); -void set_global_log_level( log_op::operation op=log_op::GREATER_EQ_THAN, - custom_severity_level threshold=SEVERITY_THRESHOLD ); -void set_global_log_level_operation( log_op::operation op=log_op::GREATER_EQ_THAN, - custom_severity_level threshold=SEVERITY_THRESHOLD ); +void set_global_log_level(log_op::operation op = log_op::GREATER_EQ_THAN, custom_severity_level threshold = SEVERITY_THRESHOLD); +void set_global_log_level_operation(log_op::operation op = log_op::GREATER_EQ_THAN, custom_severity_level threshold=SEVERITY_THRESHOLD); #if defined(__GNUC__) || defined(__GNUG__) #pragma GCC diagnostic push @@ -92,7 +84,7 @@ void set_global_log_level_operation( log_op::operation op=log_op::GREATER_EQ_TH BOOST_LOG_GLOBAL_LOGGER(global_logger, boost::log::sources::severity_logger_mt) BOOST_LOG_ATTRIBUTE_KEYWORD(fairmq_logger_timestamp, "TimeStamp", boost::posix_time::ptime) -BOOST_LOG_ATTRIBUTE_KEYWORD(severity, "Severity", custom_severity_level) +BOOST_LOG_ATTRIBUTE_KEYWORD(severity, "Severity", custom_severity_level) #if defined(__GNUC__) || defined(__GNUG__) #pragma GCC diagnostic pop @@ -101,29 +93,29 @@ BOOST_LOG_ATTRIBUTE_KEYWORD(severity, "Severity", custom_severity_level) template void init_log_formatter(const boost::log::record_view &view, boost::log::formatting_ostream &os) { - os << "[" ; - - if(std::is_same::value) - os<<"\033[01;36m"; - - auto date_time_formatter = - boost::log::expressions::stream - << boost::log::expressions::format_date_time< boost::posix_time::ptime >("TimeStamp", "%H:%M:%S"); + os << "["; + + if (std::is_same::value) + { + os << "\033[01;36m"; + } + + auto date_time_formatter = boost::log::expressions::stream << boost::log::expressions::format_date_time("TimeStamp", "%H:%M:%S"); date_time_formatter(view, os); - - if(std::is_same::value) - os<<"\033[0m"; - - os << "]" - << "[" - << view.attribute_values()["Severity"].extract() - << "] " - //<< " - " - << view.attribute_values()["Message"].extract(); + + if (std::is_same::value) + { + os << "\033[0m"; + } + + os << "]" + << "[" + << view.attribute_values()["Severity"].extract() + << "] " + //<< " - " + << view.attribute_values()["Message"].extract(); } - - // helper macros // global macros (core). Level filters are set globally here, that is to all register sinks @@ -134,7 +126,7 @@ void init_log_formatter(const boost::log::record_view &view, boost::log::formatt #else #define LOG(severity) BOOST_LOG_SEV(global_logger::get(),custom_severity_level::severity) #define MQLOG(severity) BOOST_LOG_SEV(global_logger::get(),custom_severity_level::severity) -#endif +#endif #define SET_LOG_LEVEL(loglevel) boost::log::core::get()->set_filter(severity >= custom_severity_level::loglevel); #define SET_LOG_FILTER(op,loglevel) set_global_log_level(log_op::op,custom_severity_level::loglevel) @@ -152,5 +144,4 @@ void init_log_formatter(const boost::log::record_view &view, boost::log::formatt // create new file without formatting #define INIT_NEW_FILE(filename,op,loglevel) init_new_file(filename,custom_severity_level::loglevel,log_op::op); - #endif diff --git a/fairmq/nanomsg/FairMQPollerNN.h b/fairmq/nanomsg/FairMQPollerNN.h index d4f52140..76c45a01 100644 --- a/fairmq/nanomsg/FairMQPollerNN.h +++ b/fairmq/nanomsg/FairMQPollerNN.h @@ -24,6 +24,7 @@ #include "FairMQTransportFactoryNN.h" class FairMQChannel; +struct nn_pollfd; class FairMQPollerNN : public FairMQPoller { diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index 1eaf4d89..1fcb6c23 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -16,6 +16,12 @@ #include "FairMQMessageNN.h" #include "FairMQLogger.h" +#include +#include +#include +#include +#include + #include #ifdef MSGPACK_FOUND #include diff --git a/fairmq/nanomsg/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h index 8cc13575..b020d380 100644 --- a/fairmq/nanomsg/FairMQSocketNN.h +++ b/fairmq/nanomsg/FairMQSocketNN.h @@ -17,12 +17,6 @@ #include -#include -#include -#include -#include -#include - #include "FairMQSocket.h" class FairMQSocketNN : public FairMQSocket diff --git a/fairmq/options/FairMQParser.cxx b/fairmq/options/FairMQParser.cxx index 01819493..e588140c 100644 --- a/fairmq/options/FairMQParser.cxx +++ b/fairmq/options/FairMQParser.cxx @@ -24,31 +24,31 @@ namespace FairMQParser // TODO : add key-value map parameter for replacing/updating values from keys // function that convert property tree (given the xml or json structure) to FairMQMap -FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const string& deviceId, const string& rootNode, const string& formatFlag) +FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const string& id, const string& rootNode, const string& formatFlag) { // Create fair mq map FairMQMap channelMap; - helper::PrintDeviceList(pt.get_child(rootNode)); + // helper::PrintDeviceList(pt.get_child(rootNode)); // Extract value from boost::property_tree - helper::DeviceParser(pt.get_child(rootNode),channelMap,deviceId,formatFlag); + helper::DeviceParser(pt.get_child(rootNode), channelMap, id, formatFlag); if (channelMap.size() > 0) { - LOG(DEBUG) << "---- Channel-keys found are :"; + stringstream channelKeys; for (const auto& p : channelMap) { - LOG(DEBUG) << p.first; + channelKeys << "'" << p.first << "' "; } + LOG(DEBUG) << "---- Found following channel keys: " << channelKeys.str(); } else { - LOG(WARN) << "---- No channel-keys found for device-id " << deviceId; + LOG(WARN) << "---- No channel keys found for " << id; LOG(WARN) << "---- Check the JSON inputs and/or command line inputs"; } return channelMap; } - FairMQMap JSON::UserParser(const string& filename, const string& deviceId, const string& rootNode) { boost::property_tree::ptree pt; @@ -77,93 +77,119 @@ FairMQMap XML::UserParser(stringstream& input, const string& deviceId, const str return ptreeToMQMap(pt, deviceId, rootNode, "xml"); } - - - -// ///////////////////////////////////////////////////////////////////////////////////////// -// ----------------------------------------------------------------------------------------- namespace helper { - // ----------------------------------------------------------------------------------------- - void PrintDeviceList(const boost::property_tree::ptree& tree, const std::string& formatFlag) { string deviceIdKey; // do a first loop just to print the device-id in json input - for(const auto& p : tree) + for (const auto& p : tree) { if (p.first == "devices") { - for(const auto& q : p.second.get_child("")) + for (const auto& q : p.second.get_child("")) { - deviceIdKey = q.second.get("id"); - LOG(DEBUG) << "Found device id '" << deviceIdKey << "' in JSON input"; + string key = q.second.get("key", ""); + if (key != "") + { + deviceIdKey = key; + LOG(TRACE) << "Found config for device key '" << deviceIdKey << "' in JSON input"; + } + else + { + deviceIdKey = q.second.get("id"); + LOG(TRACE) << "Found config for device id '" << deviceIdKey << "' in JSON input"; + } } } - + if (p.first == "device") { //get id attribute to choose the device if (formatFlag == "xml") { deviceIdKey = p.second.get(".id"); - LOG(DEBUG) << "Found device id '" << deviceIdKey << "' in XML input"; + LOG(TRACE) << "Found config for '" << deviceIdKey << "' in XML input"; } if (formatFlag == "json") { - deviceIdKey = p.second.get("id"); - LOG(DEBUG) << "Found device id '"<< deviceIdKey << "' in JSON input"; + string key = p.second.get("key", ""); + if (key != "") + { + deviceIdKey = key; + LOG(TRACE) << "Found config for device key '" << deviceIdKey << "' in JSON input"; + } + else + { + deviceIdKey = p.second.get("id"); + LOG(TRACE) << "Found config for device id '" << deviceIdKey << "' in JSON input"; + } } } - } } - // ----------------------------------------------------------------------------------------- - void DeviceParser(const boost::property_tree::ptree& tree, FairMQMap& channelMap, const string& deviceId, const string& formatFlag) { string deviceIdKey; + + LOG(DEBUG) << "Looking for '" << deviceId << "' id/key in the provided config file..."; + // For each node in fairMQOptions - for(const auto& p0 : tree) + for (const auto& p0 : tree) { if (p0.first == "devices") { - for(const auto& p : p0.second) + for (const auto& p : p0.second) { - deviceIdKey = p.second.get("id"); - LOG(TRACE) << "Found device id '"<< deviceIdKey << "' in JSON input"; - + // check if key is provided, otherwise use id + string key = p.second.get("key", ""); + if (key != "") + { + deviceIdKey = key; + // LOG(DEBUG) << "Found config for device key '" << deviceIdKey << "' in JSON input"; + } + else + { + deviceIdKey = p.second.get("id"); + // LOG(DEBUG) << "Found config for device id '" << deviceIdKey << "' in JSON input"; + } + // if not correct device id, do not fill MQMap if (deviceId != deviceIdKey) { continue; } - // print if DEBUG log level set - stringstream deviceStream; - deviceStream << "[node = " << p.first << "] id = " << deviceIdKey; - LOG(DEBUG) << deviceStream.str(); - helper::ChannelParser(p.second,channelMap,formatFlag); - + LOG(DEBUG) << "[" << p0.first << "] " << deviceIdKey; + ChannelParser(p.second, channelMap, formatFlag); } } if (p0.first == "device") { - if (formatFlag == "xml") { deviceIdKey = p0.second.get(".id"); - LOG(DEBUG) << "Found device id '" << deviceIdKey << "' in XML input"; + LOG(DEBUG) << "Found config for '" << deviceIdKey << "' in XML input"; } if (formatFlag == "json") { - deviceIdKey = p0.second.get("id"); - LOG(DEBUG) << "Found device id '"<< deviceIdKey << "' in JSON input"; + // check if key is provided, otherwise use id + string key = p0.second.get("key", ""); + if (key != "") + { + deviceIdKey = key; + // LOG(DEBUG) << "Found config for device key '" << deviceIdKey << "' in JSON input"; + } + else + { + deviceIdKey = p0.second.get("id"); + // LOG(DEBUG) << "Found config for device id '" << deviceIdKey << "' in JSON input"; + } } // if not correct device id, do not fill MQMap @@ -172,47 +198,49 @@ namespace helper continue; } - + LOG(DEBUG) << "[" << p0.first << "] " << deviceIdKey; - // print if DEBUG log level set - stringstream deviceStream; - deviceStream << "[node = " << p0.first << "] id = " << deviceIdKey; - LOG(DEBUG) << deviceStream.str(); - helper::ChannelParser(p0.second,channelMap,formatFlag); + ChannelParser(p0.second, channelMap, formatFlag); } } } - // ----------------------------------------------------------------------------------------- - void ChannelParser(const boost::property_tree::ptree& tree, FairMQMap& channelMap, const string& formatFlag) { string channelKey; - for(const auto& p : tree) + + for (const auto& p : tree) { - if(p.first=="channels") + if (p.first == "channels") { - for(const auto& q : p.second) + for (const auto& q : p.second) { channelKey = q.second.get("name"); - - // print if DEBUG log level set - stringstream channelStream; - channelStream << "\t [node = " << p.first << "] name = " << channelKey; - LOG(DEBUG) << channelStream.str(); + + // try to get common properties to use for all subChannels + FairMQChannel commonChannel; + commonChannel.UpdateType(q.second.get("type", commonChannel.GetType())); + commonChannel.UpdateMethod(q.second.get("method", commonChannel.GetMethod())); + commonChannel.UpdateProperty(q.second.get("property", commonChannel.GetProperty())); + commonChannel.UpdateSndBufSize(q.second.get("sndBufSize", commonChannel.GetSndBufSize())); + commonChannel.UpdateRcvBufSize(q.second.get("rcvBufSize", commonChannel.GetRcvBufSize())); + commonChannel.UpdateRateLogging(q.second.get("rateLogging", commonChannel.GetRateLogging())); + + LOG(DEBUG) << "\t[" << p.first << "] " << channelKey; // temporary FairMQChannel container vector channelList; - helper::SocketParser(q.second.get_child(""),channelList); - - //fill mq map option + SocketParser(q.second.get_child(""), channelList, commonChannel); + channelMap.insert(make_pair(channelKey, move(channelList))); } } - if(p.first=="channel") + if (p.first == "channel") { - + // try to get common properties to use for all subChannels + FairMQChannel commonChannel; + // get name attribute to form key if (formatFlag == "xml") { @@ -222,87 +250,112 @@ namespace helper if (formatFlag == "json") { channelKey = p.second.get("name"); + + // try to get common properties to use for all subChannels + commonChannel.UpdateType(p.second.get("type", commonChannel.GetType())); + commonChannel.UpdateMethod(p.second.get("method", commonChannel.GetMethod())); + commonChannel.UpdateProperty(p.second.get("property", commonChannel.GetProperty())); + commonChannel.UpdateSndBufSize(p.second.get("sndBufSize", commonChannel.GetSndBufSize())); + commonChannel.UpdateRcvBufSize(p.second.get("rcvBufSize", commonChannel.GetRcvBufSize())); + commonChannel.UpdateRateLogging(p.second.get("rateLogging", commonChannel.GetRateLogging())); } - stringstream channelStream; - channelStream << "\t [node = " << p.first << "] name = " << channelKey; - LOG(DEBUG) << channelStream.str(); + LOG(DEBUG) << "\t[" << p.first << "] " << channelKey; // temporary FairMQChannel container vector channelList; - helper::SocketParser(p.second.get_child(""),channelList); - - //fill mq map option + SocketParser(p.second.get_child(""), channelList, commonChannel); + channelMap.insert(make_pair(channelKey, move(channelList))); } - } } - // ----------------------------------------------------------------------------------------- - - void SocketParser(const boost::property_tree::ptree& tree, vector& channelList) + void SocketParser(const boost::property_tree::ptree& tree, vector& channelList, const FairMQChannel& commonChannel) { // for each socket in channel int socketCounter = 0; - for (const auto& s : tree) + for (const auto& p : tree) { - if (s.first == "sockets") + if (p.first == "sockets") { - for (const auto& r : s.second) + for (const auto& q : p.second) { ++socketCounter; - FairMQChannel channel; + // create new channel and apply setting from the common channel + FairMQChannel channel(commonChannel); - // print if DEBUG log level set - stringstream socket; - socket << "\t \t [node = " << s.first << "] socket index = " << socketCounter; - LOG(DEBUG) << socket.str(); - LOG(DEBUG) << "\t \t \t type = " << r.second.get("type", channel.GetType()); - LOG(DEBUG) << "\t \t \t method = " << r.second.get("method", channel.GetMethod()); - LOG(DEBUG) << "\t \t \t address = " << r.second.get("address", channel.GetAddress()); - LOG(DEBUG) << "\t \t \t sndBufSize = " << r.second.get("sndBufSize", channel.GetSndBufSize()); - LOG(DEBUG) << "\t \t \t rcvBufSize = " << r.second.get("rcvBufSize", channel.GetRcvBufSize()); - LOG(DEBUG) << "\t \t \t rateLogging = " << r.second.get("rateLogging", channel.GetRateLogging()); + // if the socket field specifies or overrides something from the common channel, apply those settings + channel.UpdateType(q.second.get("type", channel.GetType())); + channel.UpdateMethod(q.second.get("method", channel.GetMethod())); + channel.UpdateAddress(q.second.get("address", channel.GetAddress())); + channel.UpdateProperty(q.second.get("property", channel.GetProperty())); + channel.UpdateSndBufSize(q.second.get("sndBufSize", channel.GetSndBufSize())); + channel.UpdateRcvBufSize(q.second.get("rcvBufSize", channel.GetRcvBufSize())); + channel.UpdateRateLogging(q.second.get("rateLogging", channel.GetRateLogging())); - channel.UpdateType(r.second.get("type", channel.GetType())); - channel.UpdateMethod(r.second.get("method", channel.GetMethod())); - channel.UpdateAddress(r.second.get("address", channel.GetAddress())); - channel.UpdateSndBufSize(r.second.get("sndBufSize", channel.GetSndBufSize())); // int - channel.UpdateRcvBufSize(r.second.get("rcvBufSize", channel.GetRcvBufSize())); // int - channel.UpdateRateLogging(r.second.get("rateLogging", channel.GetRateLogging())); // int + LOG(DEBUG) << "\t\t[" << p.first << "] " << socketCounter; + LOG(DEBUG) << "\t\t\ttype = " << channel.GetType(); + LOG(DEBUG) << "\t\t\tmethod = " << channel.GetMethod(); + LOG(DEBUG) << "\t\t\taddress = " << channel.GetAddress(); + LOG(DEBUG) << "\t\t\tproperty = " << channel.GetProperty(); + LOG(DEBUG) << "\t\t\tsndBufSize = " << channel.GetSndBufSize(); + LOG(DEBUG) << "\t\t\trcvBufSize = " << channel.GetRcvBufSize(); + LOG(DEBUG) << "\t\t\trateLogging = " << channel.GetRateLogging(); channelList.push_back(channel); } } - if(s.first == "socket") + if (p.first == "socket") { ++socketCounter; - FairMQChannel channel; + // create new channel and apply setting from the common channel + FairMQChannel channel(commonChannel); - // print if DEBUG log level set - stringstream socket; - socket << "\t \t [node = " << s.first << "] socket index = " << socketCounter; - LOG(DEBUG) << socket.str(); - LOG(DEBUG) << "\t \t \t type = " << s.second.get("type", channel.GetType()); - LOG(DEBUG) << "\t \t \t method = " << s.second.get("method", channel.GetMethod()); - LOG(DEBUG) << "\t \t \t address = " << s.second.get("address", channel.GetAddress()); - LOG(DEBUG) << "\t \t \t sndBufSize = " << s.second.get("sndBufSize", channel.GetSndBufSize()); - LOG(DEBUG) << "\t \t \t rcvBufSize = " << s.second.get("rcvBufSize", channel.GetRcvBufSize()); - LOG(DEBUG) << "\t \t \t rateLogging = " << s.second.get("rateLogging", channel.GetRateLogging()); + // if the socket field specifies or overrides something from the common channel, apply those settings + channel.UpdateType(p.second.get("type", channel.GetType())); + channel.UpdateMethod(p.second.get("method", channel.GetMethod())); + channel.UpdateAddress(p.second.get("address", channel.GetAddress())); + channel.UpdateProperty(p.second.get("property", channel.GetProperty())); + channel.UpdateSndBufSize(p.second.get("sndBufSize", channel.GetSndBufSize())); + channel.UpdateRcvBufSize(p.second.get("rcvBufSize", channel.GetRcvBufSize())); + channel.UpdateRateLogging(p.second.get("rateLogging", channel.GetRateLogging())); - channel.UpdateType(s.second.get("type", channel.GetType())); - channel.UpdateMethod(s.second.get("method", channel.GetMethod())); - channel.UpdateAddress(s.second.get("address", channel.GetAddress())); - channel.UpdateSndBufSize(s.second.get("sndBufSize", channel.GetSndBufSize())); // int - channel.UpdateRcvBufSize(s.second.get("rcvBufSize", channel.GetRcvBufSize())); // int - channel.UpdateRateLogging(s.second.get("rateLogging", channel.GetRateLogging())); // int + LOG(DEBUG) << "\t\t[" << p.first << "] " << socketCounter; + LOG(DEBUG) << "\t\t\ttype = " << channel.GetType(); + LOG(DEBUG) << "\t\t\tmethod = " << channel.GetMethod(); + LOG(DEBUG) << "\t\t\taddress = " << channel.GetAddress(); + LOG(DEBUG) << "\t\t\tproperty = " << channel.GetProperty(); + LOG(DEBUG) << "\t\t\tsndBufSize = " << channel.GetSndBufSize(); + LOG(DEBUG) << "\t\t\trcvBufSize = " << channel.GetRcvBufSize(); + LOG(DEBUG) << "\t\t\trateLogging = " << channel.GetRateLogging(); channelList.push_back(channel); } - - }// end socket loop + } // end socket loop + + if (socketCounter) + { + LOG(DEBUG) << "Found " << socketCounter << " socket(s) in channel."; + } + else + { + LOG(DEBUG) << "\t\t\tNo subChannels specified,"; + LOG(DEBUG) << "\t\t\tapplying common settings to the channel:"; + FairMQChannel channel(commonChannel); + + LOG(DEBUG) << "\t\t\ttype = " << channel.GetType(); + LOG(DEBUG) << "\t\t\tmethod = " << channel.GetMethod(); + LOG(DEBUG) << "\t\t\taddress = " << channel.GetAddress(); + LOG(DEBUG) << "\t\t\tproperty = " << channel.GetProperty(); + LOG(DEBUG) << "\t\t\tsndBufSize = " << channel.GetSndBufSize(); + LOG(DEBUG) << "\t\t\trcvBufSize = " << channel.GetRcvBufSize(); + LOG(DEBUG) << "\t\t\trateLogging = " << channel.GetRateLogging(); + + channelList.push_back(channel); + } + } } // end helper namespace diff --git a/fairmq/options/FairMQParser.h b/fairmq/options/FairMQParser.h index 2c62c9b8..19a9b39b 100644 --- a/fairmq/options/FairMQParser.h +++ b/fairmq/options/FairMQParser.h @@ -39,17 +39,14 @@ struct XML FairMQMap UserParser(std::stringstream& input, const std::string& deviceId, const std::string& rootNode = "fairMQOptions"); }; - namespace helper { void PrintDeviceList(const boost::property_tree::ptree& tree, const std::string& formatFlag = "json"); void DeviceParser(const boost::property_tree::ptree& tree, FairMQMap& channelMap, const std::string& deviceId, const std::string& formatFlag); void ChannelParser(const boost::property_tree::ptree& tree, FairMQMap& channelMap, const std::string& formatFlag); - void SocketParser(const boost::property_tree::ptree& tree, std::vector& channelList); + void SocketParser(const boost::property_tree::ptree& tree, std::vector& channelList, const FairMQChannel& commonChannel); } - - } // FairMQParser namespace -#endif /* FAIRMQPARSER_H */ +#endif /* FAIRMQPARSER_H */ diff --git a/fairmq/options/FairMQProgOptions.cxx b/fairmq/options/FairMQProgOptions.cxx index f45c4b6c..ec00c070 100644 --- a/fairmq/options/FairMQProgOptions.cxx +++ b/fairmq/options/FairMQProgOptions.cxx @@ -61,23 +61,23 @@ int FairMQProgOptions::ParseAll(const int argc, char** argv, bool allowUnregiste } } - // set log level before printing (default is 0 = DEBUG level) - std::string verbose = GetValue("verbose"); - bool color = GetValue("log-color"); - if (!color) - { - reinit_logger(false); - } - //SET_LOG_LEVEL(DEBUG); - if (fSeverityMap.count(verbose)) - { - set_global_log_level(log_op::operation::GREATER_EQ_THAN, fSeverityMap.at(verbose)); - } - else - { - LOG(ERROR) << " verbosity level '" << verbose << "' unknown, it will be set to DEBUG"; - set_global_log_level(log_op::operation::GREATER_EQ_THAN, fSeverityMap.at("DEBUG")); - } + // // set log level before printing (default is 0 = DEBUG level) + // std::string verbose = GetValue("verbose"); + // bool color = GetValue("log-color"); + // if (!color) + // { + // reinit_logger(false); + // } + // //SET_LOG_LEVEL(DEBUG); + // if (fSeverityMap.count(verbose)) + // { + // set_global_log_level(log_op::operation::GREATER_EQ_THAN, fSeverityMap.at(verbose)); + // } + // else + // { + // LOG(ERROR) << " verbosity level '" << verbose << "' unknown, it will be set to DEBUG"; + // set_global_log_level(log_op::operation::GREATER_EQ_THAN, fSeverityMap.at("DEBUG")); + // } PrintOptions(); @@ -108,28 +108,42 @@ int FairMQProgOptions::ParseAll(const int argc, char** argv, bool allowUnregiste } else { - if(fVarMap.count("mq-config")) + if (fVarMap.count("mq-config")) { - LOG(DEBUG)<<"mq-config command line called : default xml/json parser will be used"; + LOG(DEBUG) << "mq-config command line called : default xml/json parser will be used"; std::string file = fVarMap["mq-config"].as(); - std::string id = fVarMap["id"].as(); + std::string id; + if (fVarMap.count("config-key")) + { + id = fVarMap["config-key"].as(); + } + else + { + id = fVarMap["id"].as(); + } std::string file_extension = boost::filesystem::extension(file); std::transform(file_extension.begin(), file_extension.end(), file_extension.begin(), ::tolower); - if(file_extension==".json") + if (file_extension == ".json") + { UserParser(file, id); + } else - if(file_extension==".xml") + { + if (file_extension == ".xml") + { UserParser(file, id); + } else { - LOG(ERROR) <<"mq-config command line called but file extension '" - <(), "Device ID (required argument).") - ("io-threads", po::value()->default_value(1), "Number of I/O threads.") - ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg')."); + ("id", po::value(), "Device ID (required argument).") + ("io-threads", po::value()->default_value(1), "Number of I/O threads.") + ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').") + ("deployment", po::value()->default_value("static"), "Deployment ('static'/'dds').") + ("network-interface", po::value()->default_value("eth0"), "Network interface to bind on (e.g. eth0, ib0, wlan0, en0, lo...).") + ; fMQOptionsInCfg.add_options() - ("id", po::value()->required(), "Device ID (required argument).") - ("io-threads", po::value()->default_value(1), "Number of I/O threads.") - ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg')."); + ("id", po::value()->required(), "Device ID (required argument).") + ("io-threads", po::value()->default_value(1), "Number of I/O threads.") + ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').") + ("deployment", po::value()->default_value("static"), "Deployment ('static'/'dds').") + ("network-interface", po::value()->default_value("eth0"), "Network interface to bind on (e.g. eth0, ib0, wlan0, en0, lo...).") + ; } else { fMQOptionsInCmd.add_options() - ("id", po::value()->required(), "Device ID (required argument)") - ("io-threads", po::value()->default_value(1), "Number of I/O threads") - ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg')."); + ("id", po::value()->required(), "Device ID (required argument)") + ("io-threads", po::value()->default_value(1), "Number of I/O threads") + ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').") + ("deployment", po::value()->default_value("static"), "Deployment ('static'/'dds').") + ("network-interface", po::value()->default_value("eth0"), "Network interface to bind on (e.g. eth0, ib0, wlan0, en0, lo...).") + ; } fMQParserOptions.add_options() @@ -182,6 +205,7 @@ void FairMQProgOptions::InitOptionDescription() ("config-json-string", po::value>()->multitoken(), "JSON input as command line string.") ("config-json-file", po::value(), "JSON input as file.") ("mq-config", po::value(), "JSON/XML input as file. The configuration object will check xml or json file extention and will call the json or xml parser accordingly") + ("config-key", po::value(), "Use provided value instead of device id for fetching the configuration from the config file") ; AddToCmdLineOptions(fGenericDesc); @@ -190,7 +214,7 @@ void FairMQProgOptions::InitOptionDescription() if (fUseConfigFile) { - AddToCfgFileOptions(fMQOptionsInCfg,false); - AddToCfgFileOptions(fMQParserOptions,false); + AddToCfgFileOptions(fMQOptionsInCfg, false); + AddToCfgFileOptions(fMQParserOptions, false); } } diff --git a/fairmq/run/runMerger.cxx b/fairmq/run/runMerger.cxx index 0e97c008..5a6b1604 100644 --- a/fairmq/run/runMerger.cxx +++ b/fairmq/run/runMerger.cxx @@ -14,132 +14,43 @@ #include -#include "boost/program_options.hpp" - #include "FairMQLogger.h" +#include "FairMQParser.h" +#include "FairMQProgOptions.h" #include "FairMQMerger.h" -using namespace std; - -typedef struct DeviceOptions -{ - DeviceOptions() : - id(), ioThreads(0), transport(), numInputs(0), - inputSocketType(), inputBufSize(), inputMethod(), inputAddress(), - outputSocketType(), outputBufSize(0), outputMethod(), outputAddress() {} - - string id; - int ioThreads; - string transport; - int numInputs; - vector inputSocketType; - vector inputBufSize; - vector inputMethod; - vector inputAddress; - string outputSocketType; - int outputBufSize; - string outputMethod; - string outputAddress; -} DeviceOptions_t; - -inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) -{ - if (_options == NULL) - throw runtime_error("Internal error: options' container is empty."); - - namespace bpo = boost::program_options; - bpo::options_description desc("Options"); - desc.add_options() - ("id", bpo::value()->required(), "Device ID") - ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") - ("transport", bpo::value()->default_value("zeromq"), "Transport (zeromq/nanomsg)") - ("num-inputs", bpo::value()->required(), "Number of Merger input sockets") - ("input-socket-type", bpo::value>()->required(), "Input socket type: sub/pull") - ("input-buff-size", bpo::value>()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") - ("input-method", bpo::value>()->required(), "Input method: bind/connect") - ("input-address", bpo::value>()->required(), "Input address, e.g.: \"tcp://localhost:5555\"") - ("output-socket-type", bpo::value()->required(), "Output socket type: pub/push") - ("output-buff-size", bpo::value()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") - ("output-method", bpo::value()->required(), "Output method: bind/connect") - ("output-address", bpo::value()->required(), "Output address, e.g.: \"tcp://localhost:5555\"") - ("help", "Print help messages"); - - bpo::variables_map vm; - bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); - - if (vm.count("help")) - { - LOG(INFO) << "FairMQ Merger" << endl << desc; - return false; - } - - bpo::notify(vm); - - if (vm.count("id")) { _options->id = vm["id"].as(); } - if (vm.count("io-threads")) { _options->ioThreads = vm["io-threads"].as(); } - if (vm.count("transport")) { _options->transport = vm["transport"].as(); } - if (vm.count("num-inputs")) { _options->numInputs = vm["num-inputs"].as(); } - if (vm.count("input-socket-type")) { _options->inputSocketType = vm["input-socket-type"].as>(); } - if (vm.count("input-buff-size")) { _options->inputBufSize = vm["input-buff-size"].as>(); } - if (vm.count("input-method")) { _options->inputMethod = vm["input-method"].as>(); } - if (vm.count("input-address")) { _options->inputAddress = vm["input-address"].as>(); } - if (vm.count("output-socket-type")) { _options->outputSocketType = vm["output-socket-type"].as(); } - if (vm.count("output-buff-size")) { _options->outputBufSize = vm["output-buff-size"].as(); } - if (vm.count("output-method")) { _options->outputMethod = vm["output-method"].as(); } - if (vm.count("output-address")) { _options->outputAddress = vm["output-address"].as(); } - - return true; -} - int main(int argc, char** argv) { FairMQMerger merger; merger.CatchSignals(); - DeviceOptions_t options; + FairMQProgOptions config; + try { - if (!parse_cmd_line(argc, argv, &options)) + if (config.ParseAll(argc, argv)) + { return 0; + } + + merger.SetConfig(config); + + merger.ChangeState("INIT_DEVICE"); + merger.WaitForEndOfState("INIT_DEVICE"); + + merger.ChangeState("INIT_TASK"); + merger.WaitForEndOfState("INIT_TASK"); + + merger.ChangeState("RUN"); + merger.InteractiveStateLoop(); } - catch (exception& e) + catch (std::exception& e) { LOG(ERROR) << e.what(); + LOG(INFO) << "Command line options are the following: "; + config.PrintHelp(); return 1; } - LOG(INFO) << "PID: " << getpid(); - - merger.SetTransport(options.transport); - - for (unsigned int i = 0; i < options.inputAddress.size(); ++i) - { - FairMQChannel inputChannel(options.inputSocketType.at(i), options.inputMethod.at(i), options.inputAddress.at(i)); - inputChannel.UpdateSndBufSize(options.inputBufSize.at(i)); - inputChannel.UpdateRcvBufSize(options.inputBufSize.at(i)); - inputChannel.UpdateRateLogging(1); - - merger.fChannels["data-in"].push_back(inputChannel); - } - - FairMQChannel outputChannel(options.outputSocketType, options.outputMethod, options.outputAddress); - outputChannel.UpdateSndBufSize(options.outputBufSize); - outputChannel.UpdateRcvBufSize(options.outputBufSize); - outputChannel.UpdateRateLogging(1); - - merger.fChannels["data-out"].push_back(outputChannel); - - merger.SetProperty(FairMQMerger::Id, options.id); - merger.SetProperty(FairMQMerger::NumIoThreads, options.ioThreads); - - merger.ChangeState("INIT_DEVICE"); - merger.WaitForEndOfState("INIT_DEVICE"); - - merger.ChangeState("INIT_TASK"); - merger.WaitForEndOfState("INIT_TASK"); - - merger.ChangeState("RUN"); - merger.InteractiveStateLoop(); - return 0; } diff --git a/fairmq/run/runProxy.cxx b/fairmq/run/runProxy.cxx index 52086690..ae41d7bf 100644 --- a/fairmq/run/runProxy.cxx +++ b/fairmq/run/runProxy.cxx @@ -14,126 +14,45 @@ #include -#include "boost/program_options.hpp" - #include "FairMQLogger.h" +#include "FairMQParser.h" +#include "FairMQProgOptions.h" #include "FairMQProxy.h" using namespace std; -typedef struct DeviceOptions -{ - DeviceOptions() : - id(), ioThreads(0), transport(), - inputSocketType(), inputBufSize(0), inputMethod(), inputAddress(), - outputSocketType(), outputBufSize(0), outputMethod(), outputAddress() {} - - string id; - int ioThreads; - string transport; - string inputSocketType; - int inputBufSize; - string inputMethod; - string inputAddress; - string outputSocketType; - int outputBufSize; - string outputMethod; - string outputAddress; -} DeviceOptions_t; - -inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) -{ - if (_options == NULL) - throw runtime_error("Internal error: options' container is empty."); - - namespace bpo = boost::program_options; - bpo::options_description desc("Options"); - desc.add_options() - ("id", bpo::value()->required(), "Device ID") - ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") - ("transport", bpo::value()->default_value("zeromq"), "Transport (zeromq/nanomsg)") - ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") - ("input-buff-size", bpo::value()->default_value(1000), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") - ("input-method", bpo::value()->required(), "Input method: bind/connect") - ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://localhost:5555\"") - ("output-socket-type", bpo::value()->required(), "Output socket type: pub/push") - ("output-buff-size", bpo::value()->default_value(1000), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") - ("output-method", bpo::value()->required(), "Output method: bind/connect") - ("output-address", bpo::value()->required(), "Output address, e.g.: \"tcp://localhost:5555\"") - ("help", "Print help messages"); - - bpo::variables_map vm; - bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); - - if (vm.count("help")) - { - LOG(INFO) << "FairMQ Proxy" << endl << desc; - return false; - } - - bpo::notify(vm); - - if (vm.count("id")) { _options->id = vm["id"].as(); } - if (vm.count("io-threads")) { _options->ioThreads = vm["io-threads"].as(); } - if (vm.count("transport")) { _options->transport = vm["transport"].as(); } - if (vm.count("input-socket-type")) { _options->inputSocketType = vm["input-socket-type"].as(); } - if (vm.count("input-buff-size")) { _options->inputBufSize = vm["input-buff-size"].as(); } - if (vm.count("input-method")) { _options->inputMethod = vm["input-method"].as(); } - if (vm.count("input-address")) { _options->inputAddress = vm["input-address"].as(); } - if (vm.count("output-socket-type")) { _options->outputSocketType = vm["output-socket-type"].as(); } - if (vm.count("output-buff-size")) { _options->outputBufSize = vm["output-buff-size"].as(); } - if (vm.count("output-method")) { _options->outputMethod = vm["output-method"].as(); } - if (vm.count("output-address")) { _options->outputAddress = vm["output-address"].as(); } - - return true; -} - int main(int argc, char** argv) { FairMQProxy proxy; proxy.CatchSignals(); - DeviceOptions_t options; + FairMQProgOptions config; + try { - if (!parse_cmd_line(argc, argv, &options)) + if (config.ParseAll(argc, argv)) + { return 0; + } + + proxy.SetConfig(config); + + proxy.ChangeState("INIT_DEVICE"); + proxy.WaitForEndOfState("INIT_DEVICE"); + + proxy.ChangeState("INIT_TASK"); + proxy.WaitForEndOfState("INIT_TASK"); + + proxy.ChangeState("RUN"); + proxy.InteractiveStateLoop(); } - catch (exception& e) + catch (std::exception& e) { LOG(ERROR) << e.what(); + LOG(INFO) << "Command line options are the following: "; + config.PrintHelp(); return 1; } - LOG(INFO) << "PID: " << getpid(); - - proxy.SetTransport(options.transport); - - FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress); - inputChannel.UpdateSndBufSize(options.inputBufSize); - inputChannel.UpdateRcvBufSize(options.inputBufSize); - inputChannel.UpdateRateLogging(1); - - proxy.fChannels["data-in"].push_back(inputChannel); - - FairMQChannel outputChannel(options.outputSocketType, options.outputMethod, options.outputAddress); - outputChannel.UpdateSndBufSize(options.outputBufSize); - outputChannel.UpdateRcvBufSize(options.outputBufSize); - outputChannel.UpdateRateLogging(1); - - proxy.fChannels["data-out"].push_back(outputChannel); - - proxy.SetProperty(FairMQProxy::Id, options.id); - proxy.SetProperty(FairMQProxy::NumIoThreads, options.ioThreads); - - proxy.ChangeState("INIT_DEVICE"); - proxy.WaitForEndOfState("INIT_DEVICE"); - - proxy.ChangeState("INIT_TASK"); - proxy.WaitForEndOfState("INIT_TASK"); - - proxy.ChangeState("RUN"); - proxy.InteractiveStateLoop(); - return 0; } diff --git a/fairmq/run/runSplitter.cxx b/fairmq/run/runSplitter.cxx index 4b46c3f7..47414917 100644 --- a/fairmq/run/runSplitter.cxx +++ b/fairmq/run/runSplitter.cxx @@ -14,133 +14,43 @@ #include -#include "boost/program_options.hpp" - #include "FairMQLogger.h" +#include "FairMQParser.h" +#include "FairMQProgOptions.h" #include "FairMQSplitter.h" -using namespace std; - -typedef struct DeviceOptions -{ - DeviceOptions() : - id(), ioThreads(0), transport(), numOutputs(0), - inputSocketType(), inputBufSize(0), inputMethod(), inputAddress(), - outputSocketType(), outputBufSize(), outputMethod(), outputAddress() - {} - - string id; - int ioThreads; - string transport; - int numOutputs; - string inputSocketType; - int inputBufSize; - string inputMethod; - string inputAddress; - vector outputSocketType; - vector outputBufSize; - vector outputMethod; - vector outputAddress; -} DeviceOptions_t; - -inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) -{ - if (_options == NULL) - throw runtime_error("Internal error: options' container is empty."); - - namespace bpo = boost::program_options; - bpo::options_description desc("Options"); - desc.add_options() - ("id", bpo::value()->required(), "Device ID") - ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") - ("transport", bpo::value()->default_value("zeromq"), "Transport (zeromq/nanomsg)") - ("num-outputs", bpo::value()->required(), "Number of Splitter output sockets") - ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") - ("input-buff-size", bpo::value(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") - ("input-method", bpo::value()->required(), "Input method: bind/connect") - ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://localhost:5555\"") - ("output-socket-type", bpo::value>()->required(), "Output socket type: pub/push") - ("output-buff-size", bpo::value>(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") - ("output-method", bpo::value>()->required(), "Output method: bind/connect") - ("output-address", bpo::value>()->required(), "Output address, e.g.: \"tcp://localhost:5555\"") - ("help", "Print help messages"); - - bpo::variables_map vm; - bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); - - if (vm.count("help")) - { - LOG(INFO) << "FairMQ Splitter" << endl << desc; - return false; - } - - bpo::notify(vm); - - if (vm.count("id")) { _options->id = vm["id"].as(); } - if (vm.count("io-threads")) { _options->ioThreads = vm["io-threads"].as(); } - if (vm.count("transport")) { _options->transport = vm["transport"].as(); } - if (vm.count("num-outputs")) { _options->numOutputs = vm["num-outputs"].as(); } - if (vm.count("input-socket-type")) { _options->inputSocketType = vm["input-socket-type"].as(); } - if (vm.count("input-buff-size")) { _options->inputBufSize = vm["input-buff-size"].as(); } - if (vm.count("input-method")) { _options->inputMethod = vm["input-method"].as(); } - if (vm.count("input-address")) { _options->inputAddress = vm["input-address"].as(); } - if (vm.count("output-socket-type")) { _options->outputSocketType = vm["output-socket-type"].as>(); } - if (vm.count("output-buff-size")) { _options->outputBufSize = vm["output-buff-size"].as>(); } - if (vm.count("output-method")) { _options->outputMethod = vm["output-method"].as>(); } - if (vm.count("output-address")) { _options->outputAddress = vm["output-address"].as>(); } - - return true; -} - int main(int argc, char** argv) { FairMQSplitter splitter; splitter.CatchSignals(); - DeviceOptions_t options; + FairMQProgOptions config; + try { - if (!parse_cmd_line(argc, argv, &options)) + if (config.ParseAll(argc, argv)) + { return 0; + } + + splitter.SetConfig(config); + + splitter.ChangeState("INIT_DEVICE"); + splitter.WaitForEndOfState("INIT_DEVICE"); + + splitter.ChangeState("INIT_TASK"); + splitter.WaitForEndOfState("INIT_TASK"); + + splitter.ChangeState("RUN"); + splitter.InteractiveStateLoop(); } - catch (exception& e) + catch (std::exception& e) { LOG(ERROR) << e.what(); + LOG(INFO) << "Command line options are the following: "; + config.PrintHelp(); return 1; } - LOG(INFO) << "PID: " << getpid(); - - splitter.SetTransport(options.transport); - - FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress); - inputChannel.UpdateSndBufSize(options.inputBufSize); - inputChannel.UpdateRcvBufSize(options.inputBufSize); - inputChannel.UpdateRateLogging(1); - - splitter.fChannels["data-in"].push_back(inputChannel); - - for (unsigned int i = 0; i < options.outputAddress.size(); ++i) - { - FairMQChannel outputChannel(options.outputSocketType.at(i), options.outputMethod.at(i), options.outputAddress.at(i)); - outputChannel.UpdateSndBufSize(options.outputBufSize.at(i)); - outputChannel.UpdateRcvBufSize(options.outputBufSize.at(i)); - outputChannel.UpdateRateLogging(1); - - splitter.fChannels["data-out"].push_back(outputChannel); - } - - splitter.SetProperty(FairMQSplitter::Id, options.id); - splitter.SetProperty(FairMQSplitter::NumIoThreads, options.ioThreads); - - splitter.ChangeState("INIT_DEVICE"); - splitter.WaitForEndOfState("INIT_DEVICE"); - - splitter.ChangeState("INIT_TASK"); - splitter.WaitForEndOfState("INIT_TASK"); - - splitter.ChangeState("RUN"); - splitter.InteractiveStateLoop(); - return 0; } diff --git a/fairmq/run/startBenchmark.sh.in b/fairmq/run/startFairMQBenchmark.sh.in similarity index 100% rename from fairmq/run/startBenchmark.sh.in rename to fairmq/run/startFairMQBenchmark.sh.in diff --git a/fairmq/tools/FairMQTools.h b/fairmq/tools/FairMQTools.h index 158c98c0..446060dd 100644 --- a/fairmq/tools/FairMQTools.h +++ b/fairmq/tools/FairMQTools.h @@ -65,6 +65,21 @@ int getHostIPs(map& addressMap) return 0; } +string getInterfaceIP(string interface) +{ + map IPs; + FairMQ::tools::getHostIPs(IPs); + if (IPs.count(interface)) + { + return IPs[interface]; + } + else + { + LOG(ERROR) << "Could not find provided network interface: \"" << interface << "\"!, exiting."; + exit(EXIT_FAILURE); + } +} + #if defined(__GNUC__) || defined(__GNUG__) #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Weffc++"