Port DDS plugin to the new plugin system.

This commit is contained in:
Alexey Rybalchenko 2017-09-15 13:51:42 +02:00 committed by Mohammad Al-Turany
parent 2db114bc5c
commit 01327426c3
21 changed files with 564 additions and 130 deletions

View File

@ -29,6 +29,14 @@
#
#
####################
# external plugins #
####################
if (DDS_FOUND)
add_subdirectory(plugins/DDS)
endif()
############################
# preprocessor definitions #
############################

View File

@ -162,7 +162,16 @@ string FairMQChannel::GetChannelName() const
string FairMQChannel::GetChannelPrefix() const
{
string prefix = fName;
return prefix.erase(fName.rfind("["));
prefix = prefix.erase(fName.rfind("["));
return prefix;
}
string FairMQChannel::GetChannelIndex() const
{
string indexStr = fName;
indexStr.erase(indexStr.rfind("]"));
indexStr.erase(0, indexStr.rfind("[") + 1);
return indexStr;
}
string FairMQChannel::GetType() const
@ -516,11 +525,9 @@ bool FairMQChannel::ValidateChannel()
}
else
{
//TODO: maybe cache fEndpoints as a class member? not really needed as tokenizing is
//fast, and only happens during (re-)configure
vector<string> fEndpoints;
Tokenize(fEndpoints, fAddress);
for (const auto endpoint : fEndpoints)
vector<string> endpoints;
boost::algorithm::split(endpoints, fAddress, boost::algorithm::is_any_of(","));
for (const auto endpoint : endpoints)
{
string address;
if (endpoint[0] == '@' || endpoint[0] == '+' || endpoint[0] == '>')
@ -841,11 +848,6 @@ FairMQChannel::~FairMQChannel()
{
}
void FairMQChannel::Tokenize(vector<string>& output, const string& input, const string delimiters)
{
boost::algorithm::split(output, input, boost::algorithm::is_any_of(delimiters));
}
unsigned long FairMQChannel::GetBytesTx() const
{
return fSocket->GetBytesTx();

View File

@ -78,9 +78,13 @@ class FairMQChannel
std::string GetChannelName() const;
/// Get channel prefix
/// @return Returns channel prefix (e.g. "data")
/// @return Returns channel prefix (e.g. "data" in "data[0]")
std::string GetChannelPrefix() const;
/// Get channel index
/// @return Returns channel index (e.g. 0 in "data[0]")
std::string GetChannelIndex() const;
/// Get socket type
/// @return Returns socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/)
std::string GetType() const;
@ -270,9 +274,6 @@ class FairMQChannel
return ReceiveAsync(parts.fParts);
}
// TODO: this might go to some base utility library
static void Tokenize(std::vector<std::string>& output, const std::string& input, const std::string delimiters = ",");
unsigned long GetBytesTx() const;
unsigned long GetBytesRx() const;
unsigned long GetMessagesTx() const;

View File

@ -62,7 +62,6 @@ FairMQDevice::FairMQDevice()
, fDefaultTransport()
, fInitializationTimeoutInS(120)
, fCatchingSignals(false)
, fTerminationRequested(false)
, fInteractiveRunning(false)
, fDataCallbacks(false)
, fDeviceCmdSockets()
@ -94,7 +93,6 @@ FairMQDevice::FairMQDevice(const fair::mq::tools::Version version)
, fDefaultTransport()
, fInitializationTimeoutInS(120)
, fCatchingSignals(false)
, fTerminationRequested(false)
, fInteractiveRunning(false)
, fDataCallbacks(false)
, fDeviceCmdSockets()
@ -151,33 +149,6 @@ void FairMQDevice::SignalHandler(int signal)
}
}
void FairMQDevice::AttachChannels(list<FairMQChannel*>& chans)
{
auto itr = chans.begin();
while (itr != chans.end())
{
if ((*itr)->ValidateChannel())
{
if (AttachChannel(**itr))
{
(*itr)->InitCommandInterface();
(*itr)->SetModified(false);
chans.erase(itr++);
}
else
{
LOG(ERROR) << "failed to attach channel " << (*itr)->fName << " (" << (*itr)->fMethod << ")";
++itr;
}
}
else
{
++itr;
}
}
}
void FairMQDevice::InitWrapper()
{
if (!fTransportFactory)
@ -203,13 +174,13 @@ void FairMQDevice::InitWrapper()
}
// Containers to store the uninitialized channels.
list<FairMQChannel*> uninitializedBindingChannels;
list<FairMQChannel*> uninitializedConnectingChannels;
vector<FairMQChannel*> uninitializedBindingChannels;
vector<FairMQChannel*> uninitializedConnectingChannels;
// Fill the uninitialized channel containers
for (auto mi = fChannels.begin(); mi != fChannels.end(); ++mi)
for (auto& mi : fChannels)
{
for (auto vi = (mi->second).begin(); vi != (mi->second).end(); ++vi)
for (auto vi = mi.second.begin(); vi != mi.second.end(); ++vi)
{
if (vi->fModified)
{
@ -224,9 +195,7 @@ void FairMQDevice::InitWrapper()
vi->fChannelCmdSocket = nullptr;
}
// set channel name: name + vector index
stringstream ss;
ss << mi->first << "[" << vi - (mi->second).begin() << "]";
vi->fName = ss.str();
vi->fName = fair::mq::tools::ToString(mi.first, "[", vi - (mi.second).begin(), "]");
if (vi->fMethod == "bind")
{
@ -256,12 +225,11 @@ void FairMQDevice::InitWrapper()
else
{
LOG(ERROR) << "Cannot update configuration. Socket method (bind/connect) not specified.";
exit(EXIT_FAILURE);
throw runtime_error("Cannot update configuration. Socket method (bind/connect) not specified.");
}
}
}
}
CallStateChangeCallbacks(INITIALIZING_DEVICE);
// Bind channels. Here one run is enough, because bind settings should be available locally
// If necessary this could be handled in the same way as the connecting channels
@ -270,9 +238,11 @@ void FairMQDevice::InitWrapper()
if (uninitializedBindingChannels.size() > 0)
{
LOG(ERROR) << uninitializedBindingChannels.size() << " of the binding channels could not initialize. Initial configuration incomplete.";
exit(EXIT_FAILURE);
throw runtime_error(fair::mq::tools::ToString(uninitializedBindingChannels.size(), " of the binding channels could not initialize. Initial configuration incomplete."));
}
CallStateChangeCallbacks(INITIALIZING_DEVICE);
// notify parent thread about completion of first validation.
{
lock_guard<mutex> lock(fInitialValidationMutex);
@ -281,21 +251,36 @@ void FairMQDevice::InitWrapper()
}
// go over the list of channels until all are initialized (and removed from the uninitialized list)
int numAttempts = 0;
int numAttempts = 1;
auto sleepTimeInMS = 50;
auto maxAttempts = fInitializationTimeoutInS * 1000 / sleepTimeInMS;
// first attempt
AttachChannels(uninitializedConnectingChannels);
// if not all channels could be connected, update their address values from config and retry
while (!uninitializedConnectingChannels.empty())
{
AttachChannels(uninitializedConnectingChannels);
if (numAttempts > maxAttempts)
this_thread::sleep_for(chrono::milliseconds(sleepTimeInMS));
if (fConfig)
{
LOG(ERROR) << "could not connect all channels after " << fInitializationTimeoutInS << " attempts";
// TODO: goto ERROR state;
exit(EXIT_FAILURE);
for (auto& chan : uninitializedConnectingChannels)
{
string key{"chans." + chan->GetChannelPrefix() + "." + chan->GetChannelIndex() + ".address"};
string newAddress = fConfig->GetValue<string>(key);
if (newAddress != chan->GetAddress())
{
chan->UpdateAddress(newAddress);
}
}
}
this_thread::sleep_for(chrono::milliseconds(sleepTimeInMS));
numAttempts++;
if (numAttempts++ > maxAttempts)
{
LOG(ERROR) << "could not connect all channels after " << fInitializationTimeoutInS << " attempts";
throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", fInitializationTimeoutInS, " attempts"));
}
AttachChannels(uninitializedConnectingChannels);
}
Init();
@ -313,6 +298,33 @@ void FairMQDevice::Init()
{
}
void FairMQDevice::AttachChannels(vector<FairMQChannel*>& chans)
{
auto itr = chans.begin();
while (itr != chans.end())
{
if ((*itr)->ValidateChannel())
{
if (AttachChannel(**itr))
{
(*itr)->InitCommandInterface();
(*itr)->SetModified(false);
itr = chans.erase(itr);
}
else
{
LOG(ERROR) << "failed to attach channel " << (*itr)->fName << " (" << (*itr)->fMethod << ")";
++itr;
}
}
else
{
++itr;
}
}
}
bool FairMQDevice::AttachChannel(FairMQChannel& ch)
{
if (!ch.fTransportFactory)
@ -331,7 +343,7 @@ bool FairMQDevice::AttachChannel(FairMQChannel& ch)
}
vector<string> endpoints;
FairMQChannel::Tokenize(endpoints, ch.fAddress);
boost::algorithm::split(endpoints, ch.fAddress, boost::algorithm::is_any_of(","));
for (auto& endpoint : endpoints)
{
//(re-)init socket
@ -402,8 +414,17 @@ bool FairMQDevice::AttachChannel(FairMQChannel& ch)
}
}
// put the (possibly) modified address back in the config
ch.UpdateAddress(boost::algorithm::join(endpoints, ","));
// put the (possibly) modified address back in the channel object and config
string newAddress{boost::algorithm::join(endpoints, ",")};
if (newAddress != ch.fAddress)
{
ch.UpdateAddress(newAddress);
if (fConfig)
{
string key{"chans." + ch.GetChannelPrefix() + "." + ch.GetChannelIndex() + ".address"};
fConfig->SetValue(key, newAddress);
}
}
return true;
}

View File

@ -409,11 +409,6 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
}
}
bool Terminated()
{
return fTerminationRequested;
}
const FairMQChannel& GetChannel(const std::string& channelName, const int index = 0) const;
virtual void RegisterChannelEndpoints() {}
@ -549,7 +544,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
void Exit();
/// Attach (bind/connect) channels in the list
void AttachChannels(std::list<FairMQChannel*>& chans);
void AttachChannels(std::vector<FairMQChannel*>& chans);
/// Sets up and connects/binds a socket to an endpoint
/// return a string with the actual endpoint if it happens
@ -574,7 +569,6 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
/// Signal handler
void SignalHandler(int signal);
bool fCatchingSignals;
std::atomic<bool> fTerminationRequested;
// Interactive state loop helper
std::atomic<bool> fInteractiveRunning;

View File

@ -91,6 +91,7 @@ struct FairMQFSM : public msmf::state_machine_def<FairMQFSM>
, fStateChangeSignal()
, fStateChangeSignalsMap()
, fWorkerThread()
, fTerminationRequested(false)
{}
virtual ~FairMQFSM()
@ -299,6 +300,7 @@ struct FairMQFSM : public msmf::state_machine_def<FairMQFSM>
{
LOG(STATE) << "Entering EXITING state";
fsm.fState = EXITING;
fsm.fTerminationRequested = true;
fsm.CallStateChangeCallbacks(EXITING);
// terminate worker thread
@ -454,6 +456,11 @@ struct FairMQFSM : public msmf::state_machine_def<FairMQFSM>
virtual void Exit() {}
virtual void Unblock() {}
bool Terminated()
{
return fTerminationRequested;
}
protected:
std::atomic<State> fState;
std::mutex fChangeStateMutex;
@ -469,6 +476,7 @@ struct FairMQFSM : public msmf::state_machine_def<FairMQFSM>
boost::signals2::signal<void(const State)> fStateChangeSignal;
std::unordered_map<std::string, boost::signals2::connection> fStateChangeSignalsMap;
std::atomic<bool> fTerminationRequested;
void CallStateChangeCallbacks(const State state) const
{

View File

@ -11,10 +11,13 @@
#include <fairmq/Tools.h>
#include <fairmq/PluginServices.h>
#include <boost/dll/alias.hpp>
#include <boost/optional.hpp>
#include <boost/program_options.hpp>
#include <functional>
#include <unordered_map>
#include <ostream>
#include <memory>
#include <string>
@ -68,7 +71,9 @@ class Plugin
using DeviceState = fair::mq::PluginServices::DeviceState;
using DeviceStateTransition = fair::mq::PluginServices::DeviceStateTransition;
auto ToDeviceState(const std::string& state) const -> DeviceState { return fPluginServices->ToDeviceState(state); }
auto ToDeviceStateTransition(const std::string& transition) const -> DeviceStateTransition { return fPluginServices->ToDeviceStateTransition(transition); }
auto ToStr(DeviceState state) const -> std::string { return fPluginServices->ToStr(state); }
auto ToStr(DeviceStateTransition transition) const -> std::string { return fPluginServices->ToStr(transition); }
auto GetCurrentDeviceState() const -> DeviceState { return fPluginServices->GetCurrentDeviceState(); }
auto TakeDeviceControl() -> void { fPluginServices->TakeDeviceControl(fkName); };
auto ReleaseDeviceControl() -> void { fPluginServices->ReleaseDeviceControl(fkName); };
@ -83,6 +88,7 @@ class Plugin
template<typename T>
auto GetProperty(const std::string& key) const -> T { return fPluginServices->GetProperty<T>(key); }
auto GetPropertyAsString(const std::string& key) const -> std::string { return fPluginServices->GetPropertyAsString(key); }
auto GetChannelInfo() const -> std::unordered_map<std::string, int> { return fPluginServices->GetChannelInfo(); }
// template<typename T>
// auto SubscribeToPropertyChange(std::function<void(const std::string& [>key*/, const T /*newValue<])> callback) const -> void { fPluginServices.SubscribeToPropertyChange(fkName, callback); }
// template<typename T>

View File

@ -12,11 +12,13 @@
#include <fairmq/Tools.h>
#include <FairMQDevice.h>
#include <options/FairMQProgOptions.h>
#include <boost/optional.hpp>
#include <boost/optional/optional_io.hpp>
#include <functional>
#include <string>
#include <unordered_map>
#include <boost/optional.hpp>
#include <boost/optional/optional_io.hpp>
#include <mutex>
#include <condition_variable>
@ -39,6 +41,9 @@ class PluginServices
PluginServices(FairMQProgOptions* config, std::shared_ptr<FairMQDevice> device)
: fConfig{config}
, fDevice{device}
, fDeviceController()
, fDeviceControllerMutex()
, fReleaseDeviceControlCondition()
{
}
@ -193,6 +198,8 @@ class PluginServices
/// If a type is not supported, the user can provide support by overloading the ostream operator for this type
auto GetPropertyAsString(const std::string& key) const -> std::string { return fConfig->GetStringValue(key); }
auto GetChannelInfo() const -> std::unordered_map<std::string, int> { return fConfig->GetChannelInfo(); }
/// @brief Subscribe to property updates of type T
/// @param subscriber
/// @param callback function

View File

@ -1,8 +1,8 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
@ -45,8 +45,8 @@ class FairMQBenchmarkSampler : public FairMQDevice
std::string fOutChannelName;
std::thread fResetMsgCounter;
virtual void InitTask();
virtual void Run();
virtual void InitTask() override;
virtual void Run() override;
};
#endif /* FAIRMQBENCHMARKSAMPLER_H_ */

View File

@ -14,7 +14,7 @@ using namespace std;
FairMQ::Transport FairMQTransportFactoryNN::fTransportType = FairMQ::Transport::NN;
FairMQTransportFactoryNN::FairMQTransportFactoryNN(const string& id, const FairMQProgOptions* config)
FairMQTransportFactoryNN::FairMQTransportFactoryNN(const string& id, const FairMQProgOptions* /*config*/)
: FairMQTransportFactory(id)
{
LOG(DEBUG) << "Transport: Using nanomsg library";

View File

@ -33,16 +33,7 @@ FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const string& id,
// Extract value from boost::property_tree
Helper::DeviceParser(pt.get_child(rootNode), channelMap, id, formatFlag);
if (channelMap.size() > 0)
{
stringstream channelKeys;
for (const auto& p : channelMap)
{
channelKeys << "'" << p.first << "' ";
}
LOG(DEBUG) << "---- Found following channel keys: " << channelKeys.str();
}
else
if (channelMap.empty())
{
LOG(WARN) << "---- No channel keys found for " << id;
LOG(WARN) << "---- Check the JSON inputs and/or command line inputs";

View File

@ -29,6 +29,7 @@ FairMQProgOptions::FairMQProgOptions()
, fFairMQMap()
, fHelpTitle("***** FAIRMQ Program Options ***** ")
, fVersion("Beta version 0.1")
, fChannelInfo()
, fMQKeyMap()
// , fSignalMap() //string API
{
@ -114,7 +115,7 @@ void FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool a
DefaultConsoleSetFilter(fSeverityMap.at(verbosity));
}
// check if one of required MQ config option is there
// check if one of required MQ config option is there
auto parserOptions = fMQParserOptions.options();
bool optionExists = false;
vector<string> MQParserKeys;
@ -157,24 +158,24 @@ void FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool a
string file = fVarMap["mq-config"].as<string>();
string fileExtension = boost::filesystem::extension(file);
string ext = boost::filesystem::extension(file);
transform(fileExtension.begin(), fileExtension.end(), fileExtension.begin(), ::tolower);
transform(ext.begin(), ext.end(), ext.begin(), ::tolower);
if (fileExtension == ".json")
if (ext == ".json")
{
UserParser<FairMQParser::JSON>(file, id);
}
else
{
if (fileExtension == ".xml")
if (ext == ".xml")
{
UserParser<FairMQParser::XML>(file, id);
}
else
{
LOG(ERROR) << "mq-config command line called but file extension '"
<< fileExtension
<< ext
<< "' not recognized. Program will now exit";
exit(EXIT_FAILURE);
}
@ -211,6 +212,7 @@ void FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool a
int FairMQProgOptions::Store(const FairMQMap& channels)
{
fFairMQMap = channels;
UpdateChannelInfo();
UpdateMQValues();
return 0;
}
@ -219,10 +221,20 @@ int FairMQProgOptions::Store(const FairMQMap& channels)
int FairMQProgOptions::UpdateChannelMap(const FairMQMap& channels)
{
fFairMQMap = channels;
UpdateChannelInfo();
UpdateMQValues();
return 0;
}
void FairMQProgOptions::UpdateChannelInfo()
{
fChannelInfo.clear();
for (const auto& c : fFairMQMap)
{
fChannelInfo.insert(std::make_pair(c.first, c.second.size()));
}
}
// read FairMQChannelMap and insert/update corresponding values in variable map
// create key for variable map as follow : channelName.index.memberName
void FairMQProgOptions::UpdateMQValues()
@ -233,15 +245,15 @@ void FairMQProgOptions::UpdateMQValues()
for (const auto& channel : p.second)
{
string typeKey = p.first + "." + to_string(index) + ".type";
string methodKey = p.first + "." + to_string(index) + ".method";
string addressKey = p.first + "." + to_string(index) + ".address";
string transportKey = p.first + "." + to_string(index) + ".transport";
string sndBufSizeKey = p.first + "." + to_string(index) + ".sndBufSize";
string rcvBufSizeKey = p.first + "." + to_string(index) + ".rcvBufSize";
string sndKernelSizeKey = p.first + "." + to_string(index) + ".sndKernelSize";
string rcvKernelSizeKey = p.first + "." + to_string(index) + ".rcvKernelSize";
string rateLoggingKey = p.first + "." + to_string(index) + ".rateLogging";
string typeKey = "chans." + p.first + "." + to_string(index) + ".type";
string methodKey = "chans." + p.first + "." + to_string(index) + ".method";
string addressKey = "chans." + p.first + "." + to_string(index) + ".address";
string transportKey = "chans." + p.first + "." + to_string(index) + ".transport";
string sndBufSizeKey = "chans." + p.first + "." + to_string(index) + ".sndBufSize";
string rcvBufSizeKey = "chans." + p.first + "." + to_string(index) + ".rcvBufSize";
string sndKernelSizeKey = "chans." + p.first + "." + to_string(index) + ".sndKernelSize";
string rcvKernelSizeKey = "chans." + p.first + "." + to_string(index) + ".rcvKernelSize";
string rateLoggingKey = "chans." + p.first + "." + to_string(index) + ".rateLogging";
fMQKeyMap[typeKey] = make_tuple(p.first, index, "type");
fMQKeyMap[methodKey] = make_tuple(p.first, index, "method");
@ -258,7 +270,6 @@ void FairMQProgOptions::UpdateMQValues()
UpdateVarMap<string>(addressKey, channel.GetAddress());
UpdateVarMap<string>(transportKey, channel.GetTransport());
//UpdateVarMap<string>(sndBufSizeKey, to_string(channel.GetSndBufSize()));// string API
UpdateVarMap<int>(sndBufSizeKey, channel.GetSndBufSize());

View File

@ -21,6 +21,7 @@
#include <map>
#include <set>
#include <mutex>
#include <string>
#include "FairProgOptions.h"
#include "FairMQEventManager.h"
@ -37,11 +38,11 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager
FairMQProgOptions();
virtual ~FairMQProgOptions();
// parse command line and txt/INI configuration file.
// parse command line and txt/INI configuration file.
// default parser for the mq-configuration file (JSON/XML) is called if command line key mq-config is called
virtual void ParseAll(const int argc, char const* const* argv, bool allowUnregistered = false);
// external parser, store function
// external parser, store function
template <typename T, typename ...Args>
int UserParser(Args &&... args)
{
@ -57,11 +58,16 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager
return 0;
}
FairMQMap GetFairMQMap()
FairMQMap GetFairMQMap() const
{
return fFairMQMap;
}
std::unordered_map<std::string, int> GetChannelInfo() const
{
return fChannelInfo;
}
// to customize title of the executable help command line
void SetHelpTitle(const std::string& title)
{
@ -257,7 +263,6 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager
Disconnect<EventId::UpdateParam, T>(key);
}
/*
template <typename F>
@ -284,6 +289,9 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager
std::string fHelpTitle;
std::string fVersion;
// map of read channel info - channel name - number of subchannels
std::unordered_map<std::string, int> fChannelInfo;
bool EventKeyFound(const std::string& key)
{
if (FairMQEventManager::EventKeyFound<EventId::UpdateParam>(key))
@ -338,6 +346,8 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager
{
return 0;
}
void UpdateChannelInfo();
};
#endif /* FAIRMQPROGOPTIONS_H */

View File

@ -201,9 +201,9 @@ class FairProgOptions
}
template<typename T>
void replace(std::map<std::string, po::variable_value>& vm, const std::string& opt, const T& val)
void replace(std::map<std::string, po::variable_value>& vm, const std::string& key, const T& val)
{
vm[opt].value() = boost::any(val);
vm[key].value() = boost::any(val);
}
private:

View File

@ -71,13 +71,13 @@ int main(int argc, char** argv)
("data-rate", po::value<double>()->default_value(0.5), "Data rate");
// parse command lines, parse json file and init FairMQMap
config.ParseAll(argc, argv);
config.ParseAll(argc, argv, true);
// // get FairMQMap
// auto map1 = config.GetFairMQMap();
// // update value in variable map, and propagate the update to the FairMQMap
// config.UpdateValue<string>("data.0.address","tcp://localhost:1234");
// config.UpdateValue<string>("chans.data.0.address","tcp://localhost:1234");
// // get the updated FairMQMap
// auto map2 = config.GetFairMQMap();
@ -98,15 +98,15 @@ int main(int argc, char** argv)
// double dataRate = config.ConvertTo<double>(dataRateStr);
// LOG(INFO) << "dataRate: " << dataRate;
LOG(INFO) << "Subscribing: <string>(data.0.address)";
config.Subscribe<string>("data.0.address", [&device](const string& key, const string& value)
LOG(INFO) << "Subscribing: <string>(chans.data.0.address)";
config.Subscribe<string>("chans.data.0.address", [&device](const string& key, const string& value)
{
LOG(INFO) << "[callback] Updating device parameter " << key << " = " << value;
device.fChannels.at("data").at(0).UpdateAddress(value);
});
LOG(INFO) << "Subscribing: <int>(data.0.rcvBufSize)";
config.Subscribe<int>("data.0.rcvBufSize", [&device](const string& key, int value)
LOG(INFO) << "Subscribing: <int>(chans.data.0.rcvBufSize)";
config.Subscribe<int>("chans.data.0.rcvBufSize", [&device](const string& key, int value)
{
LOG(INFO) << "[callback] Updating device parameter " << key << " = " << value;
device.fChannels.at("data").at(0).UpdateRcvBufSize(value);
@ -121,12 +121,12 @@ int main(int argc, char** argv)
LOG(INFO) << "Starting value updates...\n";
config.UpdateValue<string>("data.0.address", "tcp://localhost:4321");
LOG(INFO) << "config: " << config.GetValue<string>("data.0.address");
config.UpdateValue<string>("chans.data.0.address", "tcp://localhost:4321");
LOG(INFO) << "config: " << config.GetValue<string>("chans.data.0.address");
LOG(INFO) << "device: " << device.fChannels.at("data").at(0).GetAddress() << endl;
config.UpdateValue<int>("data.0.rcvBufSize", 100);
LOG(INFO) << "config: " << config.GetValue<int>("data.0.rcvBufSize");
config.UpdateValue<int>("chans.data.0.rcvBufSize", 100);
LOG(INFO) << "config: " << config.GetValue<int>("chans.data.0.rcvBufSize");
LOG(INFO) << "device: " << device.fChannels.at("data").at(0).GetRcvBufSize() << endl;
config.UpdateValue<double>("data-rate", 0.9);

View File

@ -8,8 +8,6 @@
#include "Control.h"
#include <chrono>
#include <termios.h> // for the interactive mode
#include <poll.h> // for the interactive mode
@ -79,7 +77,6 @@ auto Control::InteractiveMode() -> void
ChangeDeviceState(DeviceStateTransition::InitDevice);
while (WaitForNextState() != DeviceState::DeviceReady) {}
ChangeDeviceState(DeviceStateTransition::InitTask);
while (WaitForNextState() != DeviceState::Ready) {}
ChangeDeviceState(DeviceStateTransition::Run);

View File

@ -0,0 +1,18 @@
################################################################################
# Copyright (C) 2012-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
set(plugin FairMQPlugin_dds)
add_library(${plugin} SHARED ${CMAKE_CURRENT_SOURCE_DIR}/DDS.cxx ${CMAKE_CURRENT_SOURCE_DIR}/DDS.h)
target_link_libraries(${plugin} FairMQ ${DDS_INTERCOM_LIBRARY_SHARED} ${DDS_PROTOCOL_LIBRARY_SHARED} ${DDS_USER_DEFAULTS_LIBRARY_SHARED})
target_include_directories(${plugin} PUBLIC ${CMAKE_CURRENT_BINARY_DIR} ${DDS_INCLUDE_DIR})
set_target_properties(${plugin} PROPERTIES CXX_VISIBILITY_PRESET hidden)
add_executable(fairmq-dds-command-ui ${CMAKE_CURRENT_SOURCE_DIR}/runDDSCommandUI.cxx)
target_link_libraries(fairmq-dds-command-ui FairMQ ${DDS_INTERCOM_LIBRARY_SHARED} ${DDS_PROTOCOL_LIBRARY_SHARED} ${DDS_USER_DEFAULTS_LIBRARY_SHARED})
target_include_directories(fairmq-dds-command-ui PUBLIC ${CMAKE_CURRENT_BINARY_DIR} ${DDS_INCLUDE_DIR})

260
fairmq/plugins/DDS/DDS.cxx Normal file
View File

@ -0,0 +1,260 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "DDS.h"
#include <termios.h> // for the interactive mode
#include <poll.h> // for the interactive mode
using namespace std;
namespace fair
{
namespace mq
{
namespace plugins
{
DDS::DDS(const string name, const Plugin::Version version, const string maintainer, const string homepage, PluginServices* pluginServices)
: Plugin(name, version, maintainer, homepage, pluginServices)
, fService()
, fDDSCustomCmd(fService)
, fDDSKeyValue(fService)
, fBindingChans()
, fConnectingChans()
, fStopMutex()
, fStopCondition()
, fCommands({ "INIT DEVICE", "INIT TASK", "PAUSE", "RUN", "STOP", "RESET TASK", "RESET DEVICE" })
, fControllerThread()
, fEvents()
, fEventsMutex()
, fNewEvent()
{
try
{
TakeDeviceControl();
fControllerThread = thread(&DDS::HandleControl, this);
}
catch (PluginServices::DeviceControlError& e)
{
LOG(DEBUG) << e.what();
}
catch (exception& e)
{
LOG(ERROR) << "Error in plugin initialization: " << e.what();
}
}
auto DDS::HandleControl() -> void
{
try
{
// subscribe for DDS service errors.
fService.subscribeOnError([](const dds::intercom_api::EErrorCode errorCode, const string& errorMsg) {
LOG(ERROR) << "DDS Error received: error code: " << errorCode << ", error message: " << errorMsg << endl;
});
// subscribe to device state changes, pushing new state chenges into the event queue
SubscribeToDeviceStateChange([&](DeviceState newState)
{
{
lock_guard<mutex> lock{fEventsMutex};
fEvents.push(newState);
}
fNewEvent.notify_one();
});
ChangeDeviceState(DeviceStateTransition::InitDevice);
while (WaitForNextState() != DeviceState::InitializingDevice) {}
// in the Initializing state subscribe to receive addresses of connecting channels from DDS
// and propagate addresses of bound channels to DDS.
FillChannelContainers();
if (fConnectingChans.size() > 0)
{
LOG(DEBUG) << "Subscribing for DDS properties.";
SubscribeForConnectingChannels();
}
// subscribe for state changes from DDS (subscriptions start firing after fService.start() is called)
SubscribeForStateChanges();
// start DDS service - subscriptions will only start firing after this step
fService.start();
// publish bound addresses via DDS at keys corresponding to the channel prefixes, e.g. 'data' in data[i]
PublishBoundChannels();
while (WaitForNextState() != DeviceState::DeviceReady) {}
ChangeDeviceState(DeviceStateTransition::InitTask);
while (WaitForNextState() != DeviceState::Ready) {}
ChangeDeviceState(DeviceStateTransition::Run);
// wait until stop signal
unique_lock<mutex> lock(fStopMutex);
while (!DeviceTerminated())
{
fStopCondition.wait_for(lock, chrono::seconds(1));
}
LOG(DEBUG) << "Stopping DDS control plugin";
}
catch (exception& e)
{
LOG(ERROR) << "Error: " << e.what() << endl;
return;
}
fDDSKeyValue.unsubscribe();
fDDSCustomCmd.unsubscribe();
UnsubscribeFromDeviceStateChange();
ReleaseDeviceControl();
}
auto DDS::FillChannelContainers() -> void
{
unordered_map<string, int> channelInfo(GetChannelInfo());
for (const auto& c : channelInfo)
{
string methodKey{"chans." + c.first + "." + to_string(c.second - 1) + ".method"};
string addressKey{"chans." + c.first + "." + to_string(c.second - 1) + ".address"};
if (GetProperty<string>(methodKey) == "bind")
{
fBindingChans.insert(make_pair(c.first, vector<string>()));
for (unsigned int i = 0; i < c.second; ++i)
{
fBindingChans.at(c.first).push_back(GetProperty<string>(addressKey));
}
}
else if (GetProperty<string>(methodKey) == "connect")
{
fConnectingChans.insert(make_pair(c.first, DDSConfig()));
LOG(DEBUG) << "preparing to connect: " << c.first << " with " << c.second << " sub-channels.";
for (unsigned int i = 0; i < c.second; ++i)
{
fConnectingChans.at(c.first).fSubChannelAddresses.push_back(string());
}
}
else
{
LOG(ERROR) << "Cannot update address configuration. Channel method (bind/connect) not specified.";
return;
}
}
}
auto DDS::SubscribeForConnectingChannels() -> void
{
fDDSKeyValue.subscribe([&] (const string& propertyId, const string& key, const string& value)
{
LOG(DEBUG) << "Received update for " << propertyId << ": key=" << key << " value=" << value;
fConnectingChans.at(propertyId).fDDSValues.insert(make_pair<string, string>(key.c_str(), value.c_str()));
// update channels and remove them from unfinished container
for (auto mi = fConnectingChans.begin(); mi != fConnectingChans.end(); /* no increment */)
{
if (mi->second.fSubChannelAddresses.size() == mi->second.fDDSValues.size())
{
// when multiple subChannels are used, their order on every device should be the same, irregardless of arrival order from DDS.
sort(mi->second.fSubChannelAddresses.begin(), mi->second.fSubChannelAddresses.end());
auto it = mi->second.fDDSValues.begin();
for (unsigned int i = 0; i < mi->second.fSubChannelAddresses.size(); ++i)
{
string k = "chans." + mi->first + "." + to_string(i) + ".address";
SetProperty<string>(k, it->second);
++it;
}
fConnectingChans.erase(mi++);
}
else
{
++mi;
}
}
});
}
auto DDS::PublishBoundChannels() -> void
{
for (const auto& chan : fBindingChans)
{
unsigned int index = 0;
for (const auto& i : chan.second)
{
LOG(DEBUG) << "Publishing " << chan.first << "[" << index << "] address to DDS under '" << chan.first << "' property name.";
fDDSKeyValue.putValue(chan.first, i);
++index;
}
}
}
auto DDS::SubscribeForStateChanges() -> void
{
string id = GetProperty<string>("id");
string pid(to_string(getpid()));
fDDSCustomCmd.subscribe([id, pid, this](const string& cmd, const string& cond, uint64_t senderId)
{
LOG(INFO) << "Received command: " << cmd;
if (cmd == "check-state")
{
fDDSCustomCmd.send(id + ": " + ToStr(GetCurrentDeviceState()) + " (pid: " + pid + ")", to_string(senderId));
}
else if (fCommands.find(cmd) != fCommands.end())
{
fDDSCustomCmd.send(id + ": attempting to " + cmd, to_string(senderId));
ChangeDeviceState(ToDeviceStateTransition(cmd));
}
else if (cmd == "END")
{
fDDSCustomCmd.send(id + ": attempting to " + cmd, to_string(senderId));
ChangeDeviceState(ToDeviceStateTransition(cmd));
fDDSCustomCmd.send(id + ": " + ToStr(GetCurrentDeviceState()), to_string(senderId));
if (ToStr(GetCurrentDeviceState()) == "EXITING")
{
unique_lock<mutex> lock(fStopMutex);
fStopCondition.notify_one();
}
}
else
{
LOG(WARN) << "Unknown command: " << cmd;
LOG(WARN) << "Origin: " << senderId;
LOG(WARN) << "Destination: " << cond;
}
});
}
auto DDS::WaitForNextState() -> DeviceState
{
unique_lock<mutex> lock{fEventsMutex};
while (fEvents.empty())
{
fNewEvent.wait(lock);
}
auto result = fEvents.front();
fEvents.pop();
return result;
}
DDS::~DDS()
{
if (fControllerThread.joinable())
{
fControllerThread.join();
}
}
} /* namespace plugins */
} /* namespace mq */
} /* namespace fair */

92
fairmq/plugins/DDS/DDS.h Normal file
View File

@ -0,0 +1,92 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_PLUGINS_DDS
#define FAIR_MQ_PLUGINS_DDS
#include <fairmq/Plugin.h>
#include <dds_intercom.h>
#include <condition_variable>
#include <mutex>
#include <string>
#include <queue>
#include <thread>
#include <vector>
#include <unordered_map>
#include <set>
namespace fair
{
namespace mq
{
namespace plugins
{
struct DDSConfig
{
DDSConfig()
: fSubChannelAddresses()
, fDDSValues()
{}
// container of sub channel addresses
std::vector<std::string> fSubChannelAddresses;
// dds values for the channel
std::unordered_map<std::string, std::string> fDDSValues;
};
class DDS : public Plugin
{
public:
DDS(const std::string name, const Plugin::Version version, const std::string maintainer, const std::string homepage, PluginServices* pluginServices);
~DDS();
private:
auto HandleControl() -> void;
auto WaitForNextState() -> DeviceState;
auto FillChannelContainers() -> void;
auto SubscribeForConnectingChannels() -> void;
auto PublishBoundChannels() -> void;
auto SubscribeForStateChanges() -> void;
dds::intercom_api::CIntercomService fService;
dds::intercom_api::CCustomCmd fDDSCustomCmd;
dds::intercom_api::CKeyValue fDDSKeyValue;
std::unordered_map<std::string, std::vector<std::string>> fBindingChans;
std::unordered_map<std::string, DDSConfig> fConnectingChans;
std::mutex fStopMutex;
std::condition_variable fStopCondition;
const std::set<std::string> fCommands;
std::thread fControllerThread;
std::queue<DeviceState> fEvents;
std::mutex fEventsMutex;
std::condition_variable fNewEvent;
};
REGISTER_FAIRMQ_PLUGIN(
DDS, // Class name
dds, // Plugin name (string, lower case chars only)
(Plugin::Version{1,0,0}), // Version
"FairRootGroup <fairroot@gsi.de>", // Maintainer
"https://github.com/FairRootGroup/FairRoot", // Homepage
fair::mq::Plugin::NoProgramOptions // custom program options for the plugin
)
} /* namespace plugins */
} /* namespace mq */
} /* namespace fair */
#endif /* FAIR_MQ_PLUGINS_DDS */

View File

@ -1,4 +1,12 @@
#include "dds_intercom.h"
/********************************************************************************
* Copyright (C) 2014-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <dds_intercom.h>
#include <termios.h> // raw mode console input
@ -66,11 +74,11 @@ int main(int argc, char* argv[])
break;
case 'i':
cout << " > init devices" << endl;
ddsCustomCmd.send("INIT_DEVICE", "");
ddsCustomCmd.send("INIT DEVICE", "");
break;
case 'j':
cout << " > init tasks" << endl;
ddsCustomCmd.send("INIT_TASK", "");
ddsCustomCmd.send("INIT TASK", "");
break;
case 'p':
cout << " > pause devices" << endl;
@ -86,11 +94,11 @@ int main(int argc, char* argv[])
break;
case 't':
cout << " > reset tasks" << endl;
ddsCustomCmd.send("RESET_TASK", "");
ddsCustomCmd.send("RESET TASK", "");
break;
case 'd':
cout << " > reset devices" << endl;
ddsCustomCmd.send("RESET_DEVICE", "");
ddsCustomCmd.send("RESET DEVICE", "");
break;
case 'h':
cout << " > help" << endl;

View File

@ -53,7 +53,7 @@ auto DummyPluginProgramOptions() -> Plugin::ProgOptions
{
auto plugin_options = boost::program_options::options_description{"Dummy Plugin"};
plugin_options.add_options()
("custom-dummy-option", boost::program_options::value<std::string>(), "Cool custom option.");
("custom-dummy-option", boost::program_options::value<std::string>(), "Cool custom option.")
("custom-dummy-option2", boost::program_options::value<std::string>(), "Another cool custom option.");
return plugin_options;
}