diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 8e4e5d50..6ce822aa 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -29,6 +29,14 @@ # # +#################### +# external plugins # +#################### +if (DDS_FOUND) + add_subdirectory(plugins/DDS) +endif() + + ############################ # preprocessor definitions # ############################ diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index 86e4802d..4f2f38f6 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -162,7 +162,16 @@ string FairMQChannel::GetChannelName() const string FairMQChannel::GetChannelPrefix() const { string prefix = fName; - return prefix.erase(fName.rfind("[")); + prefix = prefix.erase(fName.rfind("[")); + return prefix; +} + +string FairMQChannel::GetChannelIndex() const +{ + string indexStr = fName; + indexStr.erase(indexStr.rfind("]")); + indexStr.erase(0, indexStr.rfind("[") + 1); + return indexStr; } string FairMQChannel::GetType() const @@ -516,11 +525,9 @@ bool FairMQChannel::ValidateChannel() } else { - //TODO: maybe cache fEndpoints as a class member? not really needed as tokenizing is - //fast, and only happens during (re-)configure - vector fEndpoints; - Tokenize(fEndpoints, fAddress); - for (const auto endpoint : fEndpoints) + vector endpoints; + boost::algorithm::split(endpoints, fAddress, boost::algorithm::is_any_of(",")); + for (const auto endpoint : endpoints) { string address; if (endpoint[0] == '@' || endpoint[0] == '+' || endpoint[0] == '>') @@ -841,11 +848,6 @@ FairMQChannel::~FairMQChannel() { } -void FairMQChannel::Tokenize(vector& output, const string& input, const string delimiters) -{ - boost::algorithm::split(output, input, boost::algorithm::is_any_of(delimiters)); -} - unsigned long FairMQChannel::GetBytesTx() const { return fSocket->GetBytesTx(); diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 664ce599..7a638e78 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -78,9 +78,13 @@ class FairMQChannel std::string GetChannelName() const; /// Get channel prefix - /// @return Returns channel prefix (e.g. "data") + /// @return Returns channel prefix (e.g. "data" in "data[0]") std::string GetChannelPrefix() const; + /// Get channel index + /// @return Returns channel index (e.g. 0 in "data[0]") + std::string GetChannelIndex() const; + /// Get socket type /// @return Returns socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/) std::string GetType() const; @@ -270,9 +274,6 @@ class FairMQChannel return ReceiveAsync(parts.fParts); } - // TODO: this might go to some base utility library - static void Tokenize(std::vector& output, const std::string& input, const std::string delimiters = ","); - unsigned long GetBytesTx() const; unsigned long GetBytesRx() const; unsigned long GetMessagesTx() const; diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 88606f56..e2a83e99 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -62,7 +62,6 @@ FairMQDevice::FairMQDevice() , fDefaultTransport() , fInitializationTimeoutInS(120) , fCatchingSignals(false) - , fTerminationRequested(false) , fInteractiveRunning(false) , fDataCallbacks(false) , fDeviceCmdSockets() @@ -94,7 +93,6 @@ FairMQDevice::FairMQDevice(const fair::mq::tools::Version version) , fDefaultTransport() , fInitializationTimeoutInS(120) , fCatchingSignals(false) - , fTerminationRequested(false) , fInteractiveRunning(false) , fDataCallbacks(false) , fDeviceCmdSockets() @@ -151,33 +149,6 @@ void FairMQDevice::SignalHandler(int signal) } } -void FairMQDevice::AttachChannels(list& chans) -{ - auto itr = chans.begin(); - - while (itr != chans.end()) - { - if ((*itr)->ValidateChannel()) - { - if (AttachChannel(**itr)) - { - (*itr)->InitCommandInterface(); - (*itr)->SetModified(false); - chans.erase(itr++); - } - else - { - LOG(ERROR) << "failed to attach channel " << (*itr)->fName << " (" << (*itr)->fMethod << ")"; - ++itr; - } - } - else - { - ++itr; - } - } -} - void FairMQDevice::InitWrapper() { if (!fTransportFactory) @@ -203,13 +174,13 @@ void FairMQDevice::InitWrapper() } // Containers to store the uninitialized channels. - list uninitializedBindingChannels; - list uninitializedConnectingChannels; + vector uninitializedBindingChannels; + vector uninitializedConnectingChannels; // Fill the uninitialized channel containers - for (auto mi = fChannels.begin(); mi != fChannels.end(); ++mi) + for (auto& mi : fChannels) { - for (auto vi = (mi->second).begin(); vi != (mi->second).end(); ++vi) + for (auto vi = mi.second.begin(); vi != mi.second.end(); ++vi) { if (vi->fModified) { @@ -224,9 +195,7 @@ void FairMQDevice::InitWrapper() vi->fChannelCmdSocket = nullptr; } // set channel name: name + vector index - stringstream ss; - ss << mi->first << "[" << vi - (mi->second).begin() << "]"; - vi->fName = ss.str(); + vi->fName = fair::mq::tools::ToString(mi.first, "[", vi - (mi.second).begin(), "]"); if (vi->fMethod == "bind") { @@ -256,12 +225,11 @@ void FairMQDevice::InitWrapper() else { LOG(ERROR) << "Cannot update configuration. Socket method (bind/connect) not specified."; - exit(EXIT_FAILURE); + throw runtime_error("Cannot update configuration. Socket method (bind/connect) not specified."); } } } } - CallStateChangeCallbacks(INITIALIZING_DEVICE); // Bind channels. Here one run is enough, because bind settings should be available locally // If necessary this could be handled in the same way as the connecting channels @@ -270,9 +238,11 @@ void FairMQDevice::InitWrapper() if (uninitializedBindingChannels.size() > 0) { LOG(ERROR) << uninitializedBindingChannels.size() << " of the binding channels could not initialize. Initial configuration incomplete."; - exit(EXIT_FAILURE); + throw runtime_error(fair::mq::tools::ToString(uninitializedBindingChannels.size(), " of the binding channels could not initialize. Initial configuration incomplete.")); } + CallStateChangeCallbacks(INITIALIZING_DEVICE); + // notify parent thread about completion of first validation. { lock_guard lock(fInitialValidationMutex); @@ -281,21 +251,36 @@ void FairMQDevice::InitWrapper() } // go over the list of channels until all are initialized (and removed from the uninitialized list) - int numAttempts = 0; + int numAttempts = 1; auto sleepTimeInMS = 50; auto maxAttempts = fInitializationTimeoutInS * 1000 / sleepTimeInMS; + // first attempt + AttachChannels(uninitializedConnectingChannels); + // if not all channels could be connected, update their address values from config and retry while (!uninitializedConnectingChannels.empty()) { - AttachChannels(uninitializedConnectingChannels); - if (numAttempts > maxAttempts) + this_thread::sleep_for(chrono::milliseconds(sleepTimeInMS)); + + if (fConfig) { - LOG(ERROR) << "could not connect all channels after " << fInitializationTimeoutInS << " attempts"; - // TODO: goto ERROR state; - exit(EXIT_FAILURE); + for (auto& chan : uninitializedConnectingChannels) + { + string key{"chans." + chan->GetChannelPrefix() + "." + chan->GetChannelIndex() + ".address"}; + string newAddress = fConfig->GetValue(key); + if (newAddress != chan->GetAddress()) + { + chan->UpdateAddress(newAddress); + } + } } - this_thread::sleep_for(chrono::milliseconds(sleepTimeInMS)); - numAttempts++; + if (numAttempts++ > maxAttempts) + { + LOG(ERROR) << "could not connect all channels after " << fInitializationTimeoutInS << " attempts"; + throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", fInitializationTimeoutInS, " attempts")); + } + + AttachChannels(uninitializedConnectingChannels); } Init(); @@ -313,6 +298,33 @@ void FairMQDevice::Init() { } +void FairMQDevice::AttachChannels(vector& chans) +{ + auto itr = chans.begin(); + + while (itr != chans.end()) + { + if ((*itr)->ValidateChannel()) + { + if (AttachChannel(**itr)) + { + (*itr)->InitCommandInterface(); + (*itr)->SetModified(false); + itr = chans.erase(itr); + } + else + { + LOG(ERROR) << "failed to attach channel " << (*itr)->fName << " (" << (*itr)->fMethod << ")"; + ++itr; + } + } + else + { + ++itr; + } + } +} + bool FairMQDevice::AttachChannel(FairMQChannel& ch) { if (!ch.fTransportFactory) @@ -331,7 +343,7 @@ bool FairMQDevice::AttachChannel(FairMQChannel& ch) } vector endpoints; - FairMQChannel::Tokenize(endpoints, ch.fAddress); + boost::algorithm::split(endpoints, ch.fAddress, boost::algorithm::is_any_of(",")); for (auto& endpoint : endpoints) { //(re-)init socket @@ -402,8 +414,17 @@ bool FairMQDevice::AttachChannel(FairMQChannel& ch) } } - // put the (possibly) modified address back in the config - ch.UpdateAddress(boost::algorithm::join(endpoints, ",")); + // put the (possibly) modified address back in the channel object and config + string newAddress{boost::algorithm::join(endpoints, ",")}; + if (newAddress != ch.fAddress) + { + ch.UpdateAddress(newAddress); + if (fConfig) + { + string key{"chans." + ch.GetChannelPrefix() + "." + ch.GetChannelIndex() + ".address"}; + fConfig->SetValue(key, newAddress); + } + } return true; } diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index dd593fef..e1862c20 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -409,11 +409,6 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable } } - bool Terminated() - { - return fTerminationRequested; - } - const FairMQChannel& GetChannel(const std::string& channelName, const int index = 0) const; virtual void RegisterChannelEndpoints() {} @@ -549,7 +544,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable void Exit(); /// Attach (bind/connect) channels in the list - void AttachChannels(std::list& chans); + void AttachChannels(std::vector& chans); /// Sets up and connects/binds a socket to an endpoint /// return a string with the actual endpoint if it happens @@ -574,7 +569,6 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// Signal handler void SignalHandler(int signal); bool fCatchingSignals; - std::atomic fTerminationRequested; // Interactive state loop helper std::atomic fInteractiveRunning; diff --git a/fairmq/FairMQStateMachine.h b/fairmq/FairMQStateMachine.h index 5afe7968..a79d64e5 100644 --- a/fairmq/FairMQStateMachine.h +++ b/fairmq/FairMQStateMachine.h @@ -91,6 +91,7 @@ struct FairMQFSM : public msmf::state_machine_def , fStateChangeSignal() , fStateChangeSignalsMap() , fWorkerThread() + , fTerminationRequested(false) {} virtual ~FairMQFSM() @@ -299,6 +300,7 @@ struct FairMQFSM : public msmf::state_machine_def { LOG(STATE) << "Entering EXITING state"; fsm.fState = EXITING; + fsm.fTerminationRequested = true; fsm.CallStateChangeCallbacks(EXITING); // terminate worker thread @@ -454,6 +456,11 @@ struct FairMQFSM : public msmf::state_machine_def virtual void Exit() {} virtual void Unblock() {} + bool Terminated() + { + return fTerminationRequested; + } + protected: std::atomic fState; std::mutex fChangeStateMutex; @@ -469,6 +476,7 @@ struct FairMQFSM : public msmf::state_machine_def boost::signals2::signal fStateChangeSignal; std::unordered_map fStateChangeSignalsMap; + std::atomic fTerminationRequested; void CallStateChangeCallbacks(const State state) const { diff --git a/fairmq/Plugin.h b/fairmq/Plugin.h index 943cfe86..3d0bc120 100644 --- a/fairmq/Plugin.h +++ b/fairmq/Plugin.h @@ -11,10 +11,13 @@ #include #include + #include #include #include + #include +#include #include #include #include @@ -68,7 +71,9 @@ class Plugin using DeviceState = fair::mq::PluginServices::DeviceState; using DeviceStateTransition = fair::mq::PluginServices::DeviceStateTransition; auto ToDeviceState(const std::string& state) const -> DeviceState { return fPluginServices->ToDeviceState(state); } + auto ToDeviceStateTransition(const std::string& transition) const -> DeviceStateTransition { return fPluginServices->ToDeviceStateTransition(transition); } auto ToStr(DeviceState state) const -> std::string { return fPluginServices->ToStr(state); } + auto ToStr(DeviceStateTransition transition) const -> std::string { return fPluginServices->ToStr(transition); } auto GetCurrentDeviceState() const -> DeviceState { return fPluginServices->GetCurrentDeviceState(); } auto TakeDeviceControl() -> void { fPluginServices->TakeDeviceControl(fkName); }; auto ReleaseDeviceControl() -> void { fPluginServices->ReleaseDeviceControl(fkName); }; @@ -83,6 +88,7 @@ class Plugin template auto GetProperty(const std::string& key) const -> T { return fPluginServices->GetProperty(key); } auto GetPropertyAsString(const std::string& key) const -> std::string { return fPluginServices->GetPropertyAsString(key); } + auto GetChannelInfo() const -> std::unordered_map { return fPluginServices->GetChannelInfo(); } // template // auto SubscribeToPropertyChange(std::functionkey*/, const T /*newValue<])> callback) const -> void { fPluginServices.SubscribeToPropertyChange(fkName, callback); } // template diff --git a/fairmq/PluginServices.h b/fairmq/PluginServices.h index 556cbd2c..112ae868 100644 --- a/fairmq/PluginServices.h +++ b/fairmq/PluginServices.h @@ -12,11 +12,13 @@ #include #include #include + +#include +#include + #include #include #include -#include -#include #include #include @@ -39,6 +41,9 @@ class PluginServices PluginServices(FairMQProgOptions* config, std::shared_ptr device) : fConfig{config} , fDevice{device} + , fDeviceController() + , fDeviceControllerMutex() + , fReleaseDeviceControlCondition() { } @@ -193,6 +198,8 @@ class PluginServices /// If a type is not supported, the user can provide support by overloading the ostream operator for this type auto GetPropertyAsString(const std::string& key) const -> std::string { return fConfig->GetStringValue(key); } + auto GetChannelInfo() const -> std::unordered_map { return fConfig->GetChannelInfo(); } + /// @brief Subscribe to property updates of type T /// @param subscriber /// @param callback function diff --git a/fairmq/devices/FairMQBenchmarkSampler.h b/fairmq/devices/FairMQBenchmarkSampler.h index af05a21a..3c3da626 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.h +++ b/fairmq/devices/FairMQBenchmarkSampler.h @@ -1,8 +1,8 @@ /******************************************************************************** * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ /** @@ -45,8 +45,8 @@ class FairMQBenchmarkSampler : public FairMQDevice std::string fOutChannelName; std::thread fResetMsgCounter; - virtual void InitTask(); - virtual void Run(); + virtual void InitTask() override; + virtual void Run() override; }; #endif /* FAIRMQBENCHMARKSAMPLER_H_ */ diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx index 01ae7192..5633f3e8 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx @@ -14,7 +14,7 @@ using namespace std; FairMQ::Transport FairMQTransportFactoryNN::fTransportType = FairMQ::Transport::NN; -FairMQTransportFactoryNN::FairMQTransportFactoryNN(const string& id, const FairMQProgOptions* config) +FairMQTransportFactoryNN::FairMQTransportFactoryNN(const string& id, const FairMQProgOptions* /*config*/) : FairMQTransportFactory(id) { LOG(DEBUG) << "Transport: Using nanomsg library"; diff --git a/fairmq/options/FairMQParser.cxx b/fairmq/options/FairMQParser.cxx index 4d9df510..b26cf4f5 100644 --- a/fairmq/options/FairMQParser.cxx +++ b/fairmq/options/FairMQParser.cxx @@ -33,16 +33,7 @@ FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const string& id, // Extract value from boost::property_tree Helper::DeviceParser(pt.get_child(rootNode), channelMap, id, formatFlag); - if (channelMap.size() > 0) - { - stringstream channelKeys; - for (const auto& p : channelMap) - { - channelKeys << "'" << p.first << "' "; - } - LOG(DEBUG) << "---- Found following channel keys: " << channelKeys.str(); - } - else + if (channelMap.empty()) { LOG(WARN) << "---- No channel keys found for " << id; LOG(WARN) << "---- Check the JSON inputs and/or command line inputs"; diff --git a/fairmq/options/FairMQProgOptions.cxx b/fairmq/options/FairMQProgOptions.cxx index 9a5c781a..af923d32 100644 --- a/fairmq/options/FairMQProgOptions.cxx +++ b/fairmq/options/FairMQProgOptions.cxx @@ -29,6 +29,7 @@ FairMQProgOptions::FairMQProgOptions() , fFairMQMap() , fHelpTitle("***** FAIRMQ Program Options ***** ") , fVersion("Beta version 0.1") + , fChannelInfo() , fMQKeyMap() // , fSignalMap() //string API { @@ -114,7 +115,7 @@ void FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool a DefaultConsoleSetFilter(fSeverityMap.at(verbosity)); } - // check if one of required MQ config option is there + // check if one of required MQ config option is there auto parserOptions = fMQParserOptions.options(); bool optionExists = false; vector MQParserKeys; @@ -157,24 +158,24 @@ void FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool a string file = fVarMap["mq-config"].as(); - string fileExtension = boost::filesystem::extension(file); + string ext = boost::filesystem::extension(file); - transform(fileExtension.begin(), fileExtension.end(), fileExtension.begin(), ::tolower); + transform(ext.begin(), ext.end(), ext.begin(), ::tolower); - if (fileExtension == ".json") + if (ext == ".json") { UserParser(file, id); } else { - if (fileExtension == ".xml") + if (ext == ".xml") { UserParser(file, id); } else { LOG(ERROR) << "mq-config command line called but file extension '" - << fileExtension + << ext << "' not recognized. Program will now exit"; exit(EXIT_FAILURE); } @@ -211,6 +212,7 @@ void FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool a int FairMQProgOptions::Store(const FairMQMap& channels) { fFairMQMap = channels; + UpdateChannelInfo(); UpdateMQValues(); return 0; } @@ -219,10 +221,20 @@ int FairMQProgOptions::Store(const FairMQMap& channels) int FairMQProgOptions::UpdateChannelMap(const FairMQMap& channels) { fFairMQMap = channels; + UpdateChannelInfo(); UpdateMQValues(); return 0; } +void FairMQProgOptions::UpdateChannelInfo() +{ + fChannelInfo.clear(); + for (const auto& c : fFairMQMap) + { + fChannelInfo.insert(std::make_pair(c.first, c.second.size())); + } +} + // read FairMQChannelMap and insert/update corresponding values in variable map // create key for variable map as follow : channelName.index.memberName void FairMQProgOptions::UpdateMQValues() @@ -233,15 +245,15 @@ void FairMQProgOptions::UpdateMQValues() for (const auto& channel : p.second) { - string typeKey = p.first + "." + to_string(index) + ".type"; - string methodKey = p.first + "." + to_string(index) + ".method"; - string addressKey = p.first + "." + to_string(index) + ".address"; - string transportKey = p.first + "." + to_string(index) + ".transport"; - string sndBufSizeKey = p.first + "." + to_string(index) + ".sndBufSize"; - string rcvBufSizeKey = p.first + "." + to_string(index) + ".rcvBufSize"; - string sndKernelSizeKey = p.first + "." + to_string(index) + ".sndKernelSize"; - string rcvKernelSizeKey = p.first + "." + to_string(index) + ".rcvKernelSize"; - string rateLoggingKey = p.first + "." + to_string(index) + ".rateLogging"; + string typeKey = "chans." + p.first + "." + to_string(index) + ".type"; + string methodKey = "chans." + p.first + "." + to_string(index) + ".method"; + string addressKey = "chans." + p.first + "." + to_string(index) + ".address"; + string transportKey = "chans." + p.first + "." + to_string(index) + ".transport"; + string sndBufSizeKey = "chans." + p.first + "." + to_string(index) + ".sndBufSize"; + string rcvBufSizeKey = "chans." + p.first + "." + to_string(index) + ".rcvBufSize"; + string sndKernelSizeKey = "chans." + p.first + "." + to_string(index) + ".sndKernelSize"; + string rcvKernelSizeKey = "chans." + p.first + "." + to_string(index) + ".rcvKernelSize"; + string rateLoggingKey = "chans." + p.first + "." + to_string(index) + ".rateLogging"; fMQKeyMap[typeKey] = make_tuple(p.first, index, "type"); fMQKeyMap[methodKey] = make_tuple(p.first, index, "method"); @@ -258,7 +270,6 @@ void FairMQProgOptions::UpdateMQValues() UpdateVarMap(addressKey, channel.GetAddress()); UpdateVarMap(transportKey, channel.GetTransport()); - //UpdateVarMap(sndBufSizeKey, to_string(channel.GetSndBufSize()));// string API UpdateVarMap(sndBufSizeKey, channel.GetSndBufSize()); diff --git a/fairmq/options/FairMQProgOptions.h b/fairmq/options/FairMQProgOptions.h index bd33ba75..61bf7e77 100644 --- a/fairmq/options/FairMQProgOptions.h +++ b/fairmq/options/FairMQProgOptions.h @@ -21,6 +21,7 @@ #include #include #include +#include #include "FairProgOptions.h" #include "FairMQEventManager.h" @@ -37,11 +38,11 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager FairMQProgOptions(); virtual ~FairMQProgOptions(); - // parse command line and txt/INI configuration file. + // parse command line and txt/INI configuration file. // default parser for the mq-configuration file (JSON/XML) is called if command line key mq-config is called virtual void ParseAll(const int argc, char const* const* argv, bool allowUnregistered = false); - // external parser, store function + // external parser, store function template int UserParser(Args &&... args) { @@ -57,11 +58,16 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager return 0; } - FairMQMap GetFairMQMap() + FairMQMap GetFairMQMap() const { return fFairMQMap; } + std::unordered_map GetChannelInfo() const + { + return fChannelInfo; + } + // to customize title of the executable help command line void SetHelpTitle(const std::string& title) { @@ -257,7 +263,6 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager Disconnect(key); } - /* template @@ -284,6 +289,9 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager std::string fHelpTitle; std::string fVersion; + // map of read channel info - channel name - number of subchannels + std::unordered_map fChannelInfo; + bool EventKeyFound(const std::string& key) { if (FairMQEventManager::EventKeyFound(key)) @@ -338,6 +346,8 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager { return 0; } + + void UpdateChannelInfo(); }; #endif /* FAIRMQPROGOPTIONS_H */ diff --git a/fairmq/options/FairProgOptions.h b/fairmq/options/FairProgOptions.h index 11c40294..2762e80f 100644 --- a/fairmq/options/FairProgOptions.h +++ b/fairmq/options/FairProgOptions.h @@ -201,9 +201,9 @@ class FairProgOptions } template - void replace(std::map& vm, const std::string& opt, const T& val) + void replace(std::map& vm, const std::string& key, const T& val) { - vm[opt].value() = boost::any(val); + vm[key].value() = boost::any(val); } private: diff --git a/fairmq/options/runConfigEx.cxx b/fairmq/options/runConfigEx.cxx index 4b79d9d5..8926f161 100644 --- a/fairmq/options/runConfigEx.cxx +++ b/fairmq/options/runConfigEx.cxx @@ -71,13 +71,13 @@ int main(int argc, char** argv) ("data-rate", po::value()->default_value(0.5), "Data rate"); // parse command lines, parse json file and init FairMQMap - config.ParseAll(argc, argv); + config.ParseAll(argc, argv, true); // // get FairMQMap // auto map1 = config.GetFairMQMap(); // // update value in variable map, and propagate the update to the FairMQMap - // config.UpdateValue("data.0.address","tcp://localhost:1234"); + // config.UpdateValue("chans.data.0.address","tcp://localhost:1234"); // // get the updated FairMQMap // auto map2 = config.GetFairMQMap(); @@ -98,15 +98,15 @@ int main(int argc, char** argv) // double dataRate = config.ConvertTo(dataRateStr); // LOG(INFO) << "dataRate: " << dataRate; - LOG(INFO) << "Subscribing: (data.0.address)"; - config.Subscribe("data.0.address", [&device](const string& key, const string& value) + LOG(INFO) << "Subscribing: (chans.data.0.address)"; + config.Subscribe("chans.data.0.address", [&device](const string& key, const string& value) { LOG(INFO) << "[callback] Updating device parameter " << key << " = " << value; device.fChannels.at("data").at(0).UpdateAddress(value); }); - LOG(INFO) << "Subscribing: (data.0.rcvBufSize)"; - config.Subscribe("data.0.rcvBufSize", [&device](const string& key, int value) + LOG(INFO) << "Subscribing: (chans.data.0.rcvBufSize)"; + config.Subscribe("chans.data.0.rcvBufSize", [&device](const string& key, int value) { LOG(INFO) << "[callback] Updating device parameter " << key << " = " << value; device.fChannels.at("data").at(0).UpdateRcvBufSize(value); @@ -121,12 +121,12 @@ int main(int argc, char** argv) LOG(INFO) << "Starting value updates...\n"; - config.UpdateValue("data.0.address", "tcp://localhost:4321"); - LOG(INFO) << "config: " << config.GetValue("data.0.address"); + config.UpdateValue("chans.data.0.address", "tcp://localhost:4321"); + LOG(INFO) << "config: " << config.GetValue("chans.data.0.address"); LOG(INFO) << "device: " << device.fChannels.at("data").at(0).GetAddress() << endl; - config.UpdateValue("data.0.rcvBufSize", 100); - LOG(INFO) << "config: " << config.GetValue("data.0.rcvBufSize"); + config.UpdateValue("chans.data.0.rcvBufSize", 100); + LOG(INFO) << "config: " << config.GetValue("chans.data.0.rcvBufSize"); LOG(INFO) << "device: " << device.fChannels.at("data").at(0).GetRcvBufSize() << endl; config.UpdateValue("data-rate", 0.9); diff --git a/fairmq/plugins/Control.cxx b/fairmq/plugins/Control.cxx index d189d35b..4b38735f 100644 --- a/fairmq/plugins/Control.cxx +++ b/fairmq/plugins/Control.cxx @@ -8,8 +8,6 @@ #include "Control.h" -#include - #include // for the interactive mode #include // for the interactive mode @@ -79,7 +77,6 @@ auto Control::InteractiveMode() -> void ChangeDeviceState(DeviceStateTransition::InitDevice); while (WaitForNextState() != DeviceState::DeviceReady) {} - ChangeDeviceState(DeviceStateTransition::InitTask); while (WaitForNextState() != DeviceState::Ready) {} ChangeDeviceState(DeviceStateTransition::Run); diff --git a/fairmq/plugins/DDS/CMakeLists.txt b/fairmq/plugins/DDS/CMakeLists.txt new file mode 100644 index 00000000..4b58340a --- /dev/null +++ b/fairmq/plugins/DDS/CMakeLists.txt @@ -0,0 +1,18 @@ +################################################################################ +# Copyright (C) 2012-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # +# # +# This software is distributed under the terms of the # +# GNU Lesser General Public Licence (LGPL) version 3, # +# copied verbatim in the file "LICENSE" # +################################################################################ + +set(plugin FairMQPlugin_dds) + +add_library(${plugin} SHARED ${CMAKE_CURRENT_SOURCE_DIR}/DDS.cxx ${CMAKE_CURRENT_SOURCE_DIR}/DDS.h) +target_link_libraries(${plugin} FairMQ ${DDS_INTERCOM_LIBRARY_SHARED} ${DDS_PROTOCOL_LIBRARY_SHARED} ${DDS_USER_DEFAULTS_LIBRARY_SHARED}) +target_include_directories(${plugin} PUBLIC ${CMAKE_CURRENT_BINARY_DIR} ${DDS_INCLUDE_DIR}) +set_target_properties(${plugin} PROPERTIES CXX_VISIBILITY_PRESET hidden) + +add_executable(fairmq-dds-command-ui ${CMAKE_CURRENT_SOURCE_DIR}/runDDSCommandUI.cxx) +target_link_libraries(fairmq-dds-command-ui FairMQ ${DDS_INTERCOM_LIBRARY_SHARED} ${DDS_PROTOCOL_LIBRARY_SHARED} ${DDS_USER_DEFAULTS_LIBRARY_SHARED}) +target_include_directories(fairmq-dds-command-ui PUBLIC ${CMAKE_CURRENT_BINARY_DIR} ${DDS_INCLUDE_DIR}) diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx new file mode 100644 index 00000000..84e92a05 --- /dev/null +++ b/fairmq/plugins/DDS/DDS.cxx @@ -0,0 +1,260 @@ +/******************************************************************************** + * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "DDS.h" + +#include // for the interactive mode +#include // for the interactive mode + +using namespace std; + +namespace fair +{ +namespace mq +{ +namespace plugins +{ + +DDS::DDS(const string name, const Plugin::Version version, const string maintainer, const string homepage, PluginServices* pluginServices) + : Plugin(name, version, maintainer, homepage, pluginServices) + , fService() + , fDDSCustomCmd(fService) + , fDDSKeyValue(fService) + , fBindingChans() + , fConnectingChans() + , fStopMutex() + , fStopCondition() + , fCommands({ "INIT DEVICE", "INIT TASK", "PAUSE", "RUN", "STOP", "RESET TASK", "RESET DEVICE" }) + , fControllerThread() + , fEvents() + , fEventsMutex() + , fNewEvent() +{ + try + { + TakeDeviceControl(); + fControllerThread = thread(&DDS::HandleControl, this); + } + catch (PluginServices::DeviceControlError& e) + { + LOG(DEBUG) << e.what(); + } + catch (exception& e) + { + LOG(ERROR) << "Error in plugin initialization: " << e.what(); + } +} + +auto DDS::HandleControl() -> void +{ + try + { + // subscribe for DDS service errors. + fService.subscribeOnError([](const dds::intercom_api::EErrorCode errorCode, const string& errorMsg) { + LOG(ERROR) << "DDS Error received: error code: " << errorCode << ", error message: " << errorMsg << endl; + }); + + // subscribe to device state changes, pushing new state chenges into the event queue + SubscribeToDeviceStateChange([&](DeviceState newState) + { + { + lock_guard lock{fEventsMutex}; + fEvents.push(newState); + } + fNewEvent.notify_one(); + }); + + ChangeDeviceState(DeviceStateTransition::InitDevice); + while (WaitForNextState() != DeviceState::InitializingDevice) {} + + // in the Initializing state subscribe to receive addresses of connecting channels from DDS + // and propagate addresses of bound channels to DDS. + FillChannelContainers(); + + if (fConnectingChans.size() > 0) + { + LOG(DEBUG) << "Subscribing for DDS properties."; + + SubscribeForConnectingChannels(); + } + + // subscribe for state changes from DDS (subscriptions start firing after fService.start() is called) + SubscribeForStateChanges(); + + // start DDS service - subscriptions will only start firing after this step + fService.start(); + + // publish bound addresses via DDS at keys corresponding to the channel prefixes, e.g. 'data' in data[i] + PublishBoundChannels(); + + while (WaitForNextState() != DeviceState::DeviceReady) {} + + ChangeDeviceState(DeviceStateTransition::InitTask); + while (WaitForNextState() != DeviceState::Ready) {} + ChangeDeviceState(DeviceStateTransition::Run); + + // wait until stop signal + unique_lock lock(fStopMutex); + while (!DeviceTerminated()) + { + fStopCondition.wait_for(lock, chrono::seconds(1)); + } + LOG(DEBUG) << "Stopping DDS control plugin"; + } + catch (exception& e) + { + LOG(ERROR) << "Error: " << e.what() << endl; + return; + } + + fDDSKeyValue.unsubscribe(); + fDDSCustomCmd.unsubscribe(); + + UnsubscribeFromDeviceStateChange(); + ReleaseDeviceControl(); +} + +auto DDS::FillChannelContainers() -> void +{ + unordered_map channelInfo(GetChannelInfo()); + for (const auto& c : channelInfo) + { + string methodKey{"chans." + c.first + "." + to_string(c.second - 1) + ".method"}; + string addressKey{"chans." + c.first + "." + to_string(c.second - 1) + ".address"}; + if (GetProperty(methodKey) == "bind") + { + fBindingChans.insert(make_pair(c.first, vector())); + for (unsigned int i = 0; i < c.second; ++i) + { + fBindingChans.at(c.first).push_back(GetProperty(addressKey)); + } + } + else if (GetProperty(methodKey) == "connect") + { + fConnectingChans.insert(make_pair(c.first, DDSConfig())); + LOG(DEBUG) << "preparing to connect: " << c.first << " with " << c.second << " sub-channels."; + for (unsigned int i = 0; i < c.second; ++i) + { + fConnectingChans.at(c.first).fSubChannelAddresses.push_back(string()); + } + } + else + { + LOG(ERROR) << "Cannot update address configuration. Channel method (bind/connect) not specified."; + return; + } + } +} + +auto DDS::SubscribeForConnectingChannels() -> void +{ + fDDSKeyValue.subscribe([&] (const string& propertyId, const string& key, const string& value) + { + LOG(DEBUG) << "Received update for " << propertyId << ": key=" << key << " value=" << value; + fConnectingChans.at(propertyId).fDDSValues.insert(make_pair(key.c_str(), value.c_str())); + + // update channels and remove them from unfinished container + for (auto mi = fConnectingChans.begin(); mi != fConnectingChans.end(); /* no increment */) + { + if (mi->second.fSubChannelAddresses.size() == mi->second.fDDSValues.size()) + { + // when multiple subChannels are used, their order on every device should be the same, irregardless of arrival order from DDS. + sort(mi->second.fSubChannelAddresses.begin(), mi->second.fSubChannelAddresses.end()); + auto it = mi->second.fDDSValues.begin(); + for (unsigned int i = 0; i < mi->second.fSubChannelAddresses.size(); ++i) + { + string k = "chans." + mi->first + "." + to_string(i) + ".address"; + SetProperty(k, it->second); + ++it; + } + fConnectingChans.erase(mi++); + } + else + { + ++mi; + } + } + }); +} + +auto DDS::PublishBoundChannels() -> void +{ + for (const auto& chan : fBindingChans) + { + unsigned int index = 0; + for (const auto& i : chan.second) + { + LOG(DEBUG) << "Publishing " << chan.first << "[" << index << "] address to DDS under '" << chan.first << "' property name."; + fDDSKeyValue.putValue(chan.first, i); + ++index; + } + } +} + +auto DDS::SubscribeForStateChanges() -> void +{ + string id = GetProperty("id"); + string pid(to_string(getpid())); + + fDDSCustomCmd.subscribe([id, pid, this](const string& cmd, const string& cond, uint64_t senderId) + { + LOG(INFO) << "Received command: " << cmd; + + if (cmd == "check-state") + { + fDDSCustomCmd.send(id + ": " + ToStr(GetCurrentDeviceState()) + " (pid: " + pid + ")", to_string(senderId)); + } + else if (fCommands.find(cmd) != fCommands.end()) + { + fDDSCustomCmd.send(id + ": attempting to " + cmd, to_string(senderId)); + ChangeDeviceState(ToDeviceStateTransition(cmd)); + } + else if (cmd == "END") + { + fDDSCustomCmd.send(id + ": attempting to " + cmd, to_string(senderId)); + ChangeDeviceState(ToDeviceStateTransition(cmd)); + fDDSCustomCmd.send(id + ": " + ToStr(GetCurrentDeviceState()), to_string(senderId)); + if (ToStr(GetCurrentDeviceState()) == "EXITING") + { + unique_lock lock(fStopMutex); + fStopCondition.notify_one(); + } + } + else + { + LOG(WARN) << "Unknown command: " << cmd; + LOG(WARN) << "Origin: " << senderId; + LOG(WARN) << "Destination: " << cond; + } + }); +} + +auto DDS::WaitForNextState() -> DeviceState +{ + unique_lock lock{fEventsMutex}; + while (fEvents.empty()) + { + fNewEvent.wait(lock); + } + + auto result = fEvents.front(); + fEvents.pop(); + return result; +} + +DDS::~DDS() +{ + if (fControllerThread.joinable()) + { + fControllerThread.join(); + } +} + +} /* namespace plugins */ +} /* namespace mq */ +} /* namespace fair */ diff --git a/fairmq/plugins/DDS/DDS.h b/fairmq/plugins/DDS/DDS.h new file mode 100644 index 00000000..1253b0c5 --- /dev/null +++ b/fairmq/plugins/DDS/DDS.h @@ -0,0 +1,92 @@ +/******************************************************************************** + * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_PLUGINS_DDS +#define FAIR_MQ_PLUGINS_DDS + +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace fair +{ +namespace mq +{ +namespace plugins +{ + +struct DDSConfig +{ + DDSConfig() + : fSubChannelAddresses() + , fDDSValues() + {} + + // container of sub channel addresses + std::vector fSubChannelAddresses; + // dds values for the channel + std::unordered_map fDDSValues; +}; + +class DDS : public Plugin +{ + public: + DDS(const std::string name, const Plugin::Version version, const std::string maintainer, const std::string homepage, PluginServices* pluginServices); + + ~DDS(); + + private: + auto HandleControl() -> void; + auto WaitForNextState() -> DeviceState; + + auto FillChannelContainers() -> void; + auto SubscribeForConnectingChannels() -> void; + auto PublishBoundChannels() -> void; + auto SubscribeForStateChanges() -> void; + + dds::intercom_api::CIntercomService fService; + dds::intercom_api::CCustomCmd fDDSCustomCmd; + dds::intercom_api::CKeyValue fDDSKeyValue; + + std::unordered_map> fBindingChans; + std::unordered_map fConnectingChans; + + std::mutex fStopMutex; + std::condition_variable fStopCondition; + + const std::set fCommands; + + std::thread fControllerThread; + std::queue fEvents; + std::mutex fEventsMutex; + std::condition_variable fNewEvent; +}; + +REGISTER_FAIRMQ_PLUGIN( + DDS, // Class name + dds, // Plugin name (string, lower case chars only) + (Plugin::Version{1,0,0}), // Version + "FairRootGroup ", // Maintainer + "https://github.com/FairRootGroup/FairRoot", // Homepage + fair::mq::Plugin::NoProgramOptions // custom program options for the plugin +) + +} /* namespace plugins */ +} /* namespace mq */ +} /* namespace fair */ + +#endif /* FAIR_MQ_PLUGINS_DDS */ diff --git a/fairmq/run/runDDSCommandUI.cxx b/fairmq/plugins/DDS/runDDSCommandUI.cxx similarity index 81% rename from fairmq/run/runDDSCommandUI.cxx rename to fairmq/plugins/DDS/runDDSCommandUI.cxx index deeb4817..c7ffe6e9 100644 --- a/fairmq/run/runDDSCommandUI.cxx +++ b/fairmq/plugins/DDS/runDDSCommandUI.cxx @@ -1,4 +1,12 @@ -#include "dds_intercom.h" +/******************************************************************************** + * Copyright (C) 2014-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include #include // raw mode console input @@ -66,11 +74,11 @@ int main(int argc, char* argv[]) break; case 'i': cout << " > init devices" << endl; - ddsCustomCmd.send("INIT_DEVICE", ""); + ddsCustomCmd.send("INIT DEVICE", ""); break; case 'j': cout << " > init tasks" << endl; - ddsCustomCmd.send("INIT_TASK", ""); + ddsCustomCmd.send("INIT TASK", ""); break; case 'p': cout << " > pause devices" << endl; @@ -86,11 +94,11 @@ int main(int argc, char* argv[]) break; case 't': cout << " > reset tasks" << endl; - ddsCustomCmd.send("RESET_TASK", ""); + ddsCustomCmd.send("RESET TASK", ""); break; case 'd': cout << " > reset devices" << endl; - ddsCustomCmd.send("RESET_DEVICE", ""); + ddsCustomCmd.send("RESET DEVICE", ""); break; case 'h': cout << " > help" << endl; diff --git a/fairmq/test/helper/plugins/dummy.h.in b/fairmq/test/helper/plugins/dummy.h.in index 6cb1cd2e..a8441034 100644 --- a/fairmq/test/helper/plugins/dummy.h.in +++ b/fairmq/test/helper/plugins/dummy.h.in @@ -53,7 +53,7 @@ auto DummyPluginProgramOptions() -> Plugin::ProgOptions { auto plugin_options = boost::program_options::options_description{"Dummy Plugin"}; plugin_options.add_options() - ("custom-dummy-option", boost::program_options::value(), "Cool custom option."); + ("custom-dummy-option", boost::program_options::value(), "Cool custom option.") ("custom-dummy-option2", boost::program_options::value(), "Another cool custom option."); return plugin_options; }