From 79fba8ec4c74562c001c72b74ab6c37f4c774a5a Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 28 Oct 2016 15:42:15 +0200 Subject: [PATCH] Update JSON files & readme, use FairMQDevicePtr, cleanup. --- fairmq/options/FairMQEventManager.h | 8 +- fairmq/options/FairMQProgOptions.cxx | 98 ++++++++++--------- fairmq/options/FairMQProgOptions.h | 75 +++++++------- fairmq/options/FairProgOptions.h | 8 +- fairmq/options/FairProgOptionsHelper.h | 40 ++++---- fairmq/options/README.md | 92 +++++++++++++++++ fairmq/plugins/config/CMakeLists.txt | 1 + .../plugins/config/FairMQDDSConfigPlugin.cxx | 1 + fairmq/run/benchmark.json | 11 +-- fairmq/run/runBenchmarkSampler.cxx | 4 +- fairmq/run/runMerger.cxx | 2 +- fairmq/run/runMultiplier.cxx | 2 +- fairmq/run/runProxy.cxx | 2 +- fairmq/run/runSink.cxx | 4 +- fairmq/run/runSplitter.cxx | 2 +- fairmq/runFairMQDevice.h | 4 +- fairmq/tools/runSimpleMQStateMachine.h | 12 +-- 17 files changed, 234 insertions(+), 132 deletions(-) create mode 100644 fairmq/options/README.md diff --git a/fairmq/options/FairMQEventManager.h b/fairmq/options/FairMQEventManager.h index 2d85b557..c1f9e695 100644 --- a/fairmq/options/FairMQEventManager.h +++ b/fairmq/options/FairMQEventManager.h @@ -91,7 +91,7 @@ class FairMQEventManager {} template - void Connect(const std::string& key, F&& func) + void Connect(const std::string& key, F&& func) const { GetSlot(key).connect(std::forward(func)); } @@ -111,7 +111,7 @@ class FairMQEventManager template bool EventKeyFound(const std::string& key) { - if (fEventMap.find(std::pair(event, key) ) != fEventMap.end()) + if (fEventMap.find(std::pair(event, key)) != fEventMap.end()) { return true; } @@ -122,11 +122,11 @@ class FairMQEventManager } private: - std::map fEventMap; + mutable std::map fEventMap; template ::signal_type, typename SlotPtr = boost::shared_ptr> - Slot& GetSlot(const std::string& key) + Slot& GetSlot(const std::string& key) const { try { diff --git a/fairmq/options/FairMQProgOptions.cxx b/fairmq/options/FairMQProgOptions.cxx index 1b0ebd63..2e981892 100644 --- a/fairmq/options/FairMQProgOptions.cxx +++ b/fairmq/options/FairMQProgOptions.cxx @@ -67,8 +67,8 @@ void FairMQProgOptions::ParseAll(const int argc, char** argv, bool allowUnregist } } - std::string verbosity = GetValue("verbosity"); - std::string logFile = GetValue("log-to-file"); + string verbosity = GetValue("verbosity"); + string logFile = GetValue("log-to-file"); bool color = GetValue("log-color"); // check if the provided verbosity level is valid, otherwise set to DEBUG @@ -125,21 +125,21 @@ void FairMQProgOptions::ParseAll(const int argc, char** argv, bool allowUnregist { LOG(DEBUG) << "mq-config: Using default XML/JSON parser"; - std::string file = fVarMap["mq-config"].as(); - std::string id; + string file = fVarMap["mq-config"].as(); + string id; if (fVarMap.count("config-key")) { - id = fVarMap["config-key"].as(); + id = fVarMap["config-key"].as(); } else { - id = fVarMap["id"].as(); + id = fVarMap["id"].as(); } - std::string fileExtension = boost::filesystem::extension(file); + string fileExtension = boost::filesystem::extension(file); - std::transform(fileExtension.begin(), fileExtension.end(), fileExtension.begin(), ::tolower); + transform(fileExtension.begin(), fileExtension.end(), fileExtension.begin(), ::tolower); if (fileExtension == ".json") { @@ -164,19 +164,19 @@ void FairMQProgOptions::ParseAll(const int argc, char** argv, bool allowUnregist { LOG(DEBUG) << "config-json-string: Parsing JSON string"; - std::string id; + string id; if (fVarMap.count("config-key")) { - id = fVarMap["config-key"].as(); + id = fVarMap["config-key"].as(); } else { - id = fVarMap["id"].as(); + id = fVarMap["id"].as(); } - std::string value = FairMQ::ConvertVariableValue().Run(fVarMap.at("config-json-string")); - std::stringstream ss; + string value = FairMQ::ConvertVariableValue().Run(fVarMap.at("config-json-string")); + stringstream ss; ss << value; UserParser(ss, id); } @@ -184,19 +184,19 @@ void FairMQProgOptions::ParseAll(const int argc, char** argv, bool allowUnregist { LOG(DEBUG) << "config-json-string: Parsing XML string"; - std::string id; + string id; if (fVarMap.count("config-key")) { - id = fVarMap["config-key"].as(); + id = fVarMap["config-key"].as(); } else { - id = fVarMap["id"].as(); + id = fVarMap["id"].as(); } - std::string value = FairMQ::ConvertVariableValue().Run(fVarMap.at("config-xml-string")); - std::stringstream ss; + string value = FairMQ::ConvertVariableValue().Run(fVarMap.at("config-xml-string")); + stringstream ss; ss << value; UserParser(ss, id); } @@ -215,7 +215,7 @@ int FairMQProgOptions::Store(const FairMQMap& channels) // replace FairMQChannelMap, and update variable map accordingly int FairMQProgOptions::UpdateChannelMap(const FairMQMap& channels) { - fFairMQMap=channels; + fFairMQMap = channels; UpdateMQValues(); return 0; } @@ -224,50 +224,52 @@ int FairMQProgOptions::UpdateChannelMap(const FairMQMap& channels) // create key for variable map as follow : channelName.index.memberName void FairMQProgOptions::UpdateMQValues() { - for(const auto& p : fFairMQMap) + for (const auto& p : fFairMQMap) { int index = 0; - for(const auto& channel : p.second) + + for (const auto& channel : p.second) { - std::string typeKey = p.first + "." + std::to_string(index) + ".type"; - std::string methodKey = p.first + "." + std::to_string(index) + ".method"; - std::string addressKey = p.first + "." + std::to_string(index) + ".address"; - std::string sndBufSizeKey = p.first + "." + std::to_string(index) + ".sndBufSize"; - std::string rcvBufSizeKey = p.first + "." + std::to_string(index) + ".rcvBufSize"; - std::string rateLoggingKey = p.first + "." + std::to_string(index) + ".rateLogging"; + string typeKey = p.first + "." + to_string(index) + ".type"; + string methodKey = p.first + "." + to_string(index) + ".method"; + string addressKey = p.first + "." + to_string(index) + ".address"; + string sndBufSizeKey = p.first + "." + to_string(index) + ".sndBufSize"; + string rcvBufSizeKey = p.first + "." + to_string(index) + ".rcvBufSize"; + string rateLoggingKey = p.first + "." + to_string(index) + ".rateLogging"; - fMQKeyMap[typeKey] = std::make_tuple(p.first,index,"type"); - fMQKeyMap[methodKey] = std::make_tuple(p.first,index,"method"); - fMQKeyMap[addressKey] = std::make_tuple(p.first,index,"address"); - fMQKeyMap[sndBufSizeKey] = std::make_tuple(p.first,index,"sndBufSize"); - fMQKeyMap[rcvBufSizeKey] = std::make_tuple(p.first,index,"rcvBufSize"); - fMQKeyMap[rateLoggingKey] = std::make_tuple(p.first,index,"rateLogging"); + fMQKeyMap[typeKey] = make_tuple(p.first, index, "type"); + fMQKeyMap[methodKey] = make_tuple(p.first, index, "method"); + fMQKeyMap[addressKey] = make_tuple(p.first, index, "address"); + fMQKeyMap[sndBufSizeKey] = make_tuple(p.first, index, "sndBufSize"); + fMQKeyMap[rcvBufSizeKey] = make_tuple(p.first, index, "rcvBufSize"); + fMQKeyMap[rateLoggingKey] = make_tuple(p.first, index, "rateLogging"); - UpdateVarMap(typeKey,channel.GetType()); - UpdateVarMap(methodKey,channel.GetMethod()); - UpdateVarMap(addressKey,channel.GetAddress()); + UpdateVarMap(typeKey, channel.GetType()); + UpdateVarMap(methodKey, channel.GetMethod()); + UpdateVarMap(addressKey, channel.GetAddress()); - //UpdateVarMap(sndBufSizeKey, std::to_string(channel.GetSndBufSize()));// string API - UpdateVarMap(sndBufSizeKey,channel.GetSndBufSize()); + //UpdateVarMap(sndBufSizeKey, to_string(channel.GetSndBufSize()));// string API + UpdateVarMap(sndBufSizeKey, channel.GetSndBufSize()); - //UpdateVarMap(rcvBufSizeKey, std::to_string(channel.GetRcvBufSize()));// string API - UpdateVarMap(rcvBufSizeKey,channel.GetRcvBufSize()); + //UpdateVarMap(rcvBufSizeKey, to_string(channel.GetRcvBufSize()));// string API + UpdateVarMap(rcvBufSizeKey, channel.GetRcvBufSize()); - //UpdateVarMap(rateLoggingKey,std::to_string(channel.GetRateLogging()));// string API - UpdateVarMap(rateLoggingKey,channel.GetRateLogging()); + //UpdateVarMap(rateLoggingKey,to_string(channel.GetRateLogging()));// string API + UpdateVarMap(rateLoggingKey, channel.GetRateLogging()); /* LOG(DEBUG) << "Update MQ parameters of variable map"; - LOG(DEBUG) << "key = " << typeKey <<"\t value = " << GetValue(typeKey); - LOG(DEBUG) << "key = " << methodKey <<"\t value = " << GetValue(methodKey); - LOG(DEBUG) << "key = " << addressKey <<"\t value = " << GetValue(addressKey); + LOG(DEBUG) << "key = " << typeKey <<"\t value = " << GetValue(typeKey); + LOG(DEBUG) << "key = " << methodKey <<"\t value = " << GetValue(methodKey); + LOG(DEBUG) << "key = " << addressKey <<"\t value = " << GetValue(addressKey); LOG(DEBUG) << "key = " << sndBufSizeKey << "\t value = " << GetValue(sndBufSizeKey); LOG(DEBUG) << "key = " << rcvBufSizeKey <<"\t value = " << GetValue(rcvBufSizeKey); LOG(DEBUG) << "key = " << rateLoggingKey <<"\t value = " << GetValue(rateLoggingKey); */ index++; } + UpdateVarMap(p.first + ".numSockets", index); } } @@ -351,7 +353,7 @@ void FairMQProgOptions::InitOptionDescription() } } -int FairMQProgOptions::UpdateChannelMap(const std::string& channelName, int index, const std::string& member, const std::string& val) +int FairMQProgOptions::UpdateChannelMap(const string& channelName, int index, const string& member, const string& val) { if (member == "type") { @@ -381,7 +383,7 @@ int FairMQProgOptions::UpdateChannelMap(const std::string& channelName, int inde /* // string API -int FairMQProgOptions::UpdateChannelMap(const std::string& channelName, int index, const std::string& member, const std::string& val) +int FairMQProgOptions::UpdateChannelMap(const string& channelName, int index, const string& member, const string& val) { if (member == "type") { @@ -418,7 +420,7 @@ int FairMQProgOptions::UpdateChannelMap(const std::string& channelName, int inde // ---------------------------------------------------------------------------------- -int FairMQProgOptions::UpdateChannelMap(const std::string& channelName, int index, const std::string& member, int val) +int FairMQProgOptions::UpdateChannelMap(const string& channelName, int index, const string& member, int val) { if (member == "sndBufSize") { diff --git a/fairmq/options/FairMQProgOptions.h b/fairmq/options/FairMQProgOptions.h index 2ac1ee38..85077538 100644 --- a/fairmq/options/FairMQProgOptions.h +++ b/fairmq/options/FairMQProgOptions.h @@ -63,12 +63,12 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager // to customize title of the executable help command line void SetHelpTitle(const std::string& title) { - fHelpTitle=title; + fHelpTitle = title; } // to customize the executable version command line void SetVersion(const std::string& version) { - fVersion=version; + fVersion = version; } // store key-value of type T into variable_map. @@ -91,11 +91,11 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager { try { - if(fVarMap.count(key)) + if (fVarMap.count(key)) { - if(!FairMQ::is_this_type(fVarMap.at(key))) + if (!FairMQ::is_this_type(fVarMap.at(key))) { LOG(ERROR) << "You try to update a value as string (for key="<< key <<") while it has been defined with a different type in the option description."; abort(); @@ -104,7 +104,7 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager // update variable map UpdateVarMap(key,val); - if(fMQKeyMap.count(key)) + if (fMQKeyMap.count(key)) { std::string channelName; int index = 0; @@ -114,8 +114,8 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager } // execute stored function of a given key if exist - //if(std::is_same::value || std::is_same::value)//if one wants to restrict type - if(fSignalMap.count(key)) + //if (std::is_same::value || std::is_same::value)//if one wants to restrict type + if (fSignalMap.count(key)) EmitUpdate(key,val); return 0; @@ -151,28 +151,28 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager // specialization/overloading for string, pass by const ref int UpdateValue(const std::string& key, const std::string& val) // string API { - UpdateValue(key,val); + UpdateValue(key, val); return 0; } int UpdateValue(const std::string& key, const char* val) // string API { - UpdateValue(key,std::string(val)); + UpdateValue(key, std::string(val)); return 0; } template int UpdateValue(const std::string& key, T val) { - - if(fVarMap.count(key)) + if (fVarMap.count(key)) { // update variable map - UpdateVarMap::type>(key,val); + UpdateVarMap::type>(key, val); // update FairMQChannel map, check first if data are int or string - if(std::is_same::value || std::is_same::value) - if(fMQKeyMap.count(key)) + if (std::is_same::value || std::is_same::value) + { + if (fMQKeyMap.count(key)) { std::string channelName; int index = 0; @@ -180,44 +180,47 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager std::tie(channelName, index, member) = fMQKeyMap.at(key); UpdateChannelMap(channelName, index, member, val); } + } // execute stored function of a given key if exist - //if(std::is_same::value || std::is_same::value)//if one wants to restrict type - if(EventKeyFound(key)) - EmitUpdate::type >(key,val); + //if (std::is_same::value || std::is_same::value)//if one wants to restrict type + if (EventKeyFound(key)) + { + EmitUpdate::type>(key, val); + } return 0; } else { - - LOG(ERROR) <<"UpdatedValue failed because the provided key '" - < - void Subscribe(const std::string& key, F&& func) + void Subscribe(const std::string& key, F&& func) const { - static_assert(!std::is_same::value || !std::is_same::value, + static_assert(!std::is_same::value || !std::is_same::value, "In template member FairMQProgOptions::Subscribe(key,Lambda) the types const char* or char* for the calback signatures are not supported."); - - if(fVarMap.count(key)) - FairMQEventManager::Connect(key,std::forward(func)); + + if (fVarMap.count(key)) + { + FairMQEventManager::Connect(key, std::forward(func)); + } } /* template void Subscribe(const std::string& key, F&& func) { - if(fVarMap.count(key)) + if (fVarMap.count(key)) { //if key-value not yet found, then add it - if(fSignalMap.find(key) == fSignalMap.end()) + if (fSignalMap.find(key) == fSignalMap.end()) fSignalMap.emplace(key, boost::make_shared()); (*fSignalMap.at(key)).connect(std::forward(func)); } @@ -237,16 +240,18 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager bool EventKeyFound(const std::string& key) { - if ( - FairMQEventManager::EventKeyFound(key) - ) + if (FairMQEventManager::EventKeyFound(key)) + { return true; + } else + { return false; + } } - typedef std::tuple MQKey;//store key info - std::map fMQKeyMap;// key=full path - val=key info + typedef std::tuple MQKey;//store key info + std::map fMQKeyMap;// key=full path - val=key info virtual int NotifySwitchOption(); // for custom help & version printing void InitOptionDescription(); @@ -276,7 +281,7 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager //compile time check whether T is const char* or char*, and in that case a compile time error is thrown. static_assert(!std::is_same::value || !std::is_same::value, "In template member FairMQProgOptions::EmitUpdate(key,val) the types const char* or char* for the calback signatures are not supported."); - Emit(key,key,val); + Emit(key, key, val); } int UpdateChannelMap(const std::string& channelName, int index, const std::string& member, const std::string& val); diff --git a/fairmq/options/FairProgOptions.h b/fairmq/options/FairProgOptions.h index 831e502c..37c45a52 100644 --- a/fairmq/options/FairProgOptions.h +++ b/fairmq/options/FairProgOptions.h @@ -89,7 +89,7 @@ class FairProgOptions LOG(ERROR) << "Config has no key: " << key; } } - catch(std::exception& e) + catch (std::exception& e) { LOG(ERROR) << "Exception thrown for the key '" << key << "'"; LOG(ERROR) << e.what(); @@ -104,18 +104,18 @@ class FairProgOptions //restrict conversion to fundamental types template - T ConvertTo(const std::string& str_value) + T ConvertTo(const std::string& strValue) { if (std::is_arithmetic::value) { - std::istringstream iss( str_value ); + std::istringstream iss(strValue); T val; iss >> val; return val; } else { - LOG(ERROR)<<"the provided string "< - std::string Value(const po::variable_value& varValue,const std::string&, const std::string&, const std::string&) + std::string Value(const po::variable_value& varValue, const std::string&, const std::string&, const std::string&) { return ConvertVariableValueToString(varValue); } @@ -112,7 +112,7 @@ struct ToVarInfo { typedef std::tuple returned_type; template - returned_type Value(const po::variable_value& varValue,const std::string& type, const std::string& defaulted, const std::string& empty) + returned_type Value(const po::variable_value& varValue, const std::string& type, const std::string& defaulted, const std::string& empty) { std::string valueStr = ConvertVariableValueToString(varValue); return make_tuple(valueStr, type, defaulted, empty); @@ -155,75 +155,75 @@ struct ConvertVariableValue : T //////////////////////////////// std types // std::string albeit useless here if (is_this_type(varValue)) - return T::template Value(varValue,std::string(" "),defaultedValue,emptyValue); + return T::template Value(varValue, std::string(" "), defaultedValue, emptyValue); // std::vector if (is_this_type>(varValue)) - return T::template Value>(varValue,std::string(" >"),defaultedValue,emptyValue); + return T::template Value>(varValue, std::string(" >"), defaultedValue, emptyValue); // int if (is_this_type(varValue)) - return T::template Value(varValue,std::string(" "),defaultedValue,emptyValue); + return T::template Value(varValue, std::string(" "), defaultedValue, emptyValue); // std::vector if (is_this_type>(varValue)) - return T::template Value>(varValue,std::string(" >"),defaultedValue,emptyValue); + return T::template Value>(varValue, std::string(" >"), defaultedValue, emptyValue); // float if (is_this_type(varValue)) - return T::template Value(varValue,std::string(" "),defaultedValue,emptyValue); + return T::template Value(varValue, std::string(" "), defaultedValue, emptyValue); // std::vector float if (is_this_type>(varValue)) - return T::template Value>(varValue,std::string(" >"),defaultedValue,emptyValue); + return T::template Value>(varValue, std::string(" >"), defaultedValue, emptyValue); // double if (is_this_type(varValue)) - return T::template Value(varValue,std::string(" "),defaultedValue,emptyValue); + return T::template Value(varValue, std::string(" "), defaultedValue, emptyValue); // std::vector double if (is_this_type>(varValue)) - return T::template Value>(varValue,std::string(" >"),defaultedValue,emptyValue); + return T::template Value>(varValue, std::string(" >"), defaultedValue, emptyValue); // short if (is_this_type(varValue)) - return T::template Value(varValue,std::string(" "),defaultedValue,emptyValue); + return T::template Value(varValue, std::string(" "), defaultedValue, emptyValue); // std::vector short if (is_this_type>(varValue)) - return T::template Value>(varValue,std::string(" >"),defaultedValue,emptyValue); + return T::template Value>(varValue, std::string(" >"), defaultedValue, emptyValue); // long if (is_this_type(varValue)) - return T::template Value(varValue,std::string(" "),defaultedValue,emptyValue); + return T::template Value(varValue, std::string(" "), defaultedValue, emptyValue); // std::vector short if (is_this_type>(varValue)) - return T::template Value>(varValue,std::string(" >"),defaultedValue,emptyValue); + return T::template Value>(varValue, std::string(" >"), defaultedValue, emptyValue); // size_t if (is_this_type(varValue)) - return T::template Value(varValue,std::string(" "),defaultedValue,emptyValue); + return T::template Value(varValue, std::string(" "), defaultedValue, emptyValue); // std::vector size_t if (is_this_type>(varValue)) - return T::template Value>(varValue,std::string(" >"),defaultedValue,emptyValue); + return T::template Value>(varValue, std::string(" >"), defaultedValue, emptyValue); // bool if (is_this_type(varValue)) - return T::template Value(varValue,std::string(" "),defaultedValue,emptyValue); + return T::template Value(varValue, std::string(" "), defaultedValue, emptyValue); // std::vector bool if (is_this_type>(varValue)) - return T::template Value>(varValue,std::string(" >"),defaultedValue,emptyValue); + return T::template Value>(varValue, std::string(" >"), defaultedValue, emptyValue); //////////////////////////////// boost types // boost::filesystem::path if (is_this_type(varValue)) - return T::template Value(varValue,std::string(" "),defaultedValue,emptyValue); + return T::template Value(varValue, std::string(" "), defaultedValue, emptyValue); // if we get here, the type is not supported return unknown info - return T::DefaultValue(defaultedValue,emptyValue); + return T::DefaultValue(defaultedValue, emptyValue); } }; diff --git a/fairmq/options/README.md b/fairmq/options/README.md new file mode 100644 index 00000000..4b85b1a9 --- /dev/null +++ b/fairmq/options/README.md @@ -0,0 +1,92 @@ +## FairMQParser + +The FairMQParser configures the FairMQ channels from a JSON file. + +The basic structure looks like this: + +```json +{ + "fairMQOptions": + { + "devices": + [{ + "id": "device1", + "channels": + [{ + "name": "data", + "sockets": + [{ + "type": "push", + "method": "bind", + "address": "tcp://127.0.0.1:5555", + "sndBufSize": 1000, + "rcvBufSize": 1000, + "rateLogging": 1 + }] + }] + }] + } +} +``` + +The top level key is `fairMQOptions`, followed by one or more devices (with their IDs), each containing one or more channels (with their names), each containing one or more sockets. + +The socket parameters accept following values: +- `type` (default = ""): "push"/"pull", "pub"/"sub", "req"/"rep", "xsub"/"xpub", "dealer/router", "pair". +- `method` (default = ""): "bind"/"connect". +- `address` (default = ""): address to bind/connect. +- `sndBufSize` (default = 1000): socket send queue size in number of messages. +- `rcvBufSize` (default = 1000): socket receive queue size in number of messages. +- `rateLogging` (default = 1): log socket transfer rates in seconds. 0 for no logging of this socket. + +If a parameter is not specified, its default value will be set. + +When a channel has multiple sockets, sockets can share common parameters. In this case specify the common parameters directly on the channel, which will be applied to all sockets of the channel. For example, the following config will create 3 sockets for the data channel with same settings, except the address: + +```json +{ + "fairMQOptions": + { + "devices": + [{ + "id": "device1", + "channels": + [{ + "name": "data", + "type": "push", + "method": "connect", + "sockets": + [ + { "address": "tcp://127.0.0.1:5555" } + { "address": "tcp://127.0.0.1:5556" } + { "address": "tcp://127.0.0.1:5557" } + ] + }] + }] + } +} +``` + +The device ID should be unique within a topology. It is possible to create a configuration that can be shared by multiple devices, by specifying "key" instead of "id". To use it the started device must be started with `--config-key ` option. For example, the following config can be applied to multiple running devices within a topology: + +```json +{ + "fairMQOptions": + { + "devices": + [{ + "key": "processor", + "channels": + [{ + "name": "data", + "socket": + { + "type": "pull", + "method": "connect", + "address": "tcp://localhost:5555" + } + }] + }] + } +} +``` diff --git a/fairmq/plugins/config/CMakeLists.txt b/fairmq/plugins/config/CMakeLists.txt index 19c81ab8..3db48142 100644 --- a/fairmq/plugins/config/CMakeLists.txt +++ b/fairmq/plugins/config/CMakeLists.txt @@ -9,6 +9,7 @@ set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/fairmq ${CMAKE_SOURCE_DIR}/fairmq/plugins/config + ${CMAKE_SOURCE_DIR}/fairmq/options ) set(SYSTEM_INCLUDE_DIRECTORIES diff --git a/fairmq/plugins/config/FairMQDDSConfigPlugin.cxx b/fairmq/plugins/config/FairMQDDSConfigPlugin.cxx index 12955dd7..3913c4a2 100644 --- a/fairmq/plugins/config/FairMQDDSConfigPlugin.cxx +++ b/fairmq/plugins/config/FairMQDDSConfigPlugin.cxx @@ -13,6 +13,7 @@ #include "FairMQLogger.h" #include "FairMQDevice.h" #include "FairMQChannel.h" +#include "FairMQProgOptions.h" #include #include diff --git a/fairmq/run/benchmark.json b/fairmq/run/benchmark.json index f2774a20..26cf9354 100644 --- a/fairmq/run/benchmark.json +++ b/fairmq/run/benchmark.json @@ -1,12 +1,12 @@ { "fairMQOptions": { - "device": - { + "devices": + [{ "id": "bsampler1", "channel": { - "name": "data-out", + "name": "data", "socket": { "type": "push", @@ -18,12 +18,11 @@ } } }, - "device": { "id": "sink1", "channel": { - "name": "data-in", + "name": "data", "socket": { "type": "pull", @@ -34,7 +33,7 @@ "rateLogging": "1" } } - } + }] } } diff --git a/fairmq/run/runBenchmarkSampler.cxx b/fairmq/run/runBenchmarkSampler.cxx index 9e19afdc..ae0b85e7 100644 --- a/fairmq/run/runBenchmarkSampler.cxx +++ b/fairmq/run/runBenchmarkSampler.cxx @@ -14,13 +14,13 @@ namespace bpo = boost::program_options; void addCustomOptions(bpo::options_description& options) { options.add_options() - ("out-channel", bpo::value()->default_value("data-out"), "Name of the output channel") + ("out-channel", bpo::value()->default_value("data"), "Name of the output channel") ("msg-size", bpo::value()->default_value(1000), "Message size in bytes") ("num-msgs", bpo::value()->default_value(0), "Number of messages to send") ("msg-rate", bpo::value()->default_value(0), "Msg rate limit in maximum number of messages per second"); } -FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new FairMQBenchmarkSampler(); } diff --git a/fairmq/run/runMerger.cxx b/fairmq/run/runMerger.cxx index bdad2e8a..efd8b3db 100644 --- a/fairmq/run/runMerger.cxx +++ b/fairmq/run/runMerger.cxx @@ -19,7 +19,7 @@ void addCustomOptions(bpo::options_description& options) ("multipart", bpo::value()->default_value(1), "Handle multipart payloads"); } -FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new FairMQMerger(); } diff --git a/fairmq/run/runMultiplier.cxx b/fairmq/run/runMultiplier.cxx index 3f18239e..f21f0828 100644 --- a/fairmq/run/runMultiplier.cxx +++ b/fairmq/run/runMultiplier.cxx @@ -19,7 +19,7 @@ void addCustomOptions(bpo::options_description& options) ("multipart", bpo::value()->default_value(1), "Handle multipart payloads"); } -FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new FairMQMultiplier(); } diff --git a/fairmq/run/runProxy.cxx b/fairmq/run/runProxy.cxx index 0fc50ab0..4e583da1 100644 --- a/fairmq/run/runProxy.cxx +++ b/fairmq/run/runProxy.cxx @@ -19,7 +19,7 @@ void addCustomOptions(bpo::options_description& options) ("multipart", bpo::value()->default_value(1), "Handle multipart payloads"); } -FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new FairMQProxy(); } diff --git a/fairmq/run/runSink.cxx b/fairmq/run/runSink.cxx index 124a3561..0f481b63 100644 --- a/fairmq/run/runSink.cxx +++ b/fairmq/run/runSink.cxx @@ -14,11 +14,11 @@ namespace bpo = boost::program_options; void addCustomOptions(bpo::options_description& options) { options.add_options() - ("in-channel", bpo::value()->default_value("data-in"), "Name of the input channel") + ("in-channel", bpo::value()->default_value("data"), "Name of the input channel") ("num-msgs", bpo::value()->default_value(0), "Number of messages to receive"); } -FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new FairMQSink(); } diff --git a/fairmq/run/runSplitter.cxx b/fairmq/run/runSplitter.cxx index f14d5171..c0df0b69 100644 --- a/fairmq/run/runSplitter.cxx +++ b/fairmq/run/runSplitter.cxx @@ -19,7 +19,7 @@ void addCustomOptions(bpo::options_description& options) ("multipart", bpo::value()->default_value(1), "Handle multipart payloads"); } -FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new FairMQSplitter(); } diff --git a/fairmq/runFairMQDevice.h b/fairmq/runFairMQDevice.h index a5c4a0c1..5bfa811a 100644 --- a/fairmq/runFairMQDevice.h +++ b/fairmq/runFairMQDevice.h @@ -32,8 +32,10 @@ FairMQDevice* makeDeviceWithConditionalRun(R r) return new GenericFairMQDevice(r); } +using FairMQDevicePtr = FairMQDevice*; + // to be implemented by the user to return a child class of FairMQDevice -FairMQDevice* getDevice(const FairMQProgOptions& config); +FairMQDevicePtr getDevice(const FairMQProgOptions& config); // to be implemented by the user to add custom command line options (or just with empty body) void addCustomOptions(boost::program_options::options_description&); diff --git a/fairmq/tools/runSimpleMQStateMachine.h b/fairmq/tools/runSimpleMQStateMachine.h index eb52202a..2328ae94 100644 --- a/fairmq/tools/runSimpleMQStateMachine.h +++ b/fairmq/tools/runSimpleMQStateMachine.h @@ -43,8 +43,8 @@ inline int runStateMachine(TMQDevice& device, FairMQProgOptions& cfg) FairMQConfigPlugin* fairmqConfigPlugin = nullptr; FairMQControlPlugin* fairmqControlPlugin = nullptr; - std::clock_t c_start = std::clock(); - auto t_start = std::chrono::high_resolution_clock::now(); + std::clock_t cStart = std::clock(); + auto tStart = std::chrono::high_resolution_clock::now(); device.ChangeState(TMQDevice::INIT_DEVICE); // Wait for the binding channels to bind @@ -112,11 +112,11 @@ inline int runStateMachine(TMQDevice& device, FairMQProgOptions& cfg) device.WaitForEndOfState(TMQDevice::INIT_DEVICE); - std::clock_t c_end = std::clock(); - auto t_end = std::chrono::high_resolution_clock::now(); + std::clock_t cEnd = std::clock(); + auto tEnd = std::chrono::high_resolution_clock::now(); - LOG(DEBUG) << "Init time (CPU) : " << std::fixed << std::setprecision(2) << 1000.0 * (c_end - c_start) / CLOCKS_PER_SEC << " ms"; - LOG(DEBUG) << "Init time (Wall): " << std::chrono::duration(t_end - t_start).count() << " ms"; + LOG(DEBUG) << "Init time (CPU) : " << std::fixed << std::setprecision(2) << 1000.0 * (cEnd - cStart) / CLOCKS_PER_SEC << " ms"; + LOG(DEBUG) << "Init time (Wall): " << std::chrono::duration(tEnd - tStart).count() << " ms"; device.ChangeState(TMQDevice::INIT_TASK); device.WaitForEndOfState(TMQDevice::INIT_TASK);