mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 00:31:14 +00:00
Update JSON files & readme, use FairMQDevicePtr, cleanup.
This commit is contained in:
parent
da3010b20c
commit
79fba8ec4c
|
@ -91,7 +91,7 @@ class FairMQEventManager
|
||||||
{}
|
{}
|
||||||
|
|
||||||
template <EventId event, typename... ValueType, typename F>
|
template <EventId event, typename... ValueType, typename F>
|
||||||
void Connect(const std::string& key, F&& func)
|
void Connect(const std::string& key, F&& func) const
|
||||||
{
|
{
|
||||||
GetSlot<event, ValueType...>(key).connect(std::forward<F>(func));
|
GetSlot<event, ValueType...>(key).connect(std::forward<F>(func));
|
||||||
}
|
}
|
||||||
|
@ -122,11 +122,11 @@ class FairMQEventManager
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::map<EventKey, boost::any> fEventMap;
|
mutable std::map<EventKey, boost::any> fEventMap;
|
||||||
|
|
||||||
template <EventId event, typename... T, typename Slot = typename Events::Traits<event,T...>::signal_type,
|
template <EventId event, typename... T, typename Slot = typename Events::Traits<event,T...>::signal_type,
|
||||||
typename SlotPtr = boost::shared_ptr<Slot>>
|
typename SlotPtr = boost::shared_ptr<Slot>>
|
||||||
Slot& GetSlot(const std::string& key)
|
Slot& GetSlot(const std::string& key) const
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
|
|
@ -67,8 +67,8 @@ void FairMQProgOptions::ParseAll(const int argc, char** argv, bool allowUnregist
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string verbosity = GetValue<std::string>("verbosity");
|
string verbosity = GetValue<string>("verbosity");
|
||||||
std::string logFile = GetValue<std::string>("log-to-file");
|
string logFile = GetValue<string>("log-to-file");
|
||||||
bool color = GetValue<bool>("log-color");
|
bool color = GetValue<bool>("log-color");
|
||||||
|
|
||||||
// check if the provided verbosity level is valid, otherwise set to DEBUG
|
// check if the provided verbosity level is valid, otherwise set to DEBUG
|
||||||
|
@ -125,21 +125,21 @@ void FairMQProgOptions::ParseAll(const int argc, char** argv, bool allowUnregist
|
||||||
{
|
{
|
||||||
LOG(DEBUG) << "mq-config: Using default XML/JSON parser";
|
LOG(DEBUG) << "mq-config: Using default XML/JSON parser";
|
||||||
|
|
||||||
std::string file = fVarMap["mq-config"].as<std::string>();
|
string file = fVarMap["mq-config"].as<string>();
|
||||||
std::string id;
|
string id;
|
||||||
|
|
||||||
if (fVarMap.count("config-key"))
|
if (fVarMap.count("config-key"))
|
||||||
{
|
{
|
||||||
id = fVarMap["config-key"].as<std::string>();
|
id = fVarMap["config-key"].as<string>();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
id = fVarMap["id"].as<std::string>();
|
id = fVarMap["id"].as<string>();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string fileExtension = boost::filesystem::extension(file);
|
string fileExtension = boost::filesystem::extension(file);
|
||||||
|
|
||||||
std::transform(fileExtension.begin(), fileExtension.end(), fileExtension.begin(), ::tolower);
|
transform(fileExtension.begin(), fileExtension.end(), fileExtension.begin(), ::tolower);
|
||||||
|
|
||||||
if (fileExtension == ".json")
|
if (fileExtension == ".json")
|
||||||
{
|
{
|
||||||
|
@ -164,19 +164,19 @@ void FairMQProgOptions::ParseAll(const int argc, char** argv, bool allowUnregist
|
||||||
{
|
{
|
||||||
LOG(DEBUG) << "config-json-string: Parsing JSON string";
|
LOG(DEBUG) << "config-json-string: Parsing JSON string";
|
||||||
|
|
||||||
std::string id;
|
string id;
|
||||||
|
|
||||||
if (fVarMap.count("config-key"))
|
if (fVarMap.count("config-key"))
|
||||||
{
|
{
|
||||||
id = fVarMap["config-key"].as<std::string>();
|
id = fVarMap["config-key"].as<string>();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
id = fVarMap["id"].as<std::string>();
|
id = fVarMap["id"].as<string>();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string value = FairMQ::ConvertVariableValue<FairMQ::ToString>().Run(fVarMap.at("config-json-string"));
|
string value = FairMQ::ConvertVariableValue<FairMQ::ToString>().Run(fVarMap.at("config-json-string"));
|
||||||
std::stringstream ss;
|
stringstream ss;
|
||||||
ss << value;
|
ss << value;
|
||||||
UserParser<FairMQParser::JSON>(ss, id);
|
UserParser<FairMQParser::JSON>(ss, id);
|
||||||
}
|
}
|
||||||
|
@ -184,19 +184,19 @@ void FairMQProgOptions::ParseAll(const int argc, char** argv, bool allowUnregist
|
||||||
{
|
{
|
||||||
LOG(DEBUG) << "config-json-string: Parsing XML string";
|
LOG(DEBUG) << "config-json-string: Parsing XML string";
|
||||||
|
|
||||||
std::string id;
|
string id;
|
||||||
|
|
||||||
if (fVarMap.count("config-key"))
|
if (fVarMap.count("config-key"))
|
||||||
{
|
{
|
||||||
id = fVarMap["config-key"].as<std::string>();
|
id = fVarMap["config-key"].as<string>();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
id = fVarMap["id"].as<std::string>();
|
id = fVarMap["id"].as<string>();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string value = FairMQ::ConvertVariableValue<FairMQ::ToString>().Run(fVarMap.at("config-xml-string"));
|
string value = FairMQ::ConvertVariableValue<FairMQ::ToString>().Run(fVarMap.at("config-xml-string"));
|
||||||
std::stringstream ss;
|
stringstream ss;
|
||||||
ss << value;
|
ss << value;
|
||||||
UserParser<FairMQParser::XML>(ss, id);
|
UserParser<FairMQParser::XML>(ss, id);
|
||||||
}
|
}
|
||||||
|
@ -227,47 +227,49 @@ void FairMQProgOptions::UpdateMQValues()
|
||||||
for (const auto& p : fFairMQMap)
|
for (const auto& p : fFairMQMap)
|
||||||
{
|
{
|
||||||
int index = 0;
|
int index = 0;
|
||||||
|
|
||||||
for (const auto& channel : p.second)
|
for (const auto& channel : p.second)
|
||||||
{
|
{
|
||||||
std::string typeKey = p.first + "." + std::to_string(index) + ".type";
|
string typeKey = p.first + "." + to_string(index) + ".type";
|
||||||
std::string methodKey = p.first + "." + std::to_string(index) + ".method";
|
string methodKey = p.first + "." + to_string(index) + ".method";
|
||||||
std::string addressKey = p.first + "." + std::to_string(index) + ".address";
|
string addressKey = p.first + "." + to_string(index) + ".address";
|
||||||
std::string sndBufSizeKey = p.first + "." + std::to_string(index) + ".sndBufSize";
|
string sndBufSizeKey = p.first + "." + to_string(index) + ".sndBufSize";
|
||||||
std::string rcvBufSizeKey = p.first + "." + std::to_string(index) + ".rcvBufSize";
|
string rcvBufSizeKey = p.first + "." + to_string(index) + ".rcvBufSize";
|
||||||
std::string rateLoggingKey = p.first + "." + std::to_string(index) + ".rateLogging";
|
string rateLoggingKey = p.first + "." + to_string(index) + ".rateLogging";
|
||||||
|
|
||||||
fMQKeyMap[typeKey] = std::make_tuple(p.first,index,"type");
|
fMQKeyMap[typeKey] = make_tuple(p.first, index, "type");
|
||||||
fMQKeyMap[methodKey] = std::make_tuple(p.first,index,"method");
|
fMQKeyMap[methodKey] = make_tuple(p.first, index, "method");
|
||||||
fMQKeyMap[addressKey] = std::make_tuple(p.first,index,"address");
|
fMQKeyMap[addressKey] = make_tuple(p.first, index, "address");
|
||||||
fMQKeyMap[sndBufSizeKey] = std::make_tuple(p.first,index,"sndBufSize");
|
fMQKeyMap[sndBufSizeKey] = make_tuple(p.first, index, "sndBufSize");
|
||||||
fMQKeyMap[rcvBufSizeKey] = std::make_tuple(p.first,index,"rcvBufSize");
|
fMQKeyMap[rcvBufSizeKey] = make_tuple(p.first, index, "rcvBufSize");
|
||||||
fMQKeyMap[rateLoggingKey] = std::make_tuple(p.first,index,"rateLogging");
|
fMQKeyMap[rateLoggingKey] = make_tuple(p.first, index, "rateLogging");
|
||||||
|
|
||||||
UpdateVarMap<std::string>(typeKey,channel.GetType());
|
UpdateVarMap<string>(typeKey, channel.GetType());
|
||||||
UpdateVarMap<std::string>(methodKey,channel.GetMethod());
|
UpdateVarMap<string>(methodKey, channel.GetMethod());
|
||||||
UpdateVarMap<std::string>(addressKey,channel.GetAddress());
|
UpdateVarMap<string>(addressKey, channel.GetAddress());
|
||||||
|
|
||||||
|
|
||||||
//UpdateVarMap<std::string>(sndBufSizeKey, std::to_string(channel.GetSndBufSize()));// string API
|
//UpdateVarMap<string>(sndBufSizeKey, to_string(channel.GetSndBufSize()));// string API
|
||||||
UpdateVarMap<int>(sndBufSizeKey, channel.GetSndBufSize());
|
UpdateVarMap<int>(sndBufSizeKey, channel.GetSndBufSize());
|
||||||
|
|
||||||
//UpdateVarMap<std::string>(rcvBufSizeKey, std::to_string(channel.GetRcvBufSize()));// string API
|
//UpdateVarMap<string>(rcvBufSizeKey, to_string(channel.GetRcvBufSize()));// string API
|
||||||
UpdateVarMap<int>(rcvBufSizeKey, channel.GetRcvBufSize());
|
UpdateVarMap<int>(rcvBufSizeKey, channel.GetRcvBufSize());
|
||||||
|
|
||||||
//UpdateVarMap<std::string>(rateLoggingKey,std::to_string(channel.GetRateLogging()));// string API
|
//UpdateVarMap<string>(rateLoggingKey,to_string(channel.GetRateLogging()));// string API
|
||||||
UpdateVarMap<int>(rateLoggingKey, channel.GetRateLogging());
|
UpdateVarMap<int>(rateLoggingKey, channel.GetRateLogging());
|
||||||
|
|
||||||
/*
|
/*
|
||||||
LOG(DEBUG) << "Update MQ parameters of variable map";
|
LOG(DEBUG) << "Update MQ parameters of variable map";
|
||||||
LOG(DEBUG) << "key = " << typeKey <<"\t value = " << GetValue<std::string>(typeKey);
|
LOG(DEBUG) << "key = " << typeKey <<"\t value = " << GetValue<string>(typeKey);
|
||||||
LOG(DEBUG) << "key = " << methodKey <<"\t value = " << GetValue<std::string>(methodKey);
|
LOG(DEBUG) << "key = " << methodKey <<"\t value = " << GetValue<string>(methodKey);
|
||||||
LOG(DEBUG) << "key = " << addressKey <<"\t value = " << GetValue<std::string>(addressKey);
|
LOG(DEBUG) << "key = " << addressKey <<"\t value = " << GetValue<string>(addressKey);
|
||||||
LOG(DEBUG) << "key = " << sndBufSizeKey << "\t value = " << GetValue<int>(sndBufSizeKey);
|
LOG(DEBUG) << "key = " << sndBufSizeKey << "\t value = " << GetValue<int>(sndBufSizeKey);
|
||||||
LOG(DEBUG) << "key = " << rcvBufSizeKey <<"\t value = " << GetValue<int>(rcvBufSizeKey);
|
LOG(DEBUG) << "key = " << rcvBufSizeKey <<"\t value = " << GetValue<int>(rcvBufSizeKey);
|
||||||
LOG(DEBUG) << "key = " << rateLoggingKey <<"\t value = " << GetValue<int>(rateLoggingKey);
|
LOG(DEBUG) << "key = " << rateLoggingKey <<"\t value = " << GetValue<int>(rateLoggingKey);
|
||||||
*/
|
*/
|
||||||
index++;
|
index++;
|
||||||
}
|
}
|
||||||
|
UpdateVarMap<int>(p.first + ".numSockets", index);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -351,7 +353,7 @@ void FairMQProgOptions::InitOptionDescription()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int FairMQProgOptions::UpdateChannelMap(const std::string& channelName, int index, const std::string& member, const std::string& val)
|
int FairMQProgOptions::UpdateChannelMap(const string& channelName, int index, const string& member, const string& val)
|
||||||
{
|
{
|
||||||
if (member == "type")
|
if (member == "type")
|
||||||
{
|
{
|
||||||
|
@ -381,7 +383,7 @@ int FairMQProgOptions::UpdateChannelMap(const std::string& channelName, int inde
|
||||||
|
|
||||||
/*
|
/*
|
||||||
// string API
|
// string API
|
||||||
int FairMQProgOptions::UpdateChannelMap(const std::string& channelName, int index, const std::string& member, const std::string& val)
|
int FairMQProgOptions::UpdateChannelMap(const string& channelName, int index, const string& member, const string& val)
|
||||||
{
|
{
|
||||||
if (member == "type")
|
if (member == "type")
|
||||||
{
|
{
|
||||||
|
@ -418,7 +420,7 @@ int FairMQProgOptions::UpdateChannelMap(const std::string& channelName, int inde
|
||||||
// ----------------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
int FairMQProgOptions::UpdateChannelMap(const std::string& channelName, int index, const std::string& member, int val)
|
int FairMQProgOptions::UpdateChannelMap(const string& channelName, int index, const string& member, int val)
|
||||||
{
|
{
|
||||||
if (member == "sndBufSize")
|
if (member == "sndBufSize")
|
||||||
{
|
{
|
||||||
|
|
|
@ -164,7 +164,6 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager
|
||||||
template<typename T>
|
template<typename T>
|
||||||
int UpdateValue(const std::string& key, T val)
|
int UpdateValue(const std::string& key, T val)
|
||||||
{
|
{
|
||||||
|
|
||||||
if (fVarMap.count(key))
|
if (fVarMap.count(key))
|
||||||
{
|
{
|
||||||
// update variable map
|
// update variable map
|
||||||
|
@ -172,6 +171,7 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager
|
||||||
|
|
||||||
// update FairMQChannel map, check first if data are int or string
|
// update FairMQChannel map, check first if data are int or string
|
||||||
if (std::is_same<T, int>::value || std::is_same<T, std::string>::value)
|
if (std::is_same<T, int>::value || std::is_same<T, std::string>::value)
|
||||||
|
{
|
||||||
if (fMQKeyMap.count(key))
|
if (fMQKeyMap.count(key))
|
||||||
{
|
{
|
||||||
std::string channelName;
|
std::string channelName;
|
||||||
|
@ -180,17 +180,19 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager
|
||||||
std::tie(channelName, index, member) = fMQKeyMap.at(key);
|
std::tie(channelName, index, member) = fMQKeyMap.at(key);
|
||||||
UpdateChannelMap(channelName, index, member, val);
|
UpdateChannelMap(channelName, index, member, val);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// execute stored function of a given key if exist
|
// execute stored function of a given key if exist
|
||||||
//if (std::is_same<T, int>::value || std::is_same<T, std::string>::value)//if one wants to restrict type
|
//if (std::is_same<T, int>::value || std::is_same<T, std::string>::value)//if one wants to restrict type
|
||||||
if (EventKeyFound(key))
|
if (EventKeyFound(key))
|
||||||
|
{
|
||||||
EmitUpdate<typename std::decay<T>::type>(key, val);
|
EmitUpdate<typename std::decay<T>::type>(key, val);
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
||||||
LOG(ERROR) << "UpdatedValue failed because the provided key '"
|
LOG(ERROR) << "UpdatedValue failed because the provided key '"
|
||||||
<< key
|
<< key
|
||||||
<< "' is not found in the variable map";
|
<< "' is not found in the variable map";
|
||||||
|
@ -199,16 +201,17 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
template <typename T, typename F>
|
template <typename T, typename F>
|
||||||
void Subscribe(const std::string& key, F&& func)
|
void Subscribe(const std::string& key, F&& func) const
|
||||||
{
|
{
|
||||||
static_assert(!std::is_same<T,const char*>::value || !std::is_same<T, char*>::value,
|
static_assert(!std::is_same<T,const char*>::value || !std::is_same<T, char*>::value,
|
||||||
"In template member FairMQProgOptions::Subscribe<T>(key,Lambda) the types const char* or char* for the calback signatures are not supported.");
|
"In template member FairMQProgOptions::Subscribe<T>(key,Lambda) the types const char* or char* for the calback signatures are not supported.");
|
||||||
|
|
||||||
if (fVarMap.count(key))
|
if (fVarMap.count(key))
|
||||||
|
{
|
||||||
FairMQEventManager::Connect<EventId::UpdateParam, T>(key, std::forward<F>(func));
|
FairMQEventManager::Connect<EventId::UpdateParam, T>(key, std::forward<F>(func));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
template <typename F>
|
template <typename F>
|
||||||
|
@ -237,13 +240,15 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager
|
||||||
|
|
||||||
bool EventKeyFound(const std::string& key)
|
bool EventKeyFound(const std::string& key)
|
||||||
{
|
{
|
||||||
if (
|
if (FairMQEventManager::EventKeyFound<EventId::UpdateParam>(key))
|
||||||
FairMQEventManager::EventKeyFound<EventId::UpdateParam>(key)
|
{
|
||||||
)
|
|
||||||
return true;
|
return true;
|
||||||
|
}
|
||||||
else
|
else
|
||||||
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
typedef std::tuple<std::string, int, std::string> MQKey;//store key info
|
typedef std::tuple<std::string, int, std::string> MQKey;//store key info
|
||||||
std::map<std::string, MQKey> fMQKeyMap;// key=full path - val=key info
|
std::map<std::string, MQKey> fMQKeyMap;// key=full path - val=key info
|
||||||
|
|
|
@ -104,18 +104,18 @@ class FairProgOptions
|
||||||
|
|
||||||
//restrict conversion to fundamental types
|
//restrict conversion to fundamental types
|
||||||
template<typename T>
|
template<typename T>
|
||||||
T ConvertTo(const std::string& str_value)
|
T ConvertTo(const std::string& strValue)
|
||||||
{
|
{
|
||||||
if (std::is_arithmetic<T>::value)
|
if (std::is_arithmetic<T>::value)
|
||||||
{
|
{
|
||||||
std::istringstream iss( str_value );
|
std::istringstream iss(strValue);
|
||||||
T val;
|
T val;
|
||||||
iss >> val;
|
iss >> val;
|
||||||
return val;
|
return val;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
LOG(ERROR)<<"the provided string "<<str_value << " cannot be converted in the requested type. The target types must be arithmetic types";
|
LOG(ERROR) << "the provided string " << strValue << " cannot be converted in the requested type. The target types must be arithmetic types";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
92
fairmq/options/README.md
Normal file
92
fairmq/options/README.md
Normal file
|
@ -0,0 +1,92 @@
|
||||||
|
## FairMQParser
|
||||||
|
|
||||||
|
The FairMQParser configures the FairMQ channels from a JSON file.
|
||||||
|
|
||||||
|
The basic structure looks like this:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"fairMQOptions":
|
||||||
|
{
|
||||||
|
"devices":
|
||||||
|
[{
|
||||||
|
"id": "device1",
|
||||||
|
"channels":
|
||||||
|
[{
|
||||||
|
"name": "data",
|
||||||
|
"sockets":
|
||||||
|
[{
|
||||||
|
"type": "push",
|
||||||
|
"method": "bind",
|
||||||
|
"address": "tcp://127.0.0.1:5555",
|
||||||
|
"sndBufSize": 1000,
|
||||||
|
"rcvBufSize": 1000,
|
||||||
|
"rateLogging": 1
|
||||||
|
}]
|
||||||
|
}]
|
||||||
|
}]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
The top level key is `fairMQOptions`, followed by one or more devices (with their IDs), each containing one or more channels (with their names), each containing one or more sockets.
|
||||||
|
|
||||||
|
The socket parameters accept following values:
|
||||||
|
- `type` (default = ""): "push"/"pull", "pub"/"sub", "req"/"rep", "xsub"/"xpub", "dealer/router", "pair".
|
||||||
|
- `method` (default = ""): "bind"/"connect".
|
||||||
|
- `address` (default = ""): address to bind/connect.
|
||||||
|
- `sndBufSize` (default = 1000): socket send queue size in number of messages.
|
||||||
|
- `rcvBufSize` (default = 1000): socket receive queue size in number of messages.
|
||||||
|
- `rateLogging` (default = 1): log socket transfer rates in seconds. 0 for no logging of this socket.
|
||||||
|
|
||||||
|
If a parameter is not specified, its default value will be set.
|
||||||
|
|
||||||
|
When a channel has multiple sockets, sockets can share common parameters. In this case specify the common parameters directly on the channel, which will be applied to all sockets of the channel. For example, the following config will create 3 sockets for the data channel with same settings, except the address:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"fairMQOptions":
|
||||||
|
{
|
||||||
|
"devices":
|
||||||
|
[{
|
||||||
|
"id": "device1",
|
||||||
|
"channels":
|
||||||
|
[{
|
||||||
|
"name": "data",
|
||||||
|
"type": "push",
|
||||||
|
"method": "connect",
|
||||||
|
"sockets":
|
||||||
|
[
|
||||||
|
{ "address": "tcp://127.0.0.1:5555" }
|
||||||
|
{ "address": "tcp://127.0.0.1:5556" }
|
||||||
|
{ "address": "tcp://127.0.0.1:5557" }
|
||||||
|
]
|
||||||
|
}]
|
||||||
|
}]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
The device ID should be unique within a topology. It is possible to create a configuration that can be shared by multiple devices, by specifying "key" instead of "id". To use it the started device must be started with `--config-key <key>` option. For example, the following config can be applied to multiple running devices within a topology:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"fairMQOptions":
|
||||||
|
{
|
||||||
|
"devices":
|
||||||
|
[{
|
||||||
|
"key": "processor",
|
||||||
|
"channels":
|
||||||
|
[{
|
||||||
|
"name": "data",
|
||||||
|
"socket":
|
||||||
|
{
|
||||||
|
"type": "pull",
|
||||||
|
"method": "connect",
|
||||||
|
"address": "tcp://localhost:5555"
|
||||||
|
}
|
||||||
|
}]
|
||||||
|
}]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
|
@ -9,6 +9,7 @@
|
||||||
set(INCLUDE_DIRECTORIES
|
set(INCLUDE_DIRECTORIES
|
||||||
${CMAKE_SOURCE_DIR}/fairmq
|
${CMAKE_SOURCE_DIR}/fairmq
|
||||||
${CMAKE_SOURCE_DIR}/fairmq/plugins/config
|
${CMAKE_SOURCE_DIR}/fairmq/plugins/config
|
||||||
|
${CMAKE_SOURCE_DIR}/fairmq/options
|
||||||
)
|
)
|
||||||
|
|
||||||
set(SYSTEM_INCLUDE_DIRECTORIES
|
set(SYSTEM_INCLUDE_DIRECTORIES
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
#include "FairMQLogger.h"
|
#include "FairMQLogger.h"
|
||||||
#include "FairMQDevice.h"
|
#include "FairMQDevice.h"
|
||||||
#include "FairMQChannel.h"
|
#include "FairMQChannel.h"
|
||||||
|
#include "FairMQProgOptions.h"
|
||||||
|
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <map>
|
#include <map>
|
||||||
|
|
|
@ -1,12 +1,12 @@
|
||||||
{
|
{
|
||||||
"fairMQOptions":
|
"fairMQOptions":
|
||||||
{
|
{
|
||||||
"device":
|
"devices":
|
||||||
{
|
[{
|
||||||
"id": "bsampler1",
|
"id": "bsampler1",
|
||||||
"channel":
|
"channel":
|
||||||
{
|
{
|
||||||
"name": "data-out",
|
"name": "data",
|
||||||
"socket":
|
"socket":
|
||||||
{
|
{
|
||||||
"type": "push",
|
"type": "push",
|
||||||
|
@ -18,12 +18,11 @@
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"device":
|
|
||||||
{
|
{
|
||||||
"id": "sink1",
|
"id": "sink1",
|
||||||
"channel":
|
"channel":
|
||||||
{
|
{
|
||||||
"name": "data-in",
|
"name": "data",
|
||||||
"socket":
|
"socket":
|
||||||
{
|
{
|
||||||
"type": "pull",
|
"type": "pull",
|
||||||
|
@ -34,7 +33,7 @@
|
||||||
"rateLogging": "1"
|
"rateLogging": "1"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -14,13 +14,13 @@ namespace bpo = boost::program_options;
|
||||||
void addCustomOptions(bpo::options_description& options)
|
void addCustomOptions(bpo::options_description& options)
|
||||||
{
|
{
|
||||||
options.add_options()
|
options.add_options()
|
||||||
("out-channel", bpo::value<std::string>()->default_value("data-out"), "Name of the output channel")
|
("out-channel", bpo::value<std::string>()->default_value("data"), "Name of the output channel")
|
||||||
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
|
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
|
||||||
("num-msgs", bpo::value<int>()->default_value(0), "Number of messages to send")
|
("num-msgs", bpo::value<int>()->default_value(0), "Number of messages to send")
|
||||||
("msg-rate", bpo::value<int>()->default_value(0), "Msg rate limit in maximum number of messages per second");
|
("msg-rate", bpo::value<int>()->default_value(0), "Msg rate limit in maximum number of messages per second");
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQDevice* getDevice(const FairMQProgOptions& /*config*/)
|
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
|
||||||
{
|
{
|
||||||
return new FairMQBenchmarkSampler();
|
return new FairMQBenchmarkSampler();
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,7 @@ void addCustomOptions(bpo::options_description& options)
|
||||||
("multipart", bpo::value<int>()->default_value(1), "Handle multipart payloads");
|
("multipart", bpo::value<int>()->default_value(1), "Handle multipart payloads");
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQDevice* getDevice(const FairMQProgOptions& /*config*/)
|
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
|
||||||
{
|
{
|
||||||
return new FairMQMerger();
|
return new FairMQMerger();
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,7 @@ void addCustomOptions(bpo::options_description& options)
|
||||||
("multipart", bpo::value<int>()->default_value(1), "Handle multipart payloads");
|
("multipart", bpo::value<int>()->default_value(1), "Handle multipart payloads");
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQDevice* getDevice(const FairMQProgOptions& /*config*/)
|
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
|
||||||
{
|
{
|
||||||
return new FairMQMultiplier();
|
return new FairMQMultiplier();
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,7 @@ void addCustomOptions(bpo::options_description& options)
|
||||||
("multipart", bpo::value<int>()->default_value(1), "Handle multipart payloads");
|
("multipart", bpo::value<int>()->default_value(1), "Handle multipart payloads");
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQDevice* getDevice(const FairMQProgOptions& /*config*/)
|
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
|
||||||
{
|
{
|
||||||
return new FairMQProxy();
|
return new FairMQProxy();
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,11 +14,11 @@ namespace bpo = boost::program_options;
|
||||||
void addCustomOptions(bpo::options_description& options)
|
void addCustomOptions(bpo::options_description& options)
|
||||||
{
|
{
|
||||||
options.add_options()
|
options.add_options()
|
||||||
("in-channel", bpo::value<std::string>()->default_value("data-in"), "Name of the input channel")
|
("in-channel", bpo::value<std::string>()->default_value("data"), "Name of the input channel")
|
||||||
("num-msgs", bpo::value<int>()->default_value(0), "Number of messages to receive");
|
("num-msgs", bpo::value<int>()->default_value(0), "Number of messages to receive");
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQDevice* getDevice(const FairMQProgOptions& /*config*/)
|
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
|
||||||
{
|
{
|
||||||
return new FairMQSink();
|
return new FairMQSink();
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,7 @@ void addCustomOptions(bpo::options_description& options)
|
||||||
("multipart", bpo::value<int>()->default_value(1), "Handle multipart payloads");
|
("multipart", bpo::value<int>()->default_value(1), "Handle multipart payloads");
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQDevice* getDevice(const FairMQProgOptions& /*config*/)
|
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
|
||||||
{
|
{
|
||||||
return new FairMQSplitter();
|
return new FairMQSplitter();
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,8 +32,10 @@ FairMQDevice* makeDeviceWithConditionalRun(R r)
|
||||||
return new GenericFairMQDevice<R>(r);
|
return new GenericFairMQDevice<R>(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
using FairMQDevicePtr = FairMQDevice*;
|
||||||
|
|
||||||
// to be implemented by the user to return a child class of FairMQDevice
|
// to be implemented by the user to return a child class of FairMQDevice
|
||||||
FairMQDevice* getDevice(const FairMQProgOptions& config);
|
FairMQDevicePtr getDevice(const FairMQProgOptions& config);
|
||||||
|
|
||||||
// to be implemented by the user to add custom command line options (or just with empty body)
|
// to be implemented by the user to add custom command line options (or just with empty body)
|
||||||
void addCustomOptions(boost::program_options::options_description&);
|
void addCustomOptions(boost::program_options::options_description&);
|
||||||
|
|
|
@ -43,8 +43,8 @@ inline int runStateMachine(TMQDevice& device, FairMQProgOptions& cfg)
|
||||||
FairMQConfigPlugin* fairmqConfigPlugin = nullptr;
|
FairMQConfigPlugin* fairmqConfigPlugin = nullptr;
|
||||||
FairMQControlPlugin* fairmqControlPlugin = nullptr;
|
FairMQControlPlugin* fairmqControlPlugin = nullptr;
|
||||||
|
|
||||||
std::clock_t c_start = std::clock();
|
std::clock_t cStart = std::clock();
|
||||||
auto t_start = std::chrono::high_resolution_clock::now();
|
auto tStart = std::chrono::high_resolution_clock::now();
|
||||||
|
|
||||||
device.ChangeState(TMQDevice::INIT_DEVICE);
|
device.ChangeState(TMQDevice::INIT_DEVICE);
|
||||||
// Wait for the binding channels to bind
|
// Wait for the binding channels to bind
|
||||||
|
@ -112,11 +112,11 @@ inline int runStateMachine(TMQDevice& device, FairMQProgOptions& cfg)
|
||||||
|
|
||||||
device.WaitForEndOfState(TMQDevice::INIT_DEVICE);
|
device.WaitForEndOfState(TMQDevice::INIT_DEVICE);
|
||||||
|
|
||||||
std::clock_t c_end = std::clock();
|
std::clock_t cEnd = std::clock();
|
||||||
auto t_end = std::chrono::high_resolution_clock::now();
|
auto tEnd = std::chrono::high_resolution_clock::now();
|
||||||
|
|
||||||
LOG(DEBUG) << "Init time (CPU) : " << std::fixed << std::setprecision(2) << 1000.0 * (c_end - c_start) / CLOCKS_PER_SEC << " ms";
|
LOG(DEBUG) << "Init time (CPU) : " << std::fixed << std::setprecision(2) << 1000.0 * (cEnd - cStart) / CLOCKS_PER_SEC << " ms";
|
||||||
LOG(DEBUG) << "Init time (Wall): " << std::chrono::duration<double, std::milli>(t_end - t_start).count() << " ms";
|
LOG(DEBUG) << "Init time (Wall): " << std::chrono::duration<double, std::milli>(tEnd - tStart).count() << " ms";
|
||||||
|
|
||||||
device.ChangeState(TMQDevice::INIT_TASK);
|
device.ChangeState(TMQDevice::INIT_TASK);
|
||||||
device.WaitForEndOfState(TMQDevice::INIT_TASK);
|
device.WaitForEndOfState(TMQDevice::INIT_TASK);
|
||||||
|
|
Loading…
Reference in New Issue
Block a user