From 1bb558a4578184049a5ab61418441ae0785e3168 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Thu, 16 Aug 2018 16:40:24 +0200 Subject: [PATCH] Refactor initialization - add device constructor that accepts FairMQProgOptions object. - Initialize config values in INIT state (to allow their update). - Simplify FairMQProgOptions handling in FairMQDevice. - Simplify SetTransport/SetConfig - refactor duplicated code. - Add FairMQDevice methods to add channels. --- fairmq/FairMQDevice.cxx | 197 +++++++++------------------ fairmq/FairMQDevice.h | 62 +++++---- fairmq/options/FairMQParser.cxx | 2 + fairmq/options/FairMQProgOptions.cxx | 20 +-- fairmq/options/FairMQProgOptions.h | 5 + 5 files changed, 123 insertions(+), 163 deletions(-) diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 1cf08a8c..b4ff16d8 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -8,9 +8,6 @@ #include -#include -#include - #include // join/split #include @@ -31,53 +28,39 @@ using namespace std; - FairMQDevice::FairMQDevice() - : fTransportFactory(nullptr) - , fTransports() - , fChannels() - , fConfig(nullptr) - , fId() - , fNumIoThreads(1) - , fInitialValidationFinished(false) - , fInitialValidationCondition() - , fInitialValidationMutex() - , fPortRangeMin(22000) - , fPortRangeMax(32000) - , fNetworkInterface() - , fDefaultTransportType(fair::mq::Transport::DEFAULT) - , fInitializationTimeoutInS(120) - , fDataCallbacks(false) - , fMsgInputs() - , fMultipartInputs() - , fMultitransportInputs() - , fChannelRegistry() - , fInputChannelKeys() - , fMultitransportMutex() - , fMultitransportProceed(false) - , fExternalConfig(false) - , fVersion({0, 0, 0}) - , fRate(0.) - , fLastTime(0) - , fRawCmdLineArgs() + : FairMQDevice(nullptr, {0, 0, 0}) +{ +} + +FairMQDevice::FairMQDevice(FairMQProgOptions& config) + : FairMQDevice(&config, {0, 0, 0}) { } FairMQDevice::FairMQDevice(const fair::mq::tools::Version version) + : FairMQDevice(nullptr, version) +{ +} + +FairMQDevice::FairMQDevice(FairMQProgOptions& config, const fair::mq::tools::Version version) + : FairMQDevice(&config, version) +{ +} + +FairMQDevice::FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Version version) : fTransportFactory(nullptr) , fTransports() , fChannels() - , fConfig(nullptr) + , fInternalConfig(config ? nullptr : fair::mq::tools::make_unique()) + , fConfig(config ? config : fInternalConfig.get()) , fId() - , fNumIoThreads(1) , fInitialValidationFinished(false) , fInitialValidationCondition() , fInitialValidationMutex() , fPortRangeMin(22000) , fPortRangeMax(32000) - , fNetworkInterface() , fDefaultTransportType(fair::mq::Transport::DEFAULT) - , fInitializationTimeoutInS(120) , fDataCallbacks(false) , fMsgInputs() , fMultipartInputs() @@ -86,7 +69,6 @@ FairMQDevice::FairMQDevice(const fair::mq::tools::Version version) , fInputChannelKeys() , fMultitransportMutex() , fMultitransportProceed(false) - , fExternalConfig(false) , fVersion(version) , fRate(0.) , fLastTime(0) @@ -96,16 +78,43 @@ FairMQDevice::FairMQDevice(const fair::mq::tools::Version version) void FairMQDevice::InitWrapper() { - if (!fTransportFactory) + fId = fConfig->GetValue("id"); + fRate = fConfig->GetValue("rate"); + fPortRangeMin = fConfig->GetValue("port-range-min"); + fPortRangeMax = fConfig->GetValue("port-range-max"); + + try { - LOG(error) << "Transport not initialized. Did you call SetTransport()?"; - exit(EXIT_FAILURE); + fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue("transport")); } + catch (const exception& e) + { + LOG(error) << "invalid transport type provided: " << fConfig->GetValue("transport"); + } + + for (auto& c : fConfig->GetFairMQMap()) + { + if (fChannels.find(c.first) == fChannels.end()) + { + LOG(debug) << "Inserting new device channel from config: " << c.first; + fChannels.insert(c); + } + else + { + LOG(debug) << "Updating existing device channel from config: " << c.first; + fChannels[c.first] = c.second; + } + } + + LOG(debug) << "Requesting '" << fair::mq::TransportNames.at(fDefaultTransportType) << "' as default transport for the device"; + fTransportFactory = AddTransport(fDefaultTransportType); // Containers to store the uninitialized channels. vector uninitializedBindingChannels; vector uninitializedConnectingChannels; + string networkInterface = fConfig->GetValue("network-interface"); + // Fill the uninitialized channel containers for (auto& mi : fChannels) { @@ -126,11 +135,11 @@ void FairMQDevice::InitWrapper() if (vi->fAddress == "unspecified" || vi->fAddress == "") { // if the configured network interface is default, get its name from the default route - if (fNetworkInterface == "default") + if (networkInterface == "default") { - fNetworkInterface = fair::mq::tools::getDefaultRouteNetworkInterface(); + networkInterface = fair::mq::tools::getDefaultRouteNetworkInterface(); } - vi->fAddress = "tcp://" + fair::mq::tools::getInterfaceIP(fNetworkInterface) + ":1"; + vi->fAddress = "tcp://" + fair::mq::tools::getInterfaceIP(networkInterface) + ":1"; } // fill the uninitialized list uninitializedBindingChannels.push_back(&(*vi)); @@ -175,10 +184,12 @@ void FairMQDevice::InitWrapper() fInitialValidationCondition.notify_one(); } + int initializationTimeoutInS = fConfig->GetValue("initialization-timeout"); + // go over the list of channels until all are initialized (and removed from the uninitialized list) int numAttempts = 1; auto sleepTimeInMS = 50; - auto maxAttempts = fInitializationTimeoutInS * 1000 / sleepTimeInMS; + auto maxAttempts = initializationTimeoutInS * 1000 / sleepTimeInMS; // first attempt AttachChannels(uninitializedConnectingChannels); // if not all channels could be connected, update their address values from config and retry @@ -201,9 +212,9 @@ void FairMQDevice::InitWrapper() if (numAttempts++ > maxAttempts) { - LOG(error) << "could not connect all channels after " << fInitializationTimeoutInS << " attempts"; + LOG(error) << "could not connect all channels after " << initializationTimeoutInS << " attempts"; ChangeState(ERROR_FOUND); - // throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", fInitializationTimeoutInS, " attempts")); + // throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", initializationTimeoutInS, " attempts")); } AttachChannels(uninitializedConnectingChannels); @@ -252,19 +263,15 @@ void FairMQDevice::AttachChannels(vector& chans) bool FairMQDevice::AttachChannel(FairMQChannel& ch) { - if (!ch.fTransportFactory) + if (ch.fTransportType == fair::mq::Transport::DEFAULT || ch.fTransportType == fTransportFactory->GetType()) { - if (ch.fTransportType == fair::mq::Transport::DEFAULT || ch.fTransportType == fTransportFactory->GetType()) - { - LOG(debug) << ch.fName << ": using default transport"; - ch.InitTransport(fTransportFactory); - } - else - { - LOG(debug) << ch.fName << ": channel transport (" << fair::mq::TransportNames.at(fDefaultTransportType) << ") overriden to " << fair::mq::TransportNames.at(ch.fTransportType); - ch.InitTransport(AddTransport(ch.fTransportType)); - } - ch.fTransportType = ch.fTransportFactory->GetType(); + LOG(debug) << ch.fName << ": using default transport"; + ch.InitTransport(fTransportFactory); + } + else + { + LOG(debug) << ch.fName << ": channel transport (" << fair::mq::TransportNames.at(fDefaultTransportType) << ") overriden to " << fair::mq::TransportNames.at(ch.fTransportType); + ch.InitTransport(AddTransport(ch.fTransportType)); } vector endpoints; @@ -798,78 +805,10 @@ shared_ptr FairMQDevice::AddTransport(const fair::mq::Tr } } -void FairMQDevice::CreateOwnConfig() -{ - // TODO: make fConfig a shared_ptr when no old user code has FairMQProgOptions ptr* - fConfig = new FairMQProgOptions(); - - string id{boost::uuids::to_string(boost::uuids::random_generator()())}; - LOG(warn) << "No FairMQProgOptions provided, creating one internally and setting device ID to " << id; - - // dummy argc+argv - char arg0[] = "undefined"; // executable name - char arg1[] = "--id"; - char* arg2 = const_cast(id.c_str()); // device ID - const char* argv[] = { &arg0[0], &arg1[0], arg2, nullptr }; - int argc = static_cast((sizeof(argv) / sizeof(argv[0])) - 1); - - fConfig->ParseAll(argc, &argv[0]); - - fId = fConfig->GetValue("id"); - fNetworkInterface = fConfig->GetValue("network-interface"); - fNumIoThreads = fConfig->GetValue("io-threads"); - fInitializationTimeoutInS = fConfig->GetValue("initialization-timeout"); - fRate = fConfig->GetValue("rate"); - try { - fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue("transport")); - } catch(const exception& e) { - LOG(error) << "invalid transport type provided: " << fConfig->GetValue("transport"); - } -} - -void FairMQDevice::SetTransport(const string& transport) -{ - // This method is the first to be called, if FairMQProgOptions are not used (either SetTransport() or SetConfig() make sense, not both). - // Make sure here that at least internal config is available. - if (!fExternalConfig && !fConfig) - { - CreateOwnConfig(); - } - - if (fTransports.empty()) - { - LOG(debug) << "Requesting '" << transport << "' as default transport for the device"; - fTransportFactory = AddTransport(fair::mq::TransportTypes.at(transport)); - } - else - { - LOG(error) << "Transports container is not empty when setting transport. Setting default twice?"; - ChangeState(ERROR_FOUND); - } -} - void FairMQDevice::SetConfig(FairMQProgOptions& config) { - fExternalConfig = true; + fInternalConfig.reset(); fConfig = &config; - for (auto& c : fConfig->GetFairMQMap()) - { - if (!fChannels.insert(c).second) - { - LOG(warn) << "did not insert channel '" << c.first << "', it is already in the device."; - } - } - fId = fConfig->GetValue("id"); - fNetworkInterface = fConfig->GetValue("network-interface"); - fNumIoThreads = fConfig->GetValue("io-threads"); - fInitializationTimeoutInS = fConfig->GetValue("initialization-timeout"); - fRate = fConfig->GetValue("rate"); - try { - fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue("transport")); - } catch(const exception& e) { - LOG(error) << "invalid transport type provided: " << fConfig->GetValue("transport"); - } - SetTransport(fConfig->GetValue("transport")); } void FairMQDevice::LogSocketRates() @@ -1020,7 +959,7 @@ void FairMQDevice::Reset() for (auto& vi : mi.second) { // vi.fReset = true; - vi.fSocket.reset(); + vi.fSocket.reset(); // destroy FairMQSocket } } } @@ -1032,10 +971,6 @@ const FairMQChannel& FairMQDevice::GetChannel(const string& channelName, const i void FairMQDevice::Exit() { - if (!fExternalConfig && fConfig) - { - delete fConfig; - } } FairMQDevice::~FairMQDevice() diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 1ae70ba7..e5ca0b2e 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -48,9 +48,19 @@ class FairMQDevice : public FairMQStateMachine public: /// Default constructor FairMQDevice(); + /// Constructor with external FairMQProgOptions + FairMQDevice(FairMQProgOptions& config); /// Constructor that sets the version FairMQDevice(const fair::mq::tools::Version version); + + /// Constructor that sets the version and external FairMQProgOptions + FairMQDevice(FairMQProgOptions& config, const fair::mq::tools::Version version); + + private: + FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Version version); + + public: /// Copy constructor (disabled) FairMQDevice(const FairMQDevice&) = delete; /// Assignment operator (disabled) @@ -294,12 +304,11 @@ class FairMQDevice : public FairMQStateMachine /// Adds a transport to the device if it doesn't exist /// @param transport Transport string ("zeromq"/"nanomsg"/"shmem") std::shared_ptr AddTransport(const fair::mq::Transport transport); - /// Sets the default transport for the device - /// @param transport Transport string ("zeromq"/"nanomsg"/"shmem") - void SetTransport(const std::string& transport = "zeromq"); + /// Assigns config to the device void SetConfig(FairMQProgOptions& config); - const FairMQProgOptions* GetConfig() const + /// Get pointer to the config + FairMQProgOptions* GetConfig() const { return fConfig; } @@ -395,23 +404,29 @@ class FairMQDevice : public FairMQStateMachine const fair::mq::tools::Version GetVersion() const { return fVersion; } - void SetNumIoThreads(int numIoThreads) { fNumIoThreads = numIoThreads; } - int GetNumIoThreads() const { return fNumIoThreads; } + void SetNumIoThreads(int numIoThreads) { fConfig->SetValue("io-threads", numIoThreads);} + int GetNumIoThreads() const { return fConfig->GetValue("io-threads"); } - void SetPortRangeMin(int portRangeMin) { fPortRangeMin = portRangeMin; } - int GetPortRangeMin() const { return fPortRangeMin; } + void SetPortRangeMin(int portRangeMin) { fConfig->SetValue("port-range-min", portRangeMin); } + int GetPortRangeMin() const { return fConfig->GetValue("port-range-min"); } - void SetPortRangeMax(int portRangeMax) { fPortRangeMax = portRangeMax; } - int GetPortRangeMax() const { return fPortRangeMax; } + void SetPortRangeMax(int portRangeMax) { fConfig->SetValue("port-range-max", portRangeMax); } + int GetPortRangeMax() const { return fConfig->GetValue("port-range-max"); } - void SetNetworkInterface(const std::string& networkInterface) { fNetworkInterface = networkInterface; } - std::string GetNetworkInterface() const { return fNetworkInterface; } + void SetNetworkInterface(const std::string& networkInterface) { fConfig->SetValue("network-interface", networkInterface); } + std::string GetNetworkInterface() const { return fConfig->GetValue("network-interface"); } - void SetDefaultTransport(const std::string& name) { fDefaultTransportType = fair::mq::TransportTypes.at(name); } - std::string GetDefaultTransport() const { return fair::mq::TransportNames.at(fDefaultTransportType); } + void SetDefaultTransport(const std::string& name) { fConfig->SetValue("transport", name); } + std::string GetDefaultTransport() const { return fConfig->GetValue("transport"); } - void SetInitializationTimeoutInS(int initializationTimeoutInS) { fInitializationTimeoutInS = initializationTimeoutInS; } - int GetInitializationTimeoutInS() const { return fInitializationTimeoutInS; } + void SetInitializationTimeoutInS(int initializationTimeoutInS) { fConfig->SetValue("initialization-timeout", initializationTimeoutInS); } + int GetInitializationTimeoutInS() const { return fConfig->GetValue("initialization-timeout"); } + + /// Sets the default transport for the device + /// @param transport Transport string ("zeromq"/"nanomsg"/"shmem") + void SetTransport(const std::string& transport) { fConfig->SetValue("transport", transport); } + /// Gets the default transport name + std::string GetTransportName() const { return fConfig->GetValue("transport"); } void SetRawCmdLineArgs(const std::vector& args) { fRawCmdLineArgs = args; } std::vector GetRawCmdLineArgs() const { return fRawCmdLineArgs; } @@ -424,13 +439,17 @@ class FairMQDevice : public FairMQStateMachine public: std::unordered_map> fChannels; ///< Device channels - FairMQProgOptions* fConfig; ///< Program options configuration + std::unique_ptr fInternalConfig; ///< Internal program options configuration + FairMQProgOptions* fConfig; ///< Pointer to config (internal or external) + + void AddChannel(const std::string& channelName, const FairMQChannel& channel) + { + fConfig->AddChannel(channelName, channel); + } protected: std::string fId; ///< Device ID - int fNumIoThreads; ///< Number of ZeroMQ I/O threads - /// Additional user initialization (can be overloaded in child classes). Prefer to use InitTask(). /// Executed in a worker thread virtual void Init(); @@ -476,11 +495,8 @@ class FairMQDevice : public FairMQStateMachine int fPortRangeMin; ///< Minimum value for the port range (if dynamic) int fPortRangeMax; ///< Maximum value for the port range (if dynamic) - std::string fNetworkInterface; ///< Network interface to use for dynamic binding fair::mq::Transport fDefaultTransportType; ///< Default transport for the device - int fInitializationTimeoutInS; ///< Timeout for the initialization (in seconds) - /// Handles the initialization and the Init() method void InitWrapper(); /// Handles the InitTask() method @@ -532,8 +548,6 @@ class FairMQDevice : public FairMQStateMachine std::mutex fMultitransportMutex; std::atomic fMultitransportProceed; - bool fExternalConfig; - const fair::mq::tools::Version fVersion; float fRate; ///< Rate limiting for ConditionalRun size_t fLastTime; ///< Rate limiting for ConditionalRun diff --git a/fairmq/options/FairMQParser.cxx b/fairmq/options/FairMQParser.cxx index 4c6e932c..f4b2a86c 100644 --- a/fairmq/options/FairMQParser.cxx +++ b/fairmq/options/FairMQParser.cxx @@ -14,6 +14,8 @@ #include "FairMQParser.h" #include "FairMQLogger.h" +#include + #include #include diff --git a/fairmq/options/FairMQProgOptions.cxx b/fairmq/options/FairMQProgOptions.cxx index 932defb3..3b8472f1 100644 --- a/fairmq/options/FairMQProgOptions.cxx +++ b/fairmq/options/FairMQProgOptions.cxx @@ -20,6 +20,9 @@ #include "FairMQSuboptParser.h" #include // join/split +#include +#include +#include #include #include @@ -136,16 +139,16 @@ int FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool al fair::Logger::SetConsoleSeverity(severity); } - string id; + string idForParser; // check if config-key for config parser is provided if (fVarMap.count("config-key")) { - id = fVarMap["config-key"].as(); + idForParser = fVarMap["config-key"].as(); } else if (fVarMap.count("id")) { - id = fVarMap["id"].as(); + idForParser = fVarMap["id"].as(); } // check if any config parser is selected @@ -154,12 +157,12 @@ int FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool al if (fVarMap.count("mq-config")) { LOG(debug) << "mq-config: Using default JSON parser"; - UpdateChannelMap(parser::JSON().UserParser(fVarMap.at("mq-config").as(), id)); + UpdateChannelMap(parser::JSON().UserParser(fVarMap.at("mq-config").as(), idForParser)); } else if (fVarMap.count("channel-config")) { LOG(debug) << "channel-config: Parsing channel configuration"; - UpdateChannelMap(parser::SUBOPT().UserParser(fVarMap.at("channel-config").as>(), id)); + UpdateChannelMap(parser::SUBOPT().UserParser(fVarMap.at("channel-config").as>(), idForParser)); } else { @@ -184,6 +187,8 @@ int FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool al void FairMQProgOptions::ParseCmdLine(const int argc, char const* const* argv, bool allowUnregistered) { + fVarMap.clear(); + // get options from cmd line and store in variable map // here we use command_line_parser instead of parse_command_line, to allow unregistered and positional options if (allowUnregistered) @@ -205,8 +210,7 @@ void FairMQProgOptions::ParseCmdLine(const int argc, char const* const* argv, bo void FairMQProgOptions::ParseDefaults() { - vector emptyArgs; - emptyArgs.push_back("dummy"); + vector emptyArgs = {"dummy", "--id", boost::uuids::to_string(boost::uuids::random_generator()())}; vector argv(emptyArgs.size()); @@ -423,7 +427,7 @@ int FairMQProgOptions::PrintOptions() { ss << setfill(' ') << left << setw(maxLenKey) << p.first << " = " - << setw(maxLenValue) << p.second.value + << setw(maxLenValue) << p.second.value << " " << setw(maxLenType) << p.second.type << setw(maxLenDefault) << p.second.defaulted << "\n"; diff --git a/fairmq/options/FairMQProgOptions.h b/fairmq/options/FairMQProgOptions.h index 75245abc..85de740a 100644 --- a/fairmq/options/FairMQProgOptions.h +++ b/fairmq/options/FairMQProgOptions.h @@ -162,6 +162,11 @@ class FairMQProgOptions int PrintOptions(); int PrintOptionsRaw(); + void AddChannel(const std::string& channelName, const FairMQChannel& channel) + { + fFairMQChannelMap[channelName].push_back(channel); + } + private: struct ChannelKey {