diff --git a/CMakeLists.txt b/CMakeLists.txt index 9187ef70..7961cacf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -106,6 +106,7 @@ if(BUILD_FAIRMQ OR BUILD_SDK) container program_options filesystem + regex date_time regex diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index 4a293906..f274368e 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -15,6 +15,16 @@ #include using namespace std; +using namespace fair::mq; + +template +T GetPropertyOrDefault(const fair::mq::Properties& m, const string& k, const T& ifNotFound) noexcept +{ + if (m.count(k)) { + return boost::any_cast(m.at(k)); + } + return ifNotFound; +} FairMQChannel::FairMQChannel() : FairMQChannel(DefaultName, DefaultType, DefaultMethod, DefaultAddress, nullptr) @@ -56,6 +66,26 @@ FairMQChannel::FairMQChannel(const string& name, const string& type, const strin , fMtx() {} +FairMQChannel::FairMQChannel(const string& name, int index, const fair::mq::Properties& properties) + : FairMQChannel(tools::ToString(name, "[", index, "]"), "unspecified", "unspecified", "unspecified", nullptr) +{ + string prefix(tools::ToString("chans.", name, ".", index, ".")); + + fType = GetPropertyOrDefault(properties, string(prefix + "type"), fType); + fMethod = GetPropertyOrDefault(properties, string(prefix + "method"), fMethod); + fAddress = GetPropertyOrDefault(properties, string(prefix + "address"), fAddress); + fTransportType = TransportTypes.at(GetPropertyOrDefault(properties, string(prefix + "transport"), TransportNames.at(fTransportType))); + fSndBufSize = GetPropertyOrDefault(properties, string(prefix + "sndBufSize"), fSndBufSize); + fRcvBufSize = GetPropertyOrDefault(properties, string(prefix + "rcvBufSize"), fRcvBufSize); + fSndKernelSize = GetPropertyOrDefault(properties, string(prefix + "sndKernelSize"), fSndKernelSize); + fRcvKernelSize = GetPropertyOrDefault(properties, string(prefix + "rcvKernelSize"), fRcvKernelSize); + fLinger = GetPropertyOrDefault(properties, string(prefix + "linger"), fLinger); + fRateLogging = GetPropertyOrDefault(properties, string(prefix + "rateLogging"), fRateLogging); + fPortRangeMin = GetPropertyOrDefault(properties, string(prefix + "portRangeMin"), fPortRangeMin); + fPortRangeMax = GetPropertyOrDefault(properties, string(prefix + "portRangeMax"), fPortRangeMax); + fAutoBind = GetPropertyOrDefault(properties, string(prefix + "autoBind"), fAutoBind); +} + FairMQChannel::FairMQChannel(const FairMQChannel& chan) : FairMQChannel(chan, chan.fName) {} @@ -144,7 +174,7 @@ try { return fType; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetType: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } string FairMQChannel::GetMethod() const @@ -153,7 +183,7 @@ try { return fMethod; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetMethod: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } string FairMQChannel::GetAddress() const @@ -162,25 +192,25 @@ try { return fAddress; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetAddress: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } string FairMQChannel::GetTransportName() const try { lock_guard lock(fMtx); - return fair::mq::TransportNames.at(fTransportType); + return TransportNames.at(fTransportType); } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetTransportName: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } -fair::mq::Transport FairMQChannel::GetTransportType() const +Transport FairMQChannel::GetTransportType() const try { lock_guard lock(fMtx); return fTransportType; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetTransportType: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } @@ -190,7 +220,7 @@ try { return fSndBufSize; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetSndBufSize: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } int FairMQChannel::GetRcvBufSize() const @@ -199,7 +229,7 @@ try { return fRcvBufSize; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetRcvBufSize: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } int FairMQChannel::GetSndKernelSize() const @@ -208,7 +238,7 @@ try { return fSndKernelSize; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetSndKernelSize: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } int FairMQChannel::GetRcvKernelSize() const @@ -217,7 +247,7 @@ try { return fRcvKernelSize; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetRcvKernelSize: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } int FairMQChannel::GetLinger() const @@ -226,7 +256,7 @@ try { return fLinger; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetLinger: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } int FairMQChannel::GetRateLogging() const @@ -235,7 +265,7 @@ try { return fRateLogging; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetRateLogging: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } int FairMQChannel::GetPortRangeMin() const @@ -244,7 +274,7 @@ try { return fPortRangeMin; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetPortRangeMin: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } int FairMQChannel::GetPortRangeMax() const @@ -253,7 +283,7 @@ try { return fPortRangeMax; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetPortRangeMax: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } bool FairMQChannel::GetAutoBind() const @@ -262,7 +292,7 @@ try { return fAutoBind; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetAutoBind: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateType(const string& type) @@ -273,7 +303,7 @@ try { fModified = true; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::UpdateType: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateMethod(const string& method) @@ -284,7 +314,7 @@ try { fModified = true; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::UpdateMethod: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateAddress(const string& address) @@ -295,18 +325,18 @@ try { fModified = true; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::UpdateAddress: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateTransport(const string& transport) try { lock_guard lock(fMtx); fIsValid = false; - fTransportType = fair::mq::TransportTypes.at(transport); + fTransportType = TransportTypes.at(transport); fModified = true; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::UpdateTransport: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateSndBufSize(const int sndBufSize) @@ -317,7 +347,7 @@ try { fModified = true; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::UpdateSndBufSize: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateRcvBufSize(const int rcvBufSize) @@ -328,7 +358,7 @@ try { fModified = true; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::UpdateRcvBufSize: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateSndKernelSize(const int sndKernelSize) @@ -339,7 +369,7 @@ try { fModified = true; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::UpdateSndKernelSize: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateRcvKernelSize(const int rcvKernelSize) @@ -350,7 +380,7 @@ try { fModified = true; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::UpdateRcvKernelSize: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateLinger(const int duration) @@ -361,7 +391,7 @@ try { fModified = true; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::UpdateLinger: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateRateLogging(const int rateLogging) @@ -372,7 +402,7 @@ try { fModified = true; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::UpdateRateLogging: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdatePortRangeMin(const int minPort) @@ -383,7 +413,7 @@ try { fModified = true; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::UpdatePortRangeMin: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdatePortRangeMax(const int maxPort) @@ -394,7 +424,7 @@ try { fModified = true; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::UpdatePortRangeMax: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateAutoBind(const bool autobind) @@ -405,7 +435,7 @@ try { fModified = true; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::UpdateAutoBind: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } auto FairMQChannel::SetModified(const bool modified) -> void @@ -414,7 +444,7 @@ try { fModified = modified; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::SetModified: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateName(const string& name) @@ -425,7 +455,7 @@ try { fModified = true; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::UpdateName: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } bool FairMQChannel::IsValid() const @@ -434,7 +464,7 @@ try { return fIsValid; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::IsValid: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); + throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } bool FairMQChannel::Validate() @@ -462,7 +492,7 @@ try { ss << "INVALID"; LOG(debug) << ss.str(); LOG(error) << "Invalid channel type: '" << fType << "'"; - throw ChannelConfigurationError(fair::mq::tools::ToString("Invalid channel type: '", fType, "'")); + throw ChannelConfigurationError(tools::ToString("Invalid channel type: '", fType, "'")); } // validate socket address @@ -485,7 +515,7 @@ try { ss << "INVALID"; LOG(debug) << ss.str(); LOG(error) << "Invalid endpoint connection method: '" << fMethod << "' for " << endpoint; - throw ChannelConfigurationError(fair::mq::tools::ToString("Invalid endpoint connection method: '", fMethod, "' for ", endpoint)); + throw ChannelConfigurationError(tools::ToString("Invalid endpoint connection method: '", fMethod, "' for ", endpoint)); } address = endpoint; } @@ -541,7 +571,7 @@ try { ss << "INVALID"; LOG(debug) << ss.str(); LOG(error) << "invalid channel send buffer size (cannot be negative): '" << fSndBufSize << "'"; - throw ChannelConfigurationError(fair::mq::tools::ToString("invalid channel send buffer size (cannot be negative): '", fSndBufSize, "'")); + throw ChannelConfigurationError(tools::ToString("invalid channel send buffer size (cannot be negative): '", fSndBufSize, "'")); } // validate socket buffer size for receiving @@ -549,7 +579,7 @@ try { ss << "INVALID"; LOG(debug) << ss.str(); LOG(error) << "invalid channel receive buffer size (cannot be negative): '" << fRcvBufSize << "'"; - throw ChannelConfigurationError(fair::mq::tools::ToString("invalid channel receive buffer size (cannot be negative): '", fRcvBufSize, "'")); + throw ChannelConfigurationError(tools::ToString("invalid channel receive buffer size (cannot be negative): '", fRcvBufSize, "'")); } // validate socket kernel transmit size for sending @@ -557,7 +587,7 @@ try { ss << "INVALID"; LOG(debug) << ss.str(); LOG(error) << "invalid channel send kernel transmit size (cannot be negative): '" << fSndKernelSize << "'"; - throw ChannelConfigurationError(fair::mq::tools::ToString("invalid channel send kernel transmit size (cannot be negative): '", fSndKernelSize, "'")); + throw ChannelConfigurationError(tools::ToString("invalid channel send kernel transmit size (cannot be negative): '", fSndKernelSize, "'")); } // validate socket kernel transmit size for receiving @@ -565,7 +595,7 @@ try { ss << "INVALID"; LOG(debug) << ss.str(); LOG(error) << "invalid channel receive kernel transmit size (cannot be negative): '" << fRcvKernelSize << "'"; - throw ChannelConfigurationError(fair::mq::tools::ToString("invalid channel receive kernel transmit size (cannot be negative): '", fRcvKernelSize, "'")); + throw ChannelConfigurationError(tools::ToString("invalid channel receive kernel transmit size (cannot be negative): '", fRcvKernelSize, "'")); } // validate socket rate logging interval @@ -573,7 +603,7 @@ try { ss << "INVALID"; LOG(debug) << ss.str(); LOG(error) << "invalid socket rate logging interval (cannot be negative): '" << fRateLogging << "'"; - throw ChannelConfigurationError(fair::mq::tools::ToString("invalid socket rate logging interval (cannot be negative): '", fRateLogging, "'")); + throw ChannelConfigurationError(tools::ToString("invalid socket rate logging interval (cannot be negative): '", fRateLogging, "'")); } fIsValid = true; @@ -582,7 +612,7 @@ try { return true; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::ValidateChannel: " << e.what(); - throw ChannelConfigurationError(fair::mq::tools::ToString(e.what())); + throw ChannelConfigurationError(tools::ToString(e.what())); } void FairMQChannel::Init() @@ -641,7 +671,7 @@ bool FairMQChannel::BindEndpoint(string& endpoint) } size_t pos = endpoint.rfind(':'); - endpoint = endpoint.substr(0, pos + 1) + fair::mq::tools::ToString(static_cast(randomPort(generator))); + endpoint = endpoint.substr(0, pos + 1) + tools::ToString(static_cast(randomPort(generator))); } while (!fSocket->Bind(endpoint)); return true; diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 84debb3e..3c66059f 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -9,6 +9,16 @@ #ifndef FAIRMQCHANNEL_H_ #define FAIRMQCHANNEL_H_ +#include +#include +#include +#include +#include +#include +#include + +#include + #include #include // unique_ptr, shared_ptr #include @@ -17,13 +27,6 @@ #include #include // std::move -#include -#include -#include -#include -#include -#include - class FairMQChannel { friend class FairMQDevice; @@ -56,6 +59,8 @@ class FairMQChannel /// @param factory TransportFactory FairMQChannel(const std::string& name, const std::string& type, const std::string& method, const std::string& address, std::shared_ptr factory); + FairMQChannel(const std::string& name, int index, const fair::mq::Properties& properties); + /// Copy Constructor FairMQChannel(const FairMQChannel&); @@ -332,6 +337,22 @@ class FairMQChannel return Transport()->CreateUnmanagedRegion(size, callback, path, flags); } + static constexpr fair::mq::Transport DefaultTransportType = fair::mq::Transport::DEFAULT; + static constexpr const char* DefaultTransportName = "default"; + static constexpr const char* DefaultName = ""; + static constexpr const char* DefaultType = "unspecified"; + static constexpr const char* DefaultMethod = "unspecified"; + static constexpr const char* DefaultAddress = "unspecified"; + static constexpr int DefaultSndBufSize = 1000; + static constexpr int DefaultRcvBufSize = 1000; + static constexpr int DefaultSndKernelSize = 0; + static constexpr int DefaultRcvKernelSize = 0; + static constexpr int DefaultLinger = 500; + static constexpr int DefaultRateLogging = 1; + static constexpr int DefaultPortRangeMin = 22000; + static constexpr int DefaultPortRangeMax = 23000; + static constexpr bool DefaultAutoBind = true; + private: std::shared_ptr fTransportFactory; fair::mq::Transport fTransportType; @@ -416,22 +437,6 @@ class FairMQChannel } auto SetModified(const bool modified) -> void; - - static constexpr fair::mq::Transport DefaultTransportType = fair::mq::Transport::DEFAULT; - static constexpr const char* DefaultTransportName = "default"; - static constexpr const char* DefaultName = ""; - static constexpr const char* DefaultType = "unspecified"; - static constexpr const char* DefaultMethod = "unspecified"; - static constexpr const char* DefaultAddress = "unspecified"; - static constexpr int DefaultSndBufSize = 1000; - static constexpr int DefaultRcvBufSize = 1000; - static constexpr int DefaultSndKernelSize = 0; - static constexpr int DefaultRcvKernelSize = 0; - static constexpr int DefaultLinger = 500; - static constexpr int DefaultRateLogging = 1; - static constexpr int DefaultPortRangeMin = 22000; - static constexpr int DefaultPortRangeMax = 23000; - static constexpr bool DefaultAutoBind = true; }; #endif /* FAIRMQCHANNEL_H_ */ diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 804405dd..e561b0e2 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -207,13 +207,10 @@ void FairMQDevice::InitWrapper() throw; } - for (auto& c : fConfig->GetFairMQMap()) { - if (fChannels.find(c.first) == fChannels.end()) { - LOG(debug) << "Inserting new device channel from config: " << c.first; - fChannels.insert(c); - } else { - LOG(debug) << "Updating existing device channel from config: " << c.first; - fChannels[c.first] = c.second; + unordered_map infos = fConfig->GetChannelInfo(); + for (const auto& info : infos) { + for (int i = 0; i < info.second; ++i) { + fChannels[info.first].emplace_back(info.first, i, fConfig->GetPropertiesStartingWith(tools::ToString("chans.", info.first, ".", i, "."))); } } @@ -226,9 +223,6 @@ void FairMQDevice::InitWrapper() for (auto& mi : fChannels) { int subChannelIndex = 0; for (auto& vi : mi.second) { - // set channel name: name + vector index - vi.fName = tools::ToString(mi.first, "[", subChannelIndex, "]"); - // set channel transport LOG(debug) << "Initializing transport for channel " << vi.fName << ": " << fair::mq::TransportNames.at(vi.fTransportType); vi.InitTransport(AddTransport(vi.fTransportType)); diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 1d04be32..451e747f 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -428,7 +428,7 @@ class FairMQDevice /// Wait for the supplied amount of time or for interruption. /// If interrupted, returns false, otherwise true. /// @param duration wait duration - template + template bool WaitFor(std::chrono::duration const& duration) { return !fStateMachine.WaitForPendingStateFor(std::chrono::duration_cast(duration).count()); @@ -443,9 +443,9 @@ class FairMQDevice std::unique_ptr fInternalConfig; ///< Internal program options configuration FairMQProgOptions* fConfig; ///< Pointer to config (internal or external) - void AddChannel(const std::string& channelName, const FairMQChannel& channel) + void AddChannel(const std::string& name, FairMQChannel&& channel) { - fConfig->AddChannel(channelName, channel); + fConfig->AddChannel(name, channel); } protected: diff --git a/fairmq/options/FairMQParser.cxx b/fairmq/options/FairMQParser.cxx index b9af5968..ac589634 100644 --- a/fairmq/options/FairMQParser.cxx +++ b/fairmq/options/FairMQParser.cxx @@ -8,7 +8,7 @@ /* * File: FairMQParser.cxx * Author: winckler - * + * * Created on May 14, 2015, 5:01 PM */ @@ -18,8 +18,11 @@ #include #include +#include using namespace std; +using namespace fair::mq::tools; +using namespace boost::property_tree; namespace fair { @@ -28,77 +31,42 @@ namespace mq namespace parser { -// TODO : add key-value map parameter for replacing/updating values from keys -// function that convert property tree (given the json structure) to FairMQChannelMap -FairMQChannelMap ptreeToMQMap(const boost::property_tree::ptree& pt, const string& id, const string& rootNode) +fair::mq::Properties ptreeToProperties(const ptree& pt, const string& id) { if (id == "") { throw ParserError("no device ID provided. Provide with `--id` cmd option"); } - // Create fair mq map - FairMQChannelMap channelMap; - // boost::property_tree::json_parser::write_json(std::cout, pt); - // Helper::PrintDeviceList(pt.get_child(rootNode)); - // Extract value from boost::property_tree - Helper::DeviceParser(pt.get_child(rootNode), channelMap, id); - - if (channelMap.empty()) { - LOG(warn) << "---- No channel keys found for " << id; - LOG(warn) << "---- Check the JSON inputs and/or command line inputs"; - } - - return channelMap; + return Helper::DeviceParser(pt.get_child("fairMQOptions"), id); } -FairMQChannelMap JSON::UserParser(const string& filename, const string& deviceId, const string& rootNode) +fair::mq::Properties JSON::UserParser(const string& filename, const string& deviceId) { - boost::property_tree::ptree pt; - boost::property_tree::read_json(filename, pt); - return ptreeToMQMap(pt, deviceId, rootNode); + ptree input; + LOG(debug) << "Parsing JSON from " << filename << " ..."; + read_json(filename, input); + return ptreeToProperties(input, deviceId); } namespace Helper { -void PrintDeviceList(const boost::property_tree::ptree& tree) +fair::mq::Properties DeviceParser(const ptree& fairMQOptions, const string& deviceId) { - string deviceIdKey; + fair::mq::Properties properties; - // do a first loop just to print the device-id in json input - for (const auto& p : tree) { - if (p.first == "devices") { - for (const auto& q : p.second.get_child("")) { - string key = q.second.get("key", ""); - if (key != "") { - deviceIdKey = key; - LOG(debug) << "Found config for device key '" << deviceIdKey << "' in JSON input"; - } else { - deviceIdKey = q.second.get("id"); - LOG(debug) << "Found config for device id '" << deviceIdKey << "' in JSON input"; - } - } - } - } -} + for (const auto& node : fairMQOptions) { + if (node.first == "devices") { + for (const auto& device : node.second) { + string deviceIdKey; -void DeviceParser(const boost::property_tree::ptree& tree, FairMQChannelMap& channelMap, const string& deviceId) -{ - string deviceIdKey; - - // For each node in fairMQOptions - for (const auto& p : tree) { - if (p.first == "devices") { - for (const auto& q : p.second) { // check if key is provided, otherwise use id - string key = q.second.get("key", ""); + string key = device.second.get("key", ""); if (key != "") { deviceIdKey = key; - // LOG(trace) << "Found config for device key '" << deviceIdKey << "' in JSON input"; } else { - deviceIdKey = q.second.get("id"); - // LOG(trace) << "Found config for device id '" << deviceIdKey << "' in JSON input"; + deviceIdKey = device.second.get("id"); } // if not correct device id, do not fill MQMap @@ -108,146 +76,146 @@ void DeviceParser(const boost::property_tree::ptree& tree, FairMQChannelMap& cha LOG(trace) << "Found following channels for device ID '" << deviceId << "' :"; - ChannelParser(q.second, channelMap); + ChannelParser(device.second, properties); } } } + + return properties; } -void ChannelParser(const boost::property_tree::ptree& tree, FairMQChannelMap& channelMap) +void ChannelParser(const ptree& tree, fair::mq::Properties& properties) { - string channelKey; + for (const auto& node : tree) { + if (node.first == "channels") { + for (const auto& cn : node.second) { + fair::mq::Properties commonProperties; + commonProperties.emplace("type", cn.second.get("type", FairMQChannel::DefaultType)); + commonProperties.emplace("method", cn.second.get("method", FairMQChannel::DefaultMethod)); + commonProperties.emplace("address", cn.second.get("address", FairMQChannel::DefaultAddress)); + commonProperties.emplace("transport", cn.second.get("transport", FairMQChannel::DefaultTransportName)); + commonProperties.emplace("sndBufSize", cn.second.get("sndBufSize", FairMQChannel::DefaultSndBufSize)); + commonProperties.emplace("rcvBufSize", cn.second.get("rcvBufSize", FairMQChannel::DefaultRcvBufSize)); + commonProperties.emplace("sndKernelSize", cn.second.get("sndKernelSize", FairMQChannel::DefaultSndKernelSize)); + commonProperties.emplace("rcvKernelSize", cn.second.get("rcvKernelSize", FairMQChannel::DefaultRcvKernelSize)); + commonProperties.emplace("linger", cn.second.get("linger", FairMQChannel::DefaultLinger)); + commonProperties.emplace("rateLogging", cn.second.get("rateLogging", FairMQChannel::DefaultRateLogging)); + commonProperties.emplace("portRangeMin", cn.second.get("portRangeMin", FairMQChannel::DefaultPortRangeMin)); + commonProperties.emplace("portRangeMax", cn.second.get("portRangeMax", FairMQChannel::DefaultPortRangeMax)); + commonProperties.emplace("autoBind", cn.second.get("autoBind", FairMQChannel::DefaultAutoBind)); - for (const auto& p : tree) { - if (p.first == "channels") { - for (const auto& q : p.second) { - channelKey = q.second.get("name"); - - int numSockets = q.second.get("numSockets", 0); - - // try to get common properties to use for all subChannels - FairMQChannel commonChannel; - commonChannel.UpdateType(q.second.get("type", commonChannel.GetType())); - commonChannel.UpdateMethod(q.second.get("method", commonChannel.GetMethod())); - commonChannel.UpdateAddress(q.second.get("address", commonChannel.GetAddress())); - commonChannel.UpdateTransport(q.second.get("transport", commonChannel.GetTransportName())); - commonChannel.UpdateSndBufSize(q.second.get("sndBufSize", commonChannel.GetSndBufSize())); - commonChannel.UpdateRcvBufSize(q.second.get("rcvBufSize", commonChannel.GetRcvBufSize())); - commonChannel.UpdateSndKernelSize(q.second.get("sndKernelSize", commonChannel.GetSndKernelSize())); - commonChannel.UpdateRcvKernelSize(q.second.get("rcvKernelSize", commonChannel.GetRcvKernelSize())); - commonChannel.UpdateLinger(q.second.get("linger", commonChannel.GetLinger())); - commonChannel.UpdateRateLogging(q.second.get("rateLogging", commonChannel.GetRateLogging())); - commonChannel.UpdatePortRangeMin(q.second.get("portRangeMin", commonChannel.GetPortRangeMin())); - commonChannel.UpdatePortRangeMax(q.second.get("portRangeMax", commonChannel.GetPortRangeMax())); - commonChannel.UpdateAutoBind(q.second.get("autoBind", commonChannel.GetAutoBind())); - - // temporary FairMQChannel container - vector channelList; + string name = cn.second.get("name"); + int numSockets = cn.second.get("numSockets", 0); if (numSockets > 0) { - LOG(trace) << "" << channelKey << ":"; - LOG(trace) << "\tnumSockets of " << numSockets << " specified,"; - LOG(trace) << "\tapplying common settings to each:"; + LOG(trace) << name << ":"; + LOG(trace) << "\tnumSockets of " << numSockets << " specified, applying common settings to each:"; - LOG(trace) << "\ttype = " << commonChannel.GetType(); - LOG(trace) << "\tmethod = " << commonChannel.GetMethod(); - LOG(trace) << "\taddress = " << commonChannel.GetAddress(); - LOG(trace) << "\ttransport = " << commonChannel.GetTransportName(); - LOG(trace) << "\tsndBufSize = " << commonChannel.GetSndBufSize(); - LOG(trace) << "\trcvBufSize = " << commonChannel.GetRcvBufSize(); - LOG(trace) << "\tsndKernelSize = " << commonChannel.GetSndKernelSize(); - LOG(trace) << "\trcvKernelSize = " << commonChannel.GetRcvKernelSize(); - LOG(trace) << "\tlinger = " << commonChannel.GetLinger(); - LOG(trace) << "\trateLogging = " << commonChannel.GetRateLogging(); - LOG(trace) << "\tportRangeMin = " << commonChannel.GetPortRangeMin(); - LOG(trace) << "\tportRangeMax = " << commonChannel.GetPortRangeMax(); - LOG(trace) << "\tautoBind = " << commonChannel.GetAutoBind(); + // TODO: make a loop out of this + LOG(trace) << "\ttype = " << boost::any_cast(commonProperties.at("type")); + LOG(trace) << "\tmethod = " << boost::any_cast(commonProperties.at("method")); + LOG(trace) << "\taddress = " << boost::any_cast(commonProperties.at("address")); + LOG(trace) << "\ttransport = " << boost::any_cast(commonProperties.at("transport")); + LOG(trace) << "\tsndBufSize = " << boost::any_cast(commonProperties.at("sndBufSize")); + LOG(trace) << "\trcvBufSize = " << boost::any_cast(commonProperties.at("rcvBufSize")); + LOG(trace) << "\tsndKernelSize = " << boost::any_cast(commonProperties.at("sndKernelSize")); + LOG(trace) << "\trcvKernelSize = " << boost::any_cast(commonProperties.at("rcvKernelSize")); + LOG(trace) << "\tlinger = " << boost::any_cast(commonProperties.at("linger")); + LOG(trace) << "\trateLogging = " << boost::any_cast(commonProperties.at("rateLogging")); + LOG(trace) << "\tportRangeMin = " << boost::any_cast(commonProperties.at("portRangeMin")); + LOG(trace) << "\tportRangeMax = " << boost::any_cast(commonProperties.at("portRangeMax")); + LOG(trace) << "\tautoBind = " << boost::any_cast(commonProperties.at("autoBind")); for (int i = 0; i < numSockets; ++i) { - FairMQChannel channel(commonChannel); - channelList.push_back(channel); + for (const auto& p : commonProperties) { + properties.emplace(ToString("chans.", name, ".", i, ".", p.first), p.second); + } } } else { - SocketParser(q.second.get_child(""), channelList, channelKey, commonChannel); + SubChannelParser(cn.second.get_child(""), properties, name, commonProperties); } - - channelMap.insert(make_pair(channelKey, move(channelList))); } } } } -void SocketParser(const boost::property_tree::ptree& tree, vector& channelList, const string& channelName, const FairMQChannel& commonChannel) +void SubChannelParser(const ptree& channelTree, fair::mq::Properties& properties, const string& channelName, const fair::mq::Properties& commonProperties) { // for each socket in channel - int socketCounter = 0; + int i = 0; - for (const auto& p : tree) { - if (p.first == "sockets") { - for (const auto& q : p.second) { - // create new channel and apply setting from the common channel - FairMQChannel channel(commonChannel); + for (const auto& node : channelTree) { + if (node.first == "sockets") { + for (const auto& sn : node.second) { + // a sub-channel inherits relevant properties from the common channel ... + fair::mq::Properties newProperties(commonProperties); - // if the socket field specifies or overrides something from the common channel, apply those settings - channel.UpdateType(q.second.get("type", channel.GetType())); - channel.UpdateMethod(q.second.get("method", channel.GetMethod())); - channel.UpdateAddress(q.second.get("address", channel.GetAddress())); - channel.UpdateTransport(q.second.get("transport", channel.GetTransportName())); - channel.UpdateSndBufSize(q.second.get("sndBufSize", channel.GetSndBufSize())); - channel.UpdateRcvBufSize(q.second.get("rcvBufSize", channel.GetRcvBufSize())); - channel.UpdateSndKernelSize(q.second.get("sndKernelSize", channel.GetSndKernelSize())); - channel.UpdateRcvKernelSize(q.second.get("rcvKernelSize", channel.GetRcvKernelSize())); - channel.UpdateLinger(q.second.get("linger", channel.GetLinger())); - channel.UpdateRateLogging(q.second.get("rateLogging", channel.GetRateLogging())); - channel.UpdatePortRangeMin(q.second.get("portRangeMin", channel.GetPortRangeMin())); - channel.UpdatePortRangeMax(q.second.get("portRangeMax", channel.GetPortRangeMax())); - channel.UpdateAutoBind(q.second.get("autoBind", channel.GetAutoBind())); + // ... and adds/overwrites its own properties + newProperties["type"] = sn.second.get("type", boost::any_cast(commonProperties.at("type"))); + newProperties["method"] = sn.second.get("method", boost::any_cast(commonProperties.at("method"))); + newProperties["address"] = sn.second.get("address", boost::any_cast(commonProperties.at("address"))); + newProperties["transport"] = sn.second.get("transport", boost::any_cast(commonProperties.at("transport"))); + newProperties["sndBufSize"] = sn.second.get("sndBufSize", boost::any_cast(commonProperties.at("sndBufSize"))); + newProperties["rcvBufSize"] = sn.second.get("rcvBufSize", boost::any_cast(commonProperties.at("rcvBufSize"))); + newProperties["sndKernelSize"] = sn.second.get("sndKernelSize", boost::any_cast(commonProperties.at("sndKernelSize"))); + newProperties["rcvKernelSize"] = sn.second.get("rcvKernelSize", boost::any_cast(commonProperties.at("rcvKernelSize"))); + newProperties["linger"] = sn.second.get("linger", boost::any_cast(commonProperties.at("linger"))); + newProperties["rateLogging"] = sn.second.get("rateLogging", boost::any_cast(commonProperties.at("rateLogging"))); + newProperties["portRangeMin"] = sn.second.get("portRangeMin", boost::any_cast(commonProperties.at("portRangeMin"))); + newProperties["portRangeMax"] = sn.second.get("portRangeMax", boost::any_cast(commonProperties.at("portRangeMax"))); + newProperties["autoBind"] = sn.second.get("autoBind", boost::any_cast(commonProperties.at("autoBind"))); - LOG(trace) << "" << channelName << "[" << socketCounter << "]:"; - LOG(trace) << "\ttype = " << channel.GetType(); - LOG(trace) << "\tmethod = " << channel.GetMethod(); - LOG(trace) << "\taddress = " << channel.GetAddress(); - LOG(trace) << "\ttransport = " << channel.GetTransportName(); - LOG(trace) << "\tsndBufSize = " << channel.GetSndBufSize(); - LOG(trace) << "\trcvBufSize = " << channel.GetRcvBufSize(); - LOG(trace) << "\tsndKernelSize = " << channel.GetSndKernelSize(); - LOG(trace) << "\trcvKernelSize = " << channel.GetRcvKernelSize(); - LOG(trace) << "\tlinger = " << channel.GetLinger(); - LOG(trace) << "\trateLogging = " << channel.GetRateLogging(); - LOG(trace) << "\tportRangeMin = " << channel.GetPortRangeMin(); - LOG(trace) << "\tportRangeMax = " << channel.GetPortRangeMax(); - LOG(trace) << "\tautoBind = " << channel.GetAutoBind(); + LOG(trace) << "" << channelName << "[" << i << "]:"; + // TODO: make a loop out of this + LOG(trace) << "\ttype = " << boost::any_cast(newProperties.at("type")); + LOG(trace) << "\tmethod = " << boost::any_cast(newProperties.at("method")); + LOG(trace) << "\taddress = " << boost::any_cast(newProperties.at("address")); + LOG(trace) << "\ttransport = " << boost::any_cast(newProperties.at("transport")); + LOG(trace) << "\tsndBufSize = " << boost::any_cast(newProperties.at("sndBufSize")); + LOG(trace) << "\trcvBufSize = " << boost::any_cast(newProperties.at("rcvBufSize")); + LOG(trace) << "\tsndKernelSize = " << boost::any_cast(newProperties.at("sndKernelSize")); + LOG(trace) << "\trcvKernelSize = " << boost::any_cast(newProperties.at("rcvKernelSize")); + LOG(trace) << "\tlinger = " << boost::any_cast(newProperties.at("linger")); + LOG(trace) << "\trateLogging = " << boost::any_cast(newProperties.at("rateLogging")); + LOG(trace) << "\tportRangeMin = " << boost::any_cast(newProperties.at("portRangeMin")); + LOG(trace) << "\tportRangeMax = " << boost::any_cast(newProperties.at("portRangeMax")); + LOG(trace) << "\tautoBind = " << boost::any_cast(newProperties.at("autoBind")); - channelList.push_back(channel); - ++socketCounter; + for (const auto& p : newProperties) { + properties.emplace(ToString("chans.", channelName, ".", i, ".", p.first), p.second); + } + ++i; } } } // end socket loop - if (socketCounter) { - LOG(trace) << "Found " << socketCounter << " socket(s) in channel."; + if (i > 0) { + LOG(trace) << "Found " << i << " socket(s) in channel."; } else { LOG(trace) << "" << channelName << ":"; LOG(trace) << "\tNo sockets specified,"; LOG(trace) << "\tapplying common settings to the channel:"; - FairMQChannel channel(commonChannel); + fair::mq::Properties newProperties(commonProperties); - LOG(trace) << "\ttype = " << channel.GetType(); - LOG(trace) << "\tmethod = " << channel.GetMethod(); - LOG(trace) << "\taddress = " << channel.GetAddress(); - LOG(trace) << "\ttransport = " << channel.GetTransportName(); - LOG(trace) << "\tsndBufSize = " << channel.GetSndBufSize(); - LOG(trace) << "\trcvBufSize = " << channel.GetRcvBufSize(); - LOG(trace) << "\tsndKernelSize = " << channel.GetSndKernelSize(); - LOG(trace) << "\trcvKernelSize = " << channel.GetRcvKernelSize(); - LOG(trace) << "\tlinger = " << channel.GetLinger(); - LOG(trace) << "\trateLogging = " << channel.GetRateLogging(); - LOG(trace) << "\tportRangeMin = " << channel.GetPortRangeMin(); - LOG(trace) << "\tportRangeMax = " << channel.GetPortRangeMax(); - LOG(trace) << "\tautoBind = " << channel.GetAutoBind(); + // TODO: make a loop out of this + LOG(trace) << "\ttype = " << boost::any_cast(newProperties.at("type")); + LOG(trace) << "\tmethod = " << boost::any_cast(newProperties.at("method")); + LOG(trace) << "\taddress = " << boost::any_cast(newProperties.at("address")); + LOG(trace) << "\ttransport = " << boost::any_cast(newProperties.at("transport")); + LOG(trace) << "\tsndBufSize = " << boost::any_cast(newProperties.at("sndBufSize")); + LOG(trace) << "\trcvBufSize = " << boost::any_cast(newProperties.at("rcvBufSize")); + LOG(trace) << "\tsndKernelSize = " << boost::any_cast(newProperties.at("sndKernelSize")); + LOG(trace) << "\trcvKernelSize = " << boost::any_cast(newProperties.at("rcvKernelSize")); + LOG(trace) << "\tlinger = " << boost::any_cast(newProperties.at("linger")); + LOG(trace) << "\trateLogging = " << boost::any_cast(newProperties.at("rateLogging")); + LOG(trace) << "\tportRangeMin = " << boost::any_cast(newProperties.at("portRangeMin")); + LOG(trace) << "\tportRangeMax = " << boost::any_cast(newProperties.at("portRangeMax")); + LOG(trace) << "\tautoBind = " << boost::any_cast(newProperties.at("autoBind")); - channelList.push_back(channel); + for (const auto& p : newProperties) { + properties.emplace(ToString("chans.", channelName, ".0.", p.first), p.second); + } } } diff --git a/fairmq/options/FairMQParser.h b/fairmq/options/FairMQParser.h index ce1c8c85..f451bb4b 100644 --- a/fairmq/options/FairMQParser.h +++ b/fairmq/options/FairMQParser.h @@ -1,4 +1,11 @@ -/* +/******************************************************************************** + * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/* * File: FairMQParser.h * Author: winckler * @@ -16,6 +23,7 @@ #include #include "FairMQChannel.h" +#include "Properties.h" namespace fair { @@ -24,24 +32,21 @@ namespace mq namespace parser { -using FairMQChannelMap = std::unordered_map>; - struct ParserError : std::runtime_error { using std::runtime_error::runtime_error; }; -FairMQChannelMap ptreeToMQMap(const boost::property_tree::ptree& pt, const std::string& deviceId, const std::string& rootNode); +fair::mq::Properties ptreeToProperties(const boost::property_tree::ptree& pt, const std::string& deviceId); struct JSON { - FairMQChannelMap UserParser(const std::string& filename, const std::string& deviceId, const std::string& rootNode = "fairMQOptions"); + fair::mq::Properties UserParser(const std::string& filename, const std::string& deviceId); }; namespace Helper { -void PrintDeviceList(const boost::property_tree::ptree& tree); -void DeviceParser(const boost::property_tree::ptree& tree, FairMQChannelMap& channelMap, const std::string& deviceId); -void ChannelParser(const boost::property_tree::ptree& tree, FairMQChannelMap& channelMap); -void SocketParser(const boost::property_tree::ptree& tree, std::vector& channelList, const std::string& channelName, const FairMQChannel& commonChannel); +fair::mq::Properties DeviceParser(const boost::property_tree::ptree& tree, const std::string& deviceId); +void ChannelParser(const boost::property_tree::ptree& tree, fair::mq::Properties& properties); +void SubChannelParser(const boost::property_tree::ptree& tree, fair::mq::Properties& properties, const std::string& channelName, const fair::mq::Properties& commonProperties); } // Helper namespace diff --git a/fairmq/options/FairMQProgOptions.cxx b/fairmq/options/FairMQProgOptions.cxx index 7e16046a..02ac5662 100644 --- a/fairmq/options/FairMQProgOptions.cxx +++ b/fairmq/options/FairMQProgOptions.cxx @@ -23,13 +23,12 @@ #include #include #include // join/split -#include +#include #include #include #include #include -#include using namespace std; using namespace fair::mq; @@ -37,6 +36,13 @@ using boost::any_cast; namespace po = boost::program_options; +struct ValInfo +{ + string value; + string type; + string origin; +}; + template ostream& operator<<(ostream& os, const vector& v) { @@ -64,37 +70,78 @@ pair getStringPair(const boost::any& v, const string& label) return { ss.str(), label }; } -unordered_map(*)(const boost::any&)> FairMQProgOptions::fValInfos = { - { type_index(typeid(string)), [](const boost::any& v) { return pair{ any_cast(v), "" }; } }, - { type_index(typeid(int)), [](const boost::any& v) { return getString(v, ""); } }, - { type_index(typeid(size_t)), [](const boost::any& v) { return getString(v, ""); } }, - { type_index(typeid(uint32_t)), [](const boost::any& v) { return getString(v, ""); } }, - { type_index(typeid(uint64_t)), [](const boost::any& v) { return getString(v, ""); } }, - { type_index(typeid(long)), [](const boost::any& v) { return getString(v, ""); } }, - { type_index(typeid(long long)), [](const boost::any& v) { return getString(v, ""); } }, - { type_index(typeid(unsigned)), [](const boost::any& v) { return getString(v, ""); } }, - { type_index(typeid(unsigned long)), [](const boost::any& v) { return getString(v, ""); } }, - { type_index(typeid(unsigned long long)), [](const boost::any& v) { return getString(v, ""); } }, - { type_index(typeid(float)), [](const boost::any& v) { return getString(v, ""); } }, - { type_index(typeid(double)), [](const boost::any& v) { return getString(v, ""); } }, - { type_index(typeid(long double)), [](const boost::any& v) { return getString(v, ""); } }, - { type_index(typeid(bool)), [](const boost::any& v) { stringstream ss; ss << boolalpha << any_cast(v); return pair{ ss.str(), "" }; } }, - { type_index(typeid(vector)), [](const boost::any& v) { stringstream ss; ss << boolalpha << any_cast>(v); return pair{ ss.str(), ">" }; } }, - { type_index(typeid(boost::filesystem::path)), [](const boost::any& v) { return getStringPair(v, ""); } }, - { type_index(typeid(vector)), [](const boost::any& v) { return getStringPair>(v, ">"); } }, - { type_index(typeid(vector)), [](const boost::any& v) { return getStringPair>(v, ">"); } }, - { type_index(typeid(vector)), [](const boost::any& v) { return getStringPair>(v, ">"); } }, - { type_index(typeid(vector)), [](const boost::any& v) { return getStringPair>(v, ">"); } }, - { type_index(typeid(vector)), [](const boost::any& v) { return getStringPair>(v, ">"); } }, - { type_index(typeid(vector)), [](const boost::any& v) { return getStringPair>(v, ">"); } }, - { type_index(typeid(vector)), [](const boost::any& v) { return getStringPair>(v, ">"); } }, - { type_index(typeid(vector)), [](const boost::any& v) { return getStringPair>(v, ">"); } }, - { type_index(typeid(vector)), [](const boost::any& v) { return getStringPair>(v, ">"); } }, - { type_index(typeid(vector)), [](const boost::any& v) { return getStringPair>(v, ">"); } }, - { type_index(typeid(vector)), [](const boost::any& v) { return getStringPair>(v, ">"); } }, - { type_index(typeid(vector)), [](const boost::any& v) { return getStringPair>(v, ">"); } }, - { type_index(typeid(vector)), [](const boost::any& v) { return getStringPair>(v, ">"); } }, - { type_index(typeid(vector)), [](const boost::any& v) { return getStringPair>(v, ">"); } }, +unordered_map(const Property&)>> FairMQProgOptions::fTypeInfos = { + { type_index(typeid(char)), [](const Property& p) { return pair{ string(1, any_cast(p)), "char" }; } }, + { type_index(typeid(unsigned char)), [](const Property& p) { return pair{ string(1, any_cast(p)), "unsigned char" }; } }, + { type_index(typeid(string)), [](const Property& p) { return pair{ any_cast(p), "string" }; } }, + { type_index(typeid(int)), [](const Property& p) { return getString(p, "int"); } }, + { type_index(typeid(size_t)), [](const Property& p) { return getString(p, "size_t"); } }, + { type_index(typeid(uint32_t)), [](const Property& p) { return getString(p, "uint32_t"); } }, + { type_index(typeid(uint64_t)), [](const Property& p) { return getString(p, "uint64_t"); } }, + { type_index(typeid(long)), [](const Property& p) { return getString(p, "long"); } }, + { type_index(typeid(long long)), [](const Property& p) { return getString(p, "long long"); } }, + { type_index(typeid(unsigned)), [](const Property& p) { return getString(p, "unsigned"); } }, + { type_index(typeid(unsigned long)), [](const Property& p) { return getString(p, "unsigned long"); } }, + { type_index(typeid(unsigned long long)), [](const Property& p) { return getString(p, "unsigned long long"); } }, + { type_index(typeid(float)), [](const Property& p) { return getString(p, "float"); } }, + { type_index(typeid(double)), [](const Property& p) { return getString(p, "double"); } }, + { type_index(typeid(long double)), [](const Property& p) { return getString(p, "long double"); } }, + { type_index(typeid(bool)), [](const Property& p) { stringstream ss; ss << boolalpha << any_cast(p); return pair{ ss.str(), "bool" }; } }, + { type_index(typeid(vector)), [](const Property& p) { stringstream ss; ss << boolalpha << any_cast>(p); return pair{ ss.str(), "vector>" }; } }, + { type_index(typeid(boost::filesystem::path)), [](const Property& p) { return getStringPair(p, "boost::filesystem::path"); } }, + { type_index(typeid(vector)), [](const Property& p) { return getStringPair>(p, "vector"); } }, + { type_index(typeid(vector)), [](const Property& p) { return getStringPair>(p, "vector"); } }, + { type_index(typeid(vector)), [](const Property& p) { return getStringPair>(p, "vector"); } }, + { type_index(typeid(vector)), [](const Property& p) { return getStringPair>(p, "vector"); } }, + { type_index(typeid(vector)), [](const Property& p) { return getStringPair>(p, "vector"); } }, + { type_index(typeid(vector)), [](const Property& p) { return getStringPair>(p, "vector"); } }, + { type_index(typeid(vector)), [](const Property& p) { return getStringPair>(p, "vector"); } }, + { type_index(typeid(vector)), [](const Property& p) { return getStringPair>(p, "vector"); } }, + { type_index(typeid(vector)), [](const Property& p) { return getStringPair>(p, "vector"); } }, + { type_index(typeid(vector)), [](const Property& p) { return getStringPair>(p, "vector"); } }, + { type_index(typeid(vector)), [](const Property& p) { return getStringPair>(p, "vector"); } }, + { type_index(typeid(vector)), [](const Property& p) { return getStringPair>(p, "vector"); } }, + { type_index(typeid(vector)), [](const Property& p) { return getStringPair>(p, "vector"); } }, + { type_index(typeid(vector)), [](const Property& p) { return getStringPair>(p, "vector"); } }, + { type_index(typeid(vector)), [](const Property& p) { return getStringPair>(p, "vector"); } }, + { type_index(typeid(vector)), [](const Property& p) { return getStringPair>(p, "vector"); } }, +}; + +unordered_map FairMQProgOptions::fEventEmitters = { + { type_index(typeid(char)), [](const EventManager& em, const string& k, const Property& p) { em.Emit(k, any_cast(p)); } }, + { type_index(typeid(unsigned char)), [](const EventManager& em, const string& k, const Property& p) { em.Emit(k, any_cast(p)); } }, + { type_index(typeid(string)), [](const EventManager& em, const string& k, const Property& p) { em.Emit(k, any_cast(p)); } }, + { type_index(typeid(int)), [](const EventManager& em, const string& k, const Property& p) { em.Emit(k, any_cast(p)); } }, + { type_index(typeid(size_t)), [](const EventManager& em, const string& k, const Property& p) { em.Emit(k, any_cast(p)); } }, + { type_index(typeid(uint32_t)), [](const EventManager& em, const string& k, const Property& p) { em.Emit(k, any_cast(p)); } }, + { type_index(typeid(uint64_t)), [](const EventManager& em, const string& k, const Property& p) { em.Emit(k, any_cast(p)); } }, + { type_index(typeid(long)), [](const EventManager& em, const string& k, const Property& p) { em.Emit(k, any_cast(p)); } }, + { type_index(typeid(long long)), [](const EventManager& em, const string& k, const Property& p) { em.Emit(k, any_cast(p)); } }, + { type_index(typeid(unsigned)), [](const EventManager& em, const string& k, const Property& p) { em.Emit(k, any_cast(p)); } }, + { type_index(typeid(unsigned long)), [](const EventManager& em, const string& k, const Property& p) { em.Emit(k, any_cast(p)); } }, + { type_index(typeid(unsigned long long)), [](const EventManager& em, const string& k, const Property& p) { em.Emit(k, any_cast(p)); } }, + { type_index(typeid(float)), [](const EventManager& em, const string& k, const Property& p) { em.Emit(k, any_cast(p)); } }, + { type_index(typeid(double)), [](const EventManager& em, const string& k, const Property& p) { em.Emit(k, any_cast(p)); } }, + { type_index(typeid(long double)), [](const EventManager& em, const string& k, const Property& p) { em.Emit(k, any_cast(p)); } }, + { type_index(typeid(bool)), [](const EventManager& em, const string& k, const Property& p) { em.Emit(k, any_cast(p)); } }, + { type_index(typeid(vector)), [](const EventManager& em, const string& k, const Property& p) { em.Emit>(k, any_cast>(p)); } }, + { type_index(typeid(boost::filesystem::path)), [](const EventManager& em, const string& k, const Property& p) { em.Emit(k, any_cast(p)); } }, + { type_index(typeid(vector)), [](const EventManager& em, const string& k, const Property& p) { em.Emit>(k, any_cast>(p)); } }, + { type_index(typeid(vector)), [](const EventManager& em, const string& k, const Property& p) { em.Emit>(k, any_cast>(p)); } }, + { type_index(typeid(vector)), [](const EventManager& em, const string& k, const Property& p) { em.Emit>(k, any_cast>(p)); } }, + { type_index(typeid(vector)), [](const EventManager& em, const string& k, const Property& p) { em.Emit>(k, any_cast>(p)); } }, + { type_index(typeid(vector)), [](const EventManager& em, const string& k, const Property& p) { em.Emit>(k, any_cast>(p)); } }, + { type_index(typeid(vector)), [](const EventManager& em, const string& k, const Property& p) { em.Emit>(k, any_cast>(p)); } }, + { type_index(typeid(vector)), [](const EventManager& em, const string& k, const Property& p) { em.Emit>(k, any_cast>(p)); } }, + { type_index(typeid(vector)), [](const EventManager& em, const string& k, const Property& p) { em.Emit>(k, any_cast>(p)); } }, + { type_index(typeid(vector)), [](const EventManager& em, const string& k, const Property& p) { em.Emit>(k, any_cast>(p)); } }, + { type_index(typeid(vector)), [](const EventManager& em, const string& k, const Property& p) { em.Emit>(k, any_cast>(p)); } }, + { type_index(typeid(vector)), [](const EventManager& em, const string& k, const Property& p) { em.Emit>(k, any_cast>(p)); } }, + { type_index(typeid(vector)), [](const EventManager& em, const string& k, const Property& p) { em.Emit>(k, any_cast>(p)); } }, + { type_index(typeid(vector)), [](const EventManager& em, const string& k, const Property& p) { em.Emit>(k, any_cast>(p)); } }, + { type_index(typeid(vector)), [](const EventManager& em, const string& k, const Property& p) { em.Emit>(k, any_cast>(p)); } }, + { type_index(typeid(vector)), [](const EventManager& em, const string& k, const Property& p) { em.Emit>(k, any_cast>(p)); } }, + { type_index(typeid(vector)), [](const EventManager& em, const string& k, const Property& p) { em.Emit>(k, any_cast>(p)); } }, }; namespace fair @@ -102,6 +149,12 @@ namespace fair namespace mq { +string ConvertPropertyToString(const Property& p) +{ + pair info = FairMQProgOptions::fTypeInfos.at(p.type())(p); + return info.first; +} + ValInfo ConvertVarValToValInfo(const po::variable_value& v) { string origin; @@ -115,10 +168,9 @@ ValInfo ConvertVarValToValInfo(const po::variable_value& v) } try { - pair info = FairMQProgOptions::fValInfos.at(v.value().type())(v.value()); + pair info = FairMQProgOptions::fTypeInfos.at(v.value().type())(v.value()); return {info.first, info.second, origin}; - } catch (out_of_range& oor) - { + } catch (out_of_range& oor) { return {string("[unidentified_type]"), string("[unidentified_type]"), origin}; } }; @@ -133,14 +185,11 @@ string ConvertVarValToString(const po::variable_value& v) FairMQProgOptions::FairMQProgOptions() : fVarMap() - , fFairMQChannelMap() , fAllOptions("FairMQ Command Line Options") , fGeneralOptions("General options") , fMQOptions("FairMQ device options") , fParserOptions("FairMQ channel config parser options") , fMtx() - , fChannelInfo() - , fChannelKeyMap() , fUnregisteredOptions() , fEvents() { @@ -179,6 +228,149 @@ FairMQProgOptions::FairMQProgOptions() ParseDefaults(); } +unordered_map FairMQProgOptions::GetChannelInfo() const +{ + lock_guard lock(fMtx); + return GetChannelInfoImpl(); +} + +unordered_map FairMQProgOptions::GetChannelInfoImpl() const +{ + unordered_map info; + + boost::regex re("chans\\..*\\.type"); + for (const auto& m : fVarMap) { + if (boost::regex_match(m.first, re)) { + string chan = m.first.substr(6); + string::size_type n = chan.find("."); + string chanName = chan.substr(0, n); + + if (info.find(chanName) == info.end()) { + info.emplace(chanName, 1); + } else { + info[chanName] = info[chanName] + 1; + } + } + } + + return info; +} + +Properties FairMQProgOptions::GetProperties(const string& q) const +{ + boost::regex re(q); + Properties result; + + lock_guard lock(fMtx); + + for (const auto& m : fVarMap) { + if (boost::regex_match(m.first, re)) { + result.emplace(m.first, m.second.value()); + } + } + + if (result.size() == 0) { + LOG(warn) << "could not find anything with \"" << q << "\""; + } + + return result; +} + +map FairMQProgOptions::GetPropertiesAsString(const string& q) const +{ + boost::regex re(q); + map result; + + lock_guard lock(fMtx); + + for (const auto& m : fVarMap) { + if (boost::regex_match(m.first, re)) { + result.emplace(m.first, ConvertPropertyToString(m.second.value())); + } + } + + if (result.size() == 0) { + LOG(warn) << "could not find anything with \"" << q << "\""; + } + + return result; +} + +Properties FairMQProgOptions::GetPropertiesStartingWith(const string& q) const +{ + Properties result; + + lock_guard lock(fMtx); + + for (const auto& m : fVarMap) { + if (m.first.compare(0, q.length(), q) == 0) { + result.emplace(m.first, m.second.value()); + } + } + + return result; +} + +void FairMQProgOptions::SetProperties(const Properties& input) +{ + unique_lock lock(fMtx); + + map& vm = fVarMap; + for (const auto& m : input) { + vm[m.first].value() = m.second; + } + + lock.unlock(); + + for (const auto& m : input) { + fEventEmitters.at(m.second.type())(fEvents, m.first, m.second); + fEvents.Emit(m.first, ConvertPropertyToString(m.second)); + } +} + +void FairMQProgOptions::AddChannel(const std::string& name, const FairMQChannel& channel) +{ + lock_guard lock(fMtx); + unordered_map existingChannels = GetChannelInfoImpl(); + int index = 0; + if (existingChannels.count(name) > 0) { + index = existingChannels.at(name); + } + + string prefix = fair::mq::tools::ToString("chans.", name, ".", index, "."); + + SetVarMapValue(string(prefix + "type"), channel.GetType()); + SetVarMapValue(string(prefix + "method"), channel.GetMethod()); + SetVarMapValue(string(prefix + "address"), channel.GetAddress()); + SetVarMapValue(string(prefix + "transport"), channel.GetTransportName()); + SetVarMapValue(string(prefix + "sndBufSize"), channel.GetSndBufSize()); + SetVarMapValue(string(prefix + "rcvBufSize"), channel.GetRcvBufSize()); + SetVarMapValue(string(prefix + "sndKernelSize"), channel.GetSndKernelSize()); + SetVarMapValue(string(prefix + "rcvKernelSize"), channel.GetRcvKernelSize()); + SetVarMapValue(string(prefix + "linger"), channel.GetLinger()); + SetVarMapValue(string(prefix + "rateLogging"), channel.GetRateLogging()); + SetVarMapValue(string(prefix + "portRangeMin"), channel.GetPortRangeMin()); + SetVarMapValue(string(prefix + "portRangeMax"), channel.GetPortRangeMax()); + SetVarMapValue(string(prefix + "autoBind"), channel.GetAutoBind()); +} + +void FairMQProgOptions::DeleteProperty(const string& key) +{ + lock_guard lock(fMtx); + + map& vm = fVarMap; + vm.erase(key); +} + +int FairMQProgOptions::ParseAll(const vector& cmdArgs, bool allowUnregistered) +{ + vector argv(cmdArgs.size()); + transform(cmdArgs.begin(), cmdArgs.end(), argv.begin(), [](const string& str) { + return str.c_str(); + }); + return ParseAll(argv.size(), const_cast(argv.data()), allowUnregistered); +} + int FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool allowUnregistered) { ParseCmdLine(argc, argv, allowUnregistered); @@ -227,7 +419,7 @@ int FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool al try { if (fVarMap.count("mq-config")) { LOG(debug) << "mq-config: Using default JSON parser"; - UpdateChannelMap(parser::JSON().UserParser(fVarMap.at("mq-config").as(), idForParser)); + SetProperties(parser::JSON().UserParser(fVarMap.at("mq-config").as(), idForParser)); } else if (fVarMap.count("channel-config")) { LOG(debug) << "channel-config: Parsing channel configuration"; ParseChannelsFromCmdLine(); @@ -258,7 +450,7 @@ void FairMQProgOptions::ParseChannelsFromCmdLine() idForParser = fVarMap["id"].as(); } - UpdateChannelMap(parser::SUBOPT().UserParser(fVarMap.at("channel-config").as>(), idForParser)); + SetProperties(parser::SUBOPT().UserParser(fVarMap.at("channel-config").as>(), idForParser)); } void FairMQProgOptions::ParseCmdLine(const int argc, char const* const* argv, bool allowUnregistered) @@ -293,143 +485,6 @@ void FairMQProgOptions::ParseDefaults() po::store(po::parse_command_line(argv.size(), const_cast(argv.data()), fAllOptions), fVarMap); } -unordered_map> FairMQProgOptions::GetFairMQMap() const -{ - return fFairMQChannelMap; -} - -unordered_map FairMQProgOptions::GetChannelInfo() const -{ - return fChannelInfo; -} - -// replace FairMQChannelMap, and update variable map accordingly -int FairMQProgOptions::UpdateChannelMap(const unordered_map>& channels) -{ - fFairMQChannelMap = channels; - UpdateChannelInfo(); - UpdateMQValues(); - return 0; -} - -void FairMQProgOptions::UpdateChannelInfo() -{ - fChannelInfo.clear(); - for (const auto& c : fFairMQChannelMap) { - fChannelInfo.insert(make_pair(c.first, c.second.size())); - } -} - -void FairMQProgOptions::UpdateMQValues() -{ - for (const auto& p : fFairMQChannelMap) { - int index = 0; - - for (const auto& channel : p.second) { - string typeKey = "chans." + p.first + "." + to_string(index) + ".type"; - string methodKey = "chans." + p.first + "." + to_string(index) + ".method"; - string addressKey = "chans." + p.first + "." + to_string(index) + ".address"; - string transportKey = "chans." + p.first + "." + to_string(index) + ".transport"; - string sndBufSizeKey = "chans." + p.first + "." + to_string(index) + ".sndBufSize"; - string rcvBufSizeKey = "chans." + p.first + "." + to_string(index) + ".rcvBufSize"; - string sndKernelSizeKey = "chans." + p.first + "." + to_string(index) + ".sndKernelSize"; - string rcvKernelSizeKey = "chans." + p.first + "." + to_string(index) + ".rcvKernelSize"; - string lingerKey = "chans." + p.first + "." + to_string(index) + ".linger"; - string rateLoggingKey = "chans." + p.first + "." + to_string(index) + ".rateLogging"; - string portRangeMinKey = "chans." + p.first + "." + to_string(index) + ".portRangeMin"; - string portRangeMaxKey = "chans." + p.first + "." + to_string(index) + ".portRangeMax"; - string autoBindKey = "chans." + p.first + "." + to_string(index) + ".autoBind"; - - fChannelKeyMap[typeKey] = ChannelKey{p.first, index, "type"}; - fChannelKeyMap[methodKey] = ChannelKey{p.first, index, "method"}; - fChannelKeyMap[addressKey] = ChannelKey{p.first, index, "address"}; - fChannelKeyMap[transportKey] = ChannelKey{p.first, index, "transport"}; - fChannelKeyMap[sndBufSizeKey] = ChannelKey{p.first, index, "sndBufSize"}; - fChannelKeyMap[rcvBufSizeKey] = ChannelKey{p.first, index, "rcvBufSize"}; - fChannelKeyMap[sndKernelSizeKey] = ChannelKey{p.first, index, "sndKernelSize"}; - fChannelKeyMap[rcvKernelSizeKey] = ChannelKey{p.first, index, "rcvkernelSize"}; - fChannelKeyMap[lingerKey] = ChannelKey{p.first, index, "linger"}; - fChannelKeyMap[rateLoggingKey] = ChannelKey{p.first, index, "rateLogging"}; - fChannelKeyMap[portRangeMinKey] = ChannelKey{p.first, index, "portRangeMin"}; - fChannelKeyMap[portRangeMaxKey] = ChannelKey{p.first, index, "portRangeMax"}; - fChannelKeyMap[autoBindKey] = ChannelKey{p.first, index, "autoBind"}; - - SetVarMapValue(typeKey, channel.GetType()); - SetVarMapValue(methodKey, channel.GetMethod()); - SetVarMapValue(addressKey, channel.GetAddress()); - SetVarMapValue(transportKey, channel.GetTransportName()); - SetVarMapValue(sndBufSizeKey, channel.GetSndBufSize()); - SetVarMapValue(rcvBufSizeKey, channel.GetRcvBufSize()); - SetVarMapValue(sndKernelSizeKey, channel.GetSndKernelSize()); - SetVarMapValue(rcvKernelSizeKey, channel.GetRcvKernelSize()); - SetVarMapValue(lingerKey, channel.GetLinger()); - SetVarMapValue(rateLoggingKey, channel.GetRateLogging()); - SetVarMapValue(portRangeMinKey, channel.GetPortRangeMin()); - SetVarMapValue(portRangeMaxKey, channel.GetPortRangeMax()); - SetVarMapValue(autoBindKey, channel.GetAutoBind()); - - index++; - } - - SetVarMapValue("chans." + p.first + ".numSockets", index); - } -} - -int FairMQProgOptions::UpdateChannelValue(const string& channelName, int index, const string& member, const string& val) -{ - if (member == "type") { - fFairMQChannelMap.at(channelName).at(index).UpdateType(val); - } else if (member == "method") { - fFairMQChannelMap.at(channelName).at(index).UpdateMethod(val); - } else if (member == "address") { - fFairMQChannelMap.at(channelName).at(index).UpdateAddress(val); - } else if (member == "transport") { - fFairMQChannelMap.at(channelName).at(index).UpdateTransport(val); - } else { - LOG(error) << "update of FairMQChannel map failed for the following key: " << channelName << "." << index << "." << member; - return 1; - } - - return 0; -} - -int FairMQProgOptions::UpdateChannelValue(const string& channelName, int index, const string& member, int val) -{ - if (member == "sndBufSize") { - fFairMQChannelMap.at(channelName).at(index).UpdateSndBufSize(val); - } else if (member == "rcvBufSize") { - fFairMQChannelMap.at(channelName).at(index).UpdateRcvBufSize(val); - } else if (member == "sndKernelSize") { - fFairMQChannelMap.at(channelName).at(index).UpdateSndKernelSize(val); - } else if (member == "rcvKernelSize") { - fFairMQChannelMap.at(channelName).at(index).UpdateRcvKernelSize(val); - } else if (member == "linger") { - fFairMQChannelMap.at(channelName).at(index).UpdateLinger(val); - } else if (member == "rateLogging") { - fFairMQChannelMap.at(channelName).at(index).UpdateRateLogging(val); - } else if (member == "portRangeMin") { - fFairMQChannelMap.at(channelName).at(index).UpdatePortRangeMin(val); - } else if (member == "portRangeMax") { - fFairMQChannelMap.at(channelName).at(index).UpdatePortRangeMax(val); - } else { - LOG(error) << "update of FairMQChannel map failed for the following key: " << channelName << "." << index << "." << member; - return 1; - } - - return 0; -} - -int FairMQProgOptions::UpdateChannelValue(const string& channelName, int index, const string& member, bool val) -{ - if (member == "autoBind") { - fFairMQChannelMap.at(channelName).at(index).UpdateAutoBind(val); - return 0; - } else { - LOG(error) << "update of FairMQChannel map failed for the following key: " << channelName << "." << index << "." << member; - return 1; - } -} - vector FairMQProgOptions::GetPropertyKeys() const { lock_guard lock(fMtx); @@ -457,11 +512,6 @@ po::options_description& FairMQProgOptions::GetCmdLineOptions() int FairMQProgOptions::PrintOptions() { - // -> loop over variable map and print its content - // -> In this example the following types are supported: - // string, int, float, double, short, boost::filesystem::path - // vector, vector, vector, vector, vector - map mapinfo; // get string length for formatting and convert varmap values into string @@ -493,10 +543,11 @@ int FairMQProgOptions::PrintOptions() ss << "Configuration: \n"; for (const auto& p : mapinfo) { + string type("<" + p.second.type + ">"); ss << setfill(' ') << left << setw(maxLenKey) << p.first << " = " << setw(maxLenValue) << p.second.value << " " - << setw(maxLenType) << p.second.type + << setw(maxLenType + 2) << type << " " << setw(maxLenDefault) << p.second.origin << "\n"; } @@ -526,21 +577,15 @@ int FairMQProgOptions::PrintOptionsRaw() return 0; } -string FairMQProgOptions::GetStringValue(const string& key) +string FairMQProgOptions::GetPropertyAsString(const string& key) const { lock_guard lock(fMtx); - string valueStr; - try { - if (fVarMap.count(key)) { - valueStr = ConvertVarValToString(fVarMap.at(key)); - } - } catch (exception& e) { - LOG(error) << "Exception thrown for the key '" << key << "'"; - LOG(error) << e.what(); + if (fVarMap.count(key)) { + return ConvertVarValToString(fVarMap.at(key)); } - return valueStr; + throw PropertyNotFoundException(fair::mq::tools::ToString("Config has no key: ", key)); } int FairMQProgOptions::Count(const string& key) const diff --git a/fairmq/options/FairMQProgOptions.h b/fairmq/options/FairMQProgOptions.h index 26d83215..6771330d 100644 --- a/fairmq/options/FairMQProgOptions.h +++ b/fairmq/options/FairMQProgOptions.h @@ -12,9 +12,11 @@ #include #include "FairMQLogger.h" #include "FairMQChannel.h" +#include "Properties.h" #include #include +#include #include #include @@ -22,9 +24,10 @@ #include #include #include -#include #include #include +#include +#include // pair #include namespace fair @@ -35,39 +38,20 @@ namespace mq struct PropertyChange : Event {}; struct PropertyChangeAsString : Event {}; -struct ValInfo -{ - std::string value; - std::string type; - std::string origin; -}; - } /* namespace mq */ } /* namespace fair */ class FairMQProgOptions { - private: - using FairMQChannelMap = std::unordered_map>; - public: FairMQProgOptions(); virtual ~FairMQProgOptions() {} struct PropertyNotFoundException : std::runtime_error { using std::runtime_error::runtime_error; }; - int ParseAll(const std::vector& cmdArgs, bool allowUnregistered) - { - std::vector argv(cmdArgs.size()); - transform(cmdArgs.begin(), cmdArgs.end(), argv.begin(), [](const std::string& str) { - return str.c_str(); - }); - return ParseAll(argv.size(), const_cast(argv.data()), allowUnregistered); - } - + int ParseAll(const std::vector& cmdArgs, bool allowUnregistered); int ParseAll(const int argc, char const* const* argv, bool allowUnregistered = true); - FairMQChannelMap GetFairMQMap() const; std::unordered_map GetChannelInfo() const; std::vector GetPropertyKeys() const; @@ -80,7 +64,7 @@ class FairMQProgOptions return fVarMap[key].as(); } - throw PropertyNotFoundException(fair::mq::tools::ToString("Config has no key: ", key)); + throw PropertyNotFoundException(fair::mq::tools::ToString("Config has no key: ", key)); } template @@ -95,30 +79,23 @@ class FairMQProgOptions return ifNotFound; } + fair::mq::Properties GetProperties(const std::string& q) const; + std::map GetPropertiesAsString(const std::string& q) const; + fair::mq::Properties GetPropertiesStartingWith(const std::string& q) const; + template T GetValue(const std::string& key) const // TODO: deprecate this { return GetProperty(key); } - std::map GetProperties(const std::string& q) + std::string GetPropertyAsString(const std::string& key) const; + + std::string GetStringValue(const std::string& key) const // TODO: deprecate this { - std::regex re(q); - std::map result; - - std::lock_guard lock(fMtx); - - for (const auto& m : fVarMap) { - if (std::regex_search(m.first, re)) { - result.emplace(m.first, m.second.value()); - } - } - - return result; + return GetPropertyAsString(key); } - std::string GetStringValue(const std::string& key); - template void SetProperty(const std::string& key, T val) { @@ -128,8 +105,6 @@ class FairMQProgOptions if (key == "channel-config") { ParseChannelsFromCmdLine(); - } else if (fChannelKeyMap.count(key)) { - UpdateChannelValue(fChannelKeyMap.at(key).channel, fChannelKeyMap.at(key).index, fChannelKeyMap.at(key).member, val); } lock.unlock(); @@ -145,28 +120,11 @@ class FairMQProgOptions return 0; } - void SetProperties(const std::map& input) - { - std::lock_guard lock(fMtx); + void SetProperties(const fair::mq::Properties& input); + void DeleteProperty(const std::string& key); - std::map& vm = fVarMap; - for (const auto& m : input) { - vm[m.first].value() = m.second; - } - - // TODO: call subscriptions here (after unlock) - } - - void DeleteProperty(const std::string& key) - { - std::lock_guard lock(fMtx); - - std::map& vm = fVarMap; - vm.erase(key); - } - - template - void Subscribe(const std::string& subscriber, std::function func) + template + void Subscribe(const std::string& subscriber, std::function func) const { std::lock_guard lock(fMtx); static_assert(!std::is_same::value || !std::is_same::value, @@ -174,20 +132,20 @@ class FairMQProgOptions fEvents.Subscribe(subscriber, func); } - template - void Unsubscribe(const std::string& subscriber) + template + void Unsubscribe(const std::string& subscriber) const { std::lock_guard lock(fMtx); fEvents.Unsubscribe(subscriber); } - void SubscribeAsString(const std::string& subscriber, std::function func) + void SubscribeAsString(const std::string& subscriber, std::function func) const { std::lock_guard lock(fMtx); fEvents.Subscribe(subscriber, func); } - void UnsubscribeAsString(const std::string& subscriber) + void UnsubscribeAsString(const std::string& subscriber) const { std::lock_guard lock(fMtx); fEvents.Unsubscribe(subscriber); @@ -202,23 +160,26 @@ class FairMQProgOptions int PrintOptions(); int PrintOptionsRaw(); - void AddChannel(const std::string& channelName, const FairMQChannel& channel) + void AddChannel(const std::string& name, const FairMQChannel& channel); + + template + static void AddType(std::string label = "") { - fFairMQChannelMap[channelName].push_back(channel); + if (label == "") { + label = boost::core::demangle(typeid(T).name()); + } + fTypeInfos[std::type_index(typeid(T))] = [label](const fair::mq::Property& p) { + std::stringstream ss; + ss << boost::any_cast(p); + return std::pair{ss.str(), label}; + }; } - static std::unordered_map(*)(const boost::any&)> fValInfos; + static std::unordered_map(const fair::mq::Property&)>> fTypeInfos; + static std::unordered_map fEventEmitters; private: - struct ChannelKey - { - std::string channel; - int index; - std::string member; - }; - boost::program_options::variables_map fVarMap; ///< options container - FairMQChannelMap fFairMQChannelMap; boost::program_options::options_description fAllOptions; ///< all options descriptions boost::program_options::options_description fGeneralOptions; ///< general options descriptions @@ -227,32 +188,14 @@ class FairMQProgOptions mutable std::mutex fMtx; - std::unordered_map fChannelInfo; ///< channel name - number of subchannels - std::unordered_map fChannelKeyMap;// key=full path - val=key info std::vector fUnregisteredOptions; ///< container with unregistered options - fair::mq::EventManager fEvents; + mutable fair::mq::EventManager fEvents; void ParseCmdLine(const int argc, char const* const* argv, bool allowUnregistered = true); void ParseDefaults(); - // read FairMQChannelMap and insert/update corresponding values in variable map - // create key for variable map as follow : channelName.index.memberName - void UpdateMQValues(); - int Store(const FairMQChannelMap& channels); - - int UpdateChannelMap(const FairMQChannelMap& map); - template - int UpdateChannelValue(const std::string&, int, const std::string&, T) - { - LOG(error) << "update of FairMQChannel map failed, because value type not supported"; - return 1; - } - int UpdateChannelValue(const std::string& channelName, int index, const std::string& member, const std::string& val); - int UpdateChannelValue(const std::string& channelName, int index, const std::string& member, int val); - int UpdateChannelValue(const std::string& channelName, int index, const std::string& member, bool val); - - void UpdateChannelInfo(); + std::unordered_map GetChannelInfoImpl() const; // modify the value of variable map after calling boost::program_options::store template diff --git a/fairmq/options/FairMQSuboptParser.cxx b/fairmq/options/FairMQSuboptParser.cxx index b354fdfd..aecf222f 100644 --- a/fairmq/options/FairMQSuboptParser.cxx +++ b/fairmq/options/FairMQSuboptParser.cxx @@ -29,7 +29,7 @@ namespace parser constexpr const char* SUBOPT::channelOptionKeys[]; -FairMQChannelMap SUBOPT::UserParser(const vector& channelConfig, const string& deviceId, const string& rootNode) +fair::mq::Properties SUBOPT::UserParser(const vector& channelConfig, const string& deviceId) { ptree pt; @@ -38,8 +38,7 @@ FairMQChannelMap SUBOPT::UserParser(const vector& channelConfig, const s ptree channelsArray; - for (auto token : channelConfig) - { + for (auto token : channelConfig) { string channelName; ptree channelProperties; @@ -48,32 +47,23 @@ FairMQChannelMap SUBOPT::UserParser(const vector& channelConfig, const s string argString(token); char* subopts = &argString[0]; char* value = nullptr; - while (subopts && *subopts != 0 && *subopts != ' ') - { + while (subopts && *subopts != 0 && *subopts != ' ') { int subopt = getsubopt(&subopts, (char**)channelOptionKeys, &value); - if (subopt == NAME) - { + if (subopt == NAME) { channelName = value; channelProperties.put("name", channelName); - } - else if (subopt == ADDRESS) - { + } else if (subopt == ADDRESS) { ptree socketProperties; socketProperties.put(channelOptionKeys[subopt], value); socketsArray.push_back(make_pair("", socketProperties)); - } - else if (subopt >= 0 && value != nullptr) - { + } else if (subopt >= 0 && value != nullptr) { channelProperties.put(channelOptionKeys[subopt], value); } } - if (channelName != "") - { + if (channelName != "") { channelProperties.add_child("sockets", socketsArray); - } - else - { + } else { // TODO: what is the error policy here, should we abort? LOG(error) << "missing channel name in argument of option --channel-config"; } @@ -88,7 +78,7 @@ FairMQChannelMap SUBOPT::UserParser(const vector& channelConfig, const s pt.add_child("fairMQOptions.devices", devicesArray); - return ptreeToMQMap(pt, deviceId, rootNode); + return ptreeToProperties(pt, deviceId); } } diff --git a/fairmq/options/FairMQSuboptParser.h b/fairmq/options/FairMQSuboptParser.h index 3a27a6cf..79a60a3c 100644 --- a/fairmq/options/FairMQSuboptParser.h +++ b/fairmq/options/FairMQSuboptParser.h @@ -14,7 +14,7 @@ #ifndef FAIRMQPARSER_SUBOPT_H #define FAIRMQPARSER_SUBOPT_H -#include "FairMQParser.h" // for FairMQChannelMap +#include "FairMQParser.h" // for FairMQProperties #include #include #include @@ -65,7 +65,7 @@ struct SUBOPT lastsocketkey }; - constexpr static const char *channelOptionKeys[] = { + constexpr static const char* channelOptionKeys[] = { /*[NAME] = */ "name", /*[TYPE] = */ "type", /*[METHOD] = */ "method", @@ -84,7 +84,7 @@ struct SUBOPT nullptr }; - FairMQChannelMap UserParser(const std::vector& channelConfig, const std::string& deviceId, const std::string& rootNode = "fairMQOptions"); + fair::mq::Properties UserParser(const std::vector& channelConfig, const std::string& deviceId); }; } diff --git a/fairmq/options/Properties.h b/fairmq/options/Properties.h new file mode 100644 index 00000000..5b7c4c54 --- /dev/null +++ b/fairmq/options/Properties.h @@ -0,0 +1,28 @@ +/******************************************************************************** + * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +#ifndef FAIR_MQ_PROPERTIES_H +#define FAIR_MQ_PROPERTIES_H + +#include + +#include +#include + +namespace fair +{ +namespace mq +{ + +using Property = boost::any; + +using Properties = std::map; + +} +} + +#endif /* FAIR_MQ_PROPERTIES_H */ diff --git a/fairmq/plugins/Control.cxx b/fairmq/plugins/Control.cxx index 29680beb..2c86024e 100644 --- a/fairmq/plugins/Control.cxx +++ b/fairmq/plugins/Control.cxx @@ -298,7 +298,7 @@ auto Control::PrintInteractiveHelpColor() -> void << " [\033[01;32mi\033[0m] init device, [\033[01;32mb\033[0m] bind, [\033[01;32mx\033[0m] connect, [\033[01;32mj\033[0m] init task," << " [\033[01;32mr\033[0m] run, [\033[01;32ms\033[0m] stop,\n" << " [\033[01;32mt\033[0m] reset task, [\033[01;32md\033[0m] reset device, [\033[01;32mq\033[0m] end,\n" - << " [\033[01;32mk\033[0m] increase log severity [\033[01;32ml\033[0m] decrease log severity [\033[01;32mn\033[0m] increase log verbosity [\033[01;32mm\033[0m] decrease log verbosity\n\n"; + << " [\033[01;32mk\033[0m] increase log severity, [\033[01;32ml\033[0m] decrease log severity, [\033[01;32mn\033[0m] increase log verbosity, [\033[01;32mm\033[0m] decrease log verbosity\n\n"; cout << ss.str() << flush; } diff --git a/fairmq/run/runBenchmarkSampler.cxx b/fairmq/run/runBenchmarkSampler.cxx index 7e822af9..1235e3c5 100644 --- a/fairmq/run/runBenchmarkSampler.cxx +++ b/fairmq/run/runBenchmarkSampler.cxx @@ -23,7 +23,7 @@ void addCustomOptions(bpo::options_description& options) ("msg-rate", bpo::value()->default_value(0), "Msg rate limit in maximum number of messages per second"); } -FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) +FairMQDevicePtr getDevice(const FairMQProgOptions& /* config */) { return new FairMQBenchmarkSampler(); } diff --git a/test/device/_config.cxx b/test/device/_config.cxx index a6067869..efb4730d 100644 --- a/test/device/_config.cxx +++ b/test/device/_config.cxx @@ -114,7 +114,7 @@ class Config : public ::testing::Test channel.UpdateType("pub"); channel.UpdateMethod("connect"); channel.UpdateAddress("tcp://localhost:5558"); - device.AddChannel("data", channel); + device.AddChannel("data", std::move(channel)); thread t(control, ref(device)); @@ -143,7 +143,7 @@ class Config : public ::testing::Test channel.UpdateType("pub"); channel.UpdateMethod("connect"); channel.UpdateAddress("tcp://localhost:5558"); - device.AddChannel("data", channel); + device.AddChannel("data", std::move(channel)); thread t(&FairMQDevice::RunStateMachine, &device); diff --git a/test/device/_multiple_devices.cxx b/test/device/_multiple_devices.cxx index 2e842544..64d0edc2 100644 --- a/test/device/_multiple_devices.cxx +++ b/test/device/_multiple_devices.cxx @@ -57,7 +57,7 @@ class MultipleDevices : public ::testing::Test { FairMQChannel channel("push", "connect", "ipc://multiple-devices-test"); channel.UpdateRateLogging(0); - sender.AddChannel("data", channel); + sender.AddChannel("data", std::move(channel)); thread t(control, std::ref(sender)); @@ -78,7 +78,7 @@ class MultipleDevices : public ::testing::Test { FairMQChannel channel("pull", "bind", "ipc://multiple-devices-test"); channel.UpdateRateLogging(0); - receiver.AddChannel("data", channel); + receiver.AddChannel("data", std::move(channel)); thread t(control, std::ref(receiver)); diff --git a/test/plugin_services/_config.cxx b/test/plugin_services/_config.cxx index 9e4a1adc..c58a91e8 100644 --- a/test/plugin_services/_config.cxx +++ b/test/plugin_services/_config.cxx @@ -64,7 +64,7 @@ TEST_F(PluginServices, ConfigCallbacks) }); mServices.SubscribeToPropertyChange("test", [](const string& key, int /*value*/) { - if(key == "chans.data.0.rcvBufSize") { + if (key == "chans.data.0.rcvBufSize") { FAIL(); // should not be called because we unsubscribed } });