diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 6cc7dd6e..d081fb3b 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -28,11 +28,8 @@ Set(SYSTEM_INCLUDE_DIRECTORIES ) If(DDS_FOUND) - add_definitions(-DDDS_FOUND) - Set(SYSTEM_INCLUDE_DIRECTORIES - ${SYSTEM_INCLUDE_DIRECTORIES} - ${DDS_INCLUDE_DIR} - ) + add_subdirectory(plugins/config) + add_subdirectory(plugins/control) EndIf(DDS_FOUND) If(NANOMSG_FOUND) @@ -107,12 +104,12 @@ EndIf(NANOMSG_FOUND) # manual install (globbing add not recommended) Set(FAIRMQHEADERS FairMQParts.h - # FairMQPlugin.h + FairMQConfigPlugin.h + FairMQControlPlugin.h runFairMQDevice.h options/FairProgOptionsHelper.h options/FairMQEventManager.h tools/FairMQTools.h - tools/FairMQDDSTools.h tools/runSimpleMQStateMachine.h ) Install(FILES ${FAIRMQHEADERS} DESTINATION include) @@ -121,6 +118,7 @@ Set(DEPENDENCIES ${DEPENDENCIES} ${ZMQ_LIBRARY_SHARED} ${Boost_THREAD_LIBRARY} + dl fairmq_logger ${Boost_TIMER_LIBRARY} ${Boost_SYSTEM_LIBRARY} @@ -141,15 +139,6 @@ If(NANOMSG_FOUND) ) EndIf(NANOMSG_FOUND) -If(DDS_FOUND) - Set(DEPENDENCIES - ${DEPENDENCIES} - ${DDS_INTERCOM_LIBRARY_SHARED} - ${DDS_PROTOCOL_LIBRARY_SHARED} - ${DDS_USER_DEFAULTS_LIBRARY_SHARED} - ) -EndIf(DDS_FOUND) - Set(LIBRARY_NAME FairMQ) GENERATE_LIBRARY() @@ -164,13 +153,6 @@ Set(Exe_Names runConfigExample ) -If(DDS_FOUND) - Set(Exe_Names - ${Exe_Names} - fairmq-dds-command-ui - ) -EndIf(DDS_FOUND) - Set(Exe_Source run/runBenchmarkSampler.cxx run/runMerger.cxx @@ -181,13 +163,6 @@ Set(Exe_Source options/runConfigEx.cxx ) -If(DDS_FOUND) - Set(Exe_Source - ${Exe_Source} - run/runDDSCommandUI.cxx - ) -EndIf(DDS_FOUND) - list(LENGTH Exe_Names _length) math(EXPR _length ${_length}-1) @@ -200,5 +175,5 @@ ForEach(_file RANGE 0 ${_length}) GENERATE_EXECUTABLE() EndForEach(_file RANGE 0 ${_length}) -configure_file( ${CMAKE_SOURCE_DIR}/fairmq/options/startConfigExample.sh.in - ${CMAKE_BINARY_DIR}/bin/startConfigExample.sh ) +configure_file(${CMAKE_SOURCE_DIR}/fairmq/options/startConfigExample.sh.in + ${CMAKE_BINARY_DIR}/bin/startConfigExample.sh) diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index e34c7b37..ee313d7c 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -30,7 +30,6 @@ FairMQChannel::FairMQChannel() , fType("unspecified") , fMethod("unspecified") , fAddress("unspecified") - , fProperty("") , fSndBufSize(1000) , fRcvBufSize(1000) , fSndKernelSize(0) @@ -51,7 +50,6 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str , fType(type) , fMethod(method) , fAddress(address) - , fProperty("") , fSndBufSize(1000) , fRcvBufSize(1000) , fSndKernelSize(0) @@ -72,7 +70,6 @@ FairMQChannel::FairMQChannel(const FairMQChannel& chan) , fType(chan.fType) , fMethod(chan.fMethod) , fAddress(chan.fAddress) - , fProperty(chan.fProperty) , fSndBufSize(chan.fSndBufSize) , fRcvBufSize(chan.fRcvBufSize) , fSndKernelSize(chan.fSndKernelSize) @@ -92,7 +89,6 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan) fType = chan.fType; fMethod = chan.fMethod; fAddress = chan.fAddress; - fProperty = chan.fProperty; fSndBufSize = chan.fSndBufSize; fRcvBufSize = chan.fRcvBufSize; fSndKernelSize = chan.fSndKernelSize; @@ -163,20 +159,6 @@ string FairMQChannel::GetAddress() const } } -string FairMQChannel::GetProperty() const -{ - try - { - boost::unique_lock scoped_lock(fChannelMutex); - return fProperty; - } - catch (boost::exception& e) - { - LOG(ERROR) << "Exception caught in FairMQChannel::GetProperty: " << boost::diagnostic_information(e); - exit(EXIT_FAILURE); - } -} - int FairMQChannel::GetSndBufSize() const { try @@ -292,21 +274,6 @@ void FairMQChannel::UpdateAddress(const string& address) } } -void FairMQChannel::UpdateProperty(const string& property) -{ - try - { - boost::unique_lock scoped_lock(fChannelMutex); - fIsValid = false; - fProperty = property; - } - catch (boost::exception& e) - { - LOG(ERROR) << "Exception caught in FairMQChannel::UpdateProperty: " << boost::diagnostic_information(e); - exit(EXIT_FAILURE); - } -} - void FairMQChannel::UpdateSndBufSize(const int sndBufSize) { try diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 5d06788b..2bca463a 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -71,10 +71,6 @@ class FairMQChannel /// @return Returns socket type (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") std::string GetAddress() const; - /// Get channel property (custom property) - /// @return Returns property value - std::string GetProperty() const; - /// Get socket send buffer size (in number of messages) /// @return Returns socket send buffer size (in number of messages) int GetSndBufSize() const; @@ -107,10 +103,6 @@ class FairMQChannel /// @param address Socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") void UpdateAddress(const std::string& address); - /// Set custom channel property - /// @param property Channel property - void UpdateProperty(const std::string& property); - /// Set socket send buffer size /// @param sndBufSize Socket send buffer size (in number of messages) void UpdateSndBufSize(const int sndBufSize); @@ -260,7 +252,6 @@ class FairMQChannel std::string fType; std::string fMethod; std::string fAddress; - std::string fProperty; int fSndBufSize; int fRcvBufSize; int fSndKernelSize; diff --git a/fairmq/FairMQConfigPlugin.h b/fairmq/FairMQConfigPlugin.h new file mode 100644 index 00000000..6a2285e3 --- /dev/null +++ b/fairmq/FairMQConfigPlugin.h @@ -0,0 +1,27 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIRMQCONFIGPLUGIN_H_ +#define FAIRMQCONFIGPLUGIN_H_ + +class FairMQDevice; + +extern "C" +struct FairMQConfigPlugin +{ + typedef void (*init_t)(FairMQDevice&); + init_t initConfig; + + typedef void (*handleInitialConfig_t)(FairMQDevice&); + handleInitialConfig_t handleInitialConfig; + + typedef void (*stop_t)(); + stop_t stopConfig; +}; + +#endif /* FAIRMQCONFIGPLUGIN_H_ */ diff --git a/fairmq/FairMQControlPlugin.h b/fairmq/FairMQControlPlugin.h new file mode 100644 index 00000000..2e230331 --- /dev/null +++ b/fairmq/FairMQControlPlugin.h @@ -0,0 +1,27 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIRMQCONTROLPLUGIN_H_ +#define FAIRMQCONTROLPLUGIN_H_ + +class FairMQDevice; + +extern "C" +struct FairMQControlPlugin +{ + typedef void (*init_t)(FairMQDevice&); + init_t initControl; + + typedef void (*handleStateChanges_t)(FairMQDevice&); + handleStateChanges_t handleStateChanges; + + typedef void (*stop_t)(); + stop_t stopControl; +}; + +#endif /* FAIRMQCONTROLPLUGIN_H_ */ diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 98bb4c36..019cc489 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -17,14 +17,13 @@ #include // catching system signals #include #include +#include +#include #include // for the InteractiveStateLoop #include -#include #include -#include // for choosing random port in range -#include // for choosing random port in range #include "FairMQSocket.h" #include "FairMQDevice.h" @@ -279,8 +278,8 @@ bool FairMQDevice::BindChannel(FairMQChannel& ch) int numAttempts = 0; // initialize random generator - boost::random::mt19937 gen(getpid()); - boost::random::uniform_int_distribution<> randomPort(fPortRangeMin, fPortRangeMax); + std::default_random_engine generator(std::chrono::system_clock::now().time_since_epoch().count()); + std::uniform_int_distribution randomPort(fPortRangeMin, fPortRangeMax); LOG(DEBUG) << "Binding channel " << ch.fChannelName << " on " << ch.fAddress; @@ -298,7 +297,7 @@ bool FairMQDevice::BindChannel(FairMQChannel& ch) size_t pos = ch.fAddress.rfind(":"); stringstream newPort; - newPort << static_cast(randomPort(gen)); + newPort << static_cast(randomPort(generator)); ch.fAddress = ch.fAddress.substr(0, pos + 1) + newPort.str(); LOG(DEBUG) << "Binding channel " << ch.fChannelName << " on " << ch.fAddress; @@ -984,6 +983,11 @@ void FairMQDevice::Reset() } } +bool FairMQDevice::Terminated() +{ + return fTerminationRequested; +} + void FairMQDevice::Terminate() { // Termination signal has to be sent only once to any socket. diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 82f23d91..22fef7bc 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -321,6 +321,8 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable void OnData(const std::string& channelName, InputMultipartCallback); + bool Terminated(); + protected: std::string fId; ///< Device ID std::string fNetworkInterface; ///< Network interface to use for dynamic binding diff --git a/fairmq/options/FairMQEventManager.h b/fairmq/options/FairMQEventManager.h index 52df8a8b..2d85b557 100644 --- a/fairmq/options/FairMQEventManager.h +++ b/fairmq/options/FairMQEventManager.h @@ -36,15 +36,38 @@ enum class EventId : uint32_t namespace Events { -template struct Traits; -template struct Traits { using signal_type = boost::signals2::signal; } ; -template struct Traits > { using signal_type = boost::signals2::signal& )>; } ; +template +struct Traits; -template <> struct Traits { using signal_type = boost::signals2::signal; } ; +template +struct Traits +{ + using signal_type = boost::signals2::signal; +}; -template struct Traits { using signal_type = boost::signals2::signal; } ; +template +struct Traits> +{ + using signal_type = boost::signals2::signal& )>; +}; -template struct Traits { using signal_type = boost::signals2::signal; } ; +template <> +struct Traits +{ + using signal_type = boost::signals2::signal; +}; + +template +struct Traits +{ + using signal_type = boost::signals2::signal; +}; + +template +struct Traits +{ + using signal_type = boost::signals2::signal; +}; /* template struct Traits2; @@ -58,54 +81,61 @@ template <> struct Traits2 { using signal_type = boo class FairMQEventManager { public: - typedef std::pair EventKey; - - FairMQEventManager() : fEventMap() {} - virtual ~FairMQEventManager(){} - + typedef std::pair EventKey; + + FairMQEventManager() : + fEventMap() + {} + + virtual ~FairMQEventManager() + {} template - void Connect(const std::string& key, F&& func) + void Connect(const std::string& key, F&& func) { - GetSlot(key).connect(std::forward(func)); - } - - template - void Disonnect(const std::string& key) - { - GetSlot(key).disconnect(); + GetSlot(key).connect(std::forward(func)); } + template + void Disonnect(const std::string& key) + { + GetSlot(key).disconnect(); + } template - void Emit(const std::string& key, Args&&... args) + void Emit(const std::string& key, Args&&... args) { GetSlot(key)(std::forward(args)...); } - template bool EventKeyFound(const std::string& key) { - if (fEventMap.find(std::pair(event,key) ) != fEventMap.end()) + if (fEventMap.find(std::pair(event, key) ) != fEventMap.end()) + { return true; + } else + { return false; + } } private: std::map fEventMap; - template ::signal_type, - typename SlotPtr = boost::shared_ptr > + template ::signal_type, + typename SlotPtr = boost::shared_ptr> Slot& GetSlot(const std::string& key) { try { - EventKey eventKey = std::make_pair(event,key); + EventKey eventKey = std::make_pair(event, key); //static_assert(std::is_same()),SlotPtr>::value, ""); if (fEventMap.find(eventKey) == fEventMap.end()) + { fEventMap.emplace(eventKey, boost::make_shared()); + } return *boost::any_cast(fEventMap.at(eventKey)); // auto &&tmp = boost::any_cast(fEventMap.at(eventKey)); diff --git a/fairmq/options/FairMQParser.cxx b/fairmq/options/FairMQParser.cxx index 097f5385..388147d9 100644 --- a/fairmq/options/FairMQParser.cxx +++ b/fairmq/options/FairMQParser.cxx @@ -228,7 +228,6 @@ void ChannelParser(const boost::property_tree::ptree& tree, FairMQMap& channelMa commonChannel.UpdateType(q.second.get("type", commonChannel.GetType())); commonChannel.UpdateMethod(q.second.get("method", commonChannel.GetMethod())); commonChannel.UpdateAddress(q.second.get("address", commonChannel.GetAddress())); - commonChannel.UpdateProperty(q.second.get("property", commonChannel.GetProperty())); commonChannel.UpdateSndBufSize(q.second.get("sndBufSize", commonChannel.GetSndBufSize())); commonChannel.UpdateRcvBufSize(q.second.get("rcvBufSize", commonChannel.GetRcvBufSize())); commonChannel.UpdateRateLogging(q.second.get("rateLogging", commonChannel.GetRateLogging())); @@ -245,7 +244,6 @@ void ChannelParser(const boost::property_tree::ptree& tree, FairMQMap& channelMa LOG(DEBUG) << "\ttype = " << commonChannel.GetType(); LOG(DEBUG) << "\tmethod = " << commonChannel.GetMethod(); LOG(DEBUG) << "\taddress = " << commonChannel.GetAddress(); - LOG(DEBUG) << "\tproperty = " << commonChannel.GetProperty(); LOG(DEBUG) << "\tsndBufSize = " << commonChannel.GetSndBufSize(); LOG(DEBUG) << "\trcvBufSize = " << commonChannel.GetRcvBufSize(); LOG(DEBUG) << "\trateLogging = " << commonChannel.GetRateLogging(); @@ -287,7 +285,6 @@ void ChannelParser(const boost::property_tree::ptree& tree, FairMQMap& channelMa commonChannel.UpdateType(p.second.get("type", commonChannel.GetType())); commonChannel.UpdateMethod(p.second.get("method", commonChannel.GetMethod())); commonChannel.UpdateAddress(p.second.get("address", commonChannel.GetAddress())); - commonChannel.UpdateProperty(p.second.get("property", commonChannel.GetProperty())); commonChannel.UpdateSndBufSize(p.second.get("sndBufSize", commonChannel.GetSndBufSize())); commonChannel.UpdateRcvBufSize(p.second.get("rcvBufSize", commonChannel.GetRcvBufSize())); commonChannel.UpdateRateLogging(p.second.get("rateLogging", commonChannel.GetRateLogging())); @@ -305,7 +302,6 @@ void ChannelParser(const boost::property_tree::ptree& tree, FairMQMap& channelMa LOG(DEBUG) << "\ttype = " << commonChannel.GetType(); LOG(DEBUG) << "\tmethod = " << commonChannel.GetMethod(); LOG(DEBUG) << "\taddress = " << commonChannel.GetAddress(); - LOG(DEBUG) << "\tproperty = " << commonChannel.GetProperty(); LOG(DEBUG) << "\tsndBufSize = " << commonChannel.GetSndBufSize(); LOG(DEBUG) << "\trcvBufSize = " << commonChannel.GetRcvBufSize(); LOG(DEBUG) << "\trateLogging = " << commonChannel.GetRateLogging(); @@ -344,7 +340,6 @@ void SocketParser(const boost::property_tree::ptree& tree, vector channel.UpdateType(q.second.get("type", channel.GetType())); channel.UpdateMethod(q.second.get("method", channel.GetMethod())); channel.UpdateAddress(q.second.get("address", channel.GetAddress())); - channel.UpdateProperty(q.second.get("property", channel.GetProperty())); channel.UpdateSndBufSize(q.second.get("sndBufSize", channel.GetSndBufSize())); channel.UpdateRcvBufSize(q.second.get("rcvBufSize", channel.GetRcvBufSize())); channel.UpdateRateLogging(q.second.get("rateLogging", channel.GetRateLogging())); @@ -353,7 +348,6 @@ void SocketParser(const boost::property_tree::ptree& tree, vector LOG(DEBUG) << "\ttype = " << channel.GetType(); LOG(DEBUG) << "\tmethod = " << channel.GetMethod(); LOG(DEBUG) << "\taddress = " << channel.GetAddress(); - LOG(DEBUG) << "\tproperty = " << channel.GetProperty(); LOG(DEBUG) << "\tsndBufSize = " << channel.GetSndBufSize(); LOG(DEBUG) << "\trcvBufSize = " << channel.GetRcvBufSize(); LOG(DEBUG) << "\trateLogging = " << channel.GetRateLogging(); @@ -372,7 +366,6 @@ void SocketParser(const boost::property_tree::ptree& tree, vector channel.UpdateType(p.second.get("type", channel.GetType())); channel.UpdateMethod(p.second.get("method", channel.GetMethod())); channel.UpdateAddress(p.second.get("address", channel.GetAddress())); - channel.UpdateProperty(p.second.get("property", channel.GetProperty())); channel.UpdateSndBufSize(p.second.get("sndBufSize", channel.GetSndBufSize())); channel.UpdateRcvBufSize(p.second.get("rcvBufSize", channel.GetRcvBufSize())); channel.UpdateRateLogging(p.second.get("rateLogging", channel.GetRateLogging())); @@ -381,7 +374,6 @@ void SocketParser(const boost::property_tree::ptree& tree, vector LOG(DEBUG) << "\ttype = " << channel.GetType(); LOG(DEBUG) << "\tmethod = " << channel.GetMethod(); LOG(DEBUG) << "\taddress = " << channel.GetAddress(); - LOG(DEBUG) << "\tproperty = " << channel.GetProperty(); LOG(DEBUG) << "\tsndBufSize = " << channel.GetSndBufSize(); LOG(DEBUG) << "\trcvBufSize = " << channel.GetRcvBufSize(); LOG(DEBUG) << "\trateLogging = " << channel.GetRateLogging(); @@ -406,7 +398,6 @@ void SocketParser(const boost::property_tree::ptree& tree, vector LOG(DEBUG) << "\ttype = " << channel.GetType(); LOG(DEBUG) << "\tmethod = " << channel.GetMethod(); LOG(DEBUG) << "\taddress = " << channel.GetAddress(); - LOG(DEBUG) << "\tproperty = " << channel.GetProperty(); LOG(DEBUG) << "\tsndBufSize = " << channel.GetSndBufSize(); LOG(DEBUG) << "\trcvBufSize = " << channel.GetRcvBufSize(); LOG(DEBUG) << "\trateLogging = " << channel.GetRateLogging(); diff --git a/fairmq/options/FairMQProgOptions.cxx b/fairmq/options/FairMQProgOptions.cxx index e048dec3..1b0ebd63 100644 --- a/fairmq/options/FairMQProgOptions.cxx +++ b/fairmq/options/FairMQProgOptions.cxx @@ -32,14 +32,10 @@ FairMQProgOptions::FairMQProgOptions() { } -// ---------------------------------------------------------------------------------- - FairMQProgOptions::~FairMQProgOptions() { } -// ---------------------------------------------------------------------------------- - void FairMQProgOptions::ParseAll(const int argc, char** argv, bool allowUnregistered) { // init description @@ -209,9 +205,6 @@ void FairMQProgOptions::ParseAll(const int argc, char** argv, bool allowUnregist FairProgOptions::PrintOptions(); } - -// ---------------------------------------------------------------------------------- - int FairMQProgOptions::Store(const FairMQMap& channels) { fFairMQMap = channels; @@ -219,9 +212,6 @@ int FairMQProgOptions::Store(const FairMQMap& channels) return 0; } - -// ---------------------------------------------------------------------------------- - // replace FairMQChannelMap, and update variable map accordingly int FairMQProgOptions::UpdateChannelMap(const FairMQMap& channels) { @@ -230,8 +220,6 @@ int FairMQProgOptions::UpdateChannelMap(const FairMQMap& channels) return 0; } -// ---------------------------------------------------------------------------------- - // read FairMQChannelMap and insert/update corresponding values in variable map // create key for variable map as follow : channelName.index.memberName void FairMQProgOptions::UpdateMQValues() @@ -244,23 +232,20 @@ void FairMQProgOptions::UpdateMQValues() std::string typeKey = p.first + "." + std::to_string(index) + ".type"; std::string methodKey = p.first + "." + std::to_string(index) + ".method"; std::string addressKey = p.first + "." + std::to_string(index) + ".address"; - std::string propertyKey = p.first + "." + std::to_string(index) + ".property"; std::string sndBufSizeKey = p.first + "." + std::to_string(index) + ".sndBufSize"; std::string rcvBufSizeKey = p.first + "." + std::to_string(index) + ".rcvBufSize"; std::string rateLoggingKey = p.first + "." + std::to_string(index) + ".rateLogging"; - + fMQKeyMap[typeKey] = std::make_tuple(p.first,index,"type"); fMQKeyMap[methodKey] = std::make_tuple(p.first,index,"method"); fMQKeyMap[addressKey] = std::make_tuple(p.first,index,"address"); - fMQKeyMap[propertyKey] = std::make_tuple(p.first,index,"property"); fMQKeyMap[sndBufSizeKey] = std::make_tuple(p.first,index,"sndBufSize"); fMQKeyMap[rcvBufSizeKey] = std::make_tuple(p.first,index,"rcvBufSize"); fMQKeyMap[rateLoggingKey] = std::make_tuple(p.first,index,"rateLogging"); - + UpdateVarMap(typeKey,channel.GetType()); UpdateVarMap(methodKey,channel.GetMethod()); UpdateVarMap(addressKey,channel.GetAddress()); - UpdateVarMap(propertyKey,channel.GetProperty()); //UpdateVarMap(sndBufSizeKey, std::to_string(channel.GetSndBufSize()));// string API @@ -277,21 +262,15 @@ void FairMQProgOptions::UpdateMQValues() LOG(DEBUG) << "key = " << typeKey <<"\t value = " << GetValue(typeKey); LOG(DEBUG) << "key = " << methodKey <<"\t value = " << GetValue(methodKey); LOG(DEBUG) << "key = " << addressKey <<"\t value = " << GetValue(addressKey); - LOG(DEBUG) << "key = " << propertyKey <<"\t value = " << GetValue(propertyKey); LOG(DEBUG) << "key = " << sndBufSizeKey << "\t value = " << GetValue(sndBufSizeKey); LOG(DEBUG) << "key = " << rcvBufSizeKey <<"\t value = " << GetValue(rcvBufSizeKey); LOG(DEBUG) << "key = " << rateLoggingKey <<"\t value = " << GetValue(rateLoggingKey); */ index++; } - } - } -// ---------------------------------------------------------------------------------- - - int FairMQProgOptions::NotifySwitchOption() { if (fVarMap.count("help")) @@ -309,8 +288,6 @@ int FairMQProgOptions::NotifySwitchOption() return 0; } -// ---------------------------------------------------------------------------------- - void FairMQProgOptions::InitOptionDescription() { // Id required in command line if config txt file not enabled @@ -320,8 +297,8 @@ void FairMQProgOptions::InitOptionDescription() ("id", po::value(), "Device ID (required argument).") ("io-threads", po::value()->default_value(1), "Number of I/O threads.") ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').") - ("deployment", po::value()->default_value("static"), "Deployment ('static'/'dds').") - ("control", po::value()->default_value("interactive"), "States control ('interactive'/'static'/'dds').") + ("config", po::value()->default_value("static"), "Config source ('static'/).") + ("control", po::value()->default_value("interactive"), "States control ('interactive'/'static'/).") ("network-interface", po::value()->default_value("eth0"), "Network interface to bind on (e.g. eth0, ib0, wlan0, en0, lo...).") ("config-key", po::value(), "Use provided value instead of device id for fetching the configuration from the config file") ("catch-signals", po::value()->default_value(1), "Enable signal handling (1/0)") @@ -332,8 +309,8 @@ void FairMQProgOptions::InitOptionDescription() ("id", po::value()->required(), "Device ID (required argument).") ("io-threads", po::value()->default_value(1), "Number of I/O threads.") ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').") - ("deployment", po::value()->default_value("static"), "Deployment ('static'/'dds').") - ("control", po::value()->default_value("interactive"), "States control ('interactive'/'static'/'dds').") + ("config", po::value()->default_value("static"), "Config source ('static'/).") + ("control", po::value()->default_value("interactive"), "States control ('interactive'/'static'/).") ("network-interface", po::value()->default_value("eth0"), "Network interface to bind on (e.g. eth0, ib0, wlan0, en0, lo...).") ("config-key", po::value(), "Use provided value instead of device id for fetching the configuration from the config file") ("catch-signals", po::value()->default_value(1), "Enable signal handling (1/0)") @@ -346,8 +323,8 @@ void FairMQProgOptions::InitOptionDescription() ("id", po::value()->required(), "Device ID (required argument)") ("io-threads", po::value()->default_value(1), "Number of I/O threads") ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').") - ("deployment", po::value()->default_value("static"), "Deployment ('static'/'dds').") - ("control", po::value()->default_value("interactive"), "States control ('interactive'/'static'/'dds').") + ("config", po::value()->default_value("static"), "Config source ('static'/).") + ("control", po::value()->default_value("interactive"), "States control ('interactive'/'static'/).") ("network-interface", po::value()->default_value("eth0"), "Network interface to bind on (e.g. eth0, ib0, wlan0, en0, lo...).") ("config-key", po::value(), "Use provided value instead of device id for fetching the configuration from the config file") ("catch-signals", po::value()->default_value(1), "Enable signal handling (1/0)") @@ -374,33 +351,25 @@ void FairMQProgOptions::InitOptionDescription() } } -// ---------------------------------------------------------------------------------- - int FairMQProgOptions::UpdateChannelMap(const std::string& channelName, int index, const std::string& member, const std::string& val) { - if(member == "type") + if (member == "type") { fFairMQMap.at(channelName).at(index).UpdateType(val); return 0; } - - if(member == "method") + + if (member == "method") { fFairMQMap.at(channelName).at(index).UpdateMethod(val); return 0; } - - if(member == "address") + + if (member == "address") { fFairMQMap.at(channelName).at(index).UpdateAddress(val); return 0; } - - if(member == "property") - { - fFairMQMap.at(channelName).at(index).UpdateProperty(val); - return 0; - } else { //if we get there it means something is wrong @@ -408,39 +377,32 @@ int FairMQProgOptions::UpdateChannelMap(const std::string& channelName, int inde << channelName<<"."<(val)); } @@ -458,20 +420,19 @@ int FairMQProgOptions::UpdateChannelMap(const std::string& channelName, int inde int FairMQProgOptions::UpdateChannelMap(const std::string& channelName, int index, const std::string& member, int val) { - - if(member == "sndBufSize") + if (member == "sndBufSize") { fFairMQMap.at(channelName).at(index).UpdateSndBufSize(val); return 0; } - - if(member == "rcvBufSize") + + if (member == "rcvBufSize") { fFairMQMap.at(channelName).at(index).UpdateRcvBufSize(val); return 0; } - if(member == "rateLogging") + if (member == "rateLogging") { fFairMQMap.at(channelName).at(index).UpdateRateLogging(val); return 0; diff --git a/fairmq/options/FairProgOptions.h b/fairmq/options/FairProgOptions.h index 25b7c294..831e502c 100644 --- a/fairmq/options/FairProgOptions.h +++ b/fairmq/options/FairProgOptions.h @@ -84,6 +84,10 @@ class FairProgOptions { val = fVarMap[key].as(); } + else + { + LOG(ERROR) << "Config has no key: " << key; + } } catch(std::exception& e) { diff --git a/fairmq/options/runConfigEx.cxx b/fairmq/options/runConfigEx.cxx index 91d50b86..4bbbeb7a 100644 --- a/fairmq/options/runConfigEx.cxx +++ b/fairmq/options/runConfigEx.cxx @@ -10,208 +10,181 @@ * Author: winckler */ - // FairRoot - FairMQ #include "FairMQLogger.h" #include "FairMQProgOptions.h" #include "FairMQDevice.h" -typedef std::unordered_map> FairMQMap; +#include +#include +#include +#include -class MyDevice : public FairMQDevice +using namespace std; + +typedef unordered_map> FairMQMap; + +class MyDevice : public FairMQDevice { public: - MyDevice() : rate(0.5) {} - virtual ~MyDevice() {} - void SetRate(double r){rate=r;} - void Print(){LOG(INFO)<<"[MyDevice] rate = "<()->default_value(0.5), "Data rate"); - + ("data-rate", po::value()->default_value(0.5), "Data rate"); + // parse command lines, parse json file and init FairMQMap config.ParseAll(argc, argv); - // get FairMQMap - auto map1 = config.GetFairMQMap(); - - // form keys from map1 and print the value stored in variable map - PrintMQParam(map1,config); + // // get FairMQMap + // auto map1 = config.GetFairMQMap(); - // update value in variable map, and propagate the update to the FairMQMap - config.UpdateValue("data.0.address","tcp://localhost:1234"); - - // get the updated FairMQMap - auto map2 = config.GetFairMQMap(); + // // form keys from map1 and print the value stored in variable map + // PrintMQParam(map1, config); - // modify one channel value - map2.at("data").at(0).UpdateSndBufSize(500); + // // update value in variable map, and propagate the update to the FairMQMap + // config.UpdateValue("data.0.address","tcp://localhost:1234"); - // update the FairMQMap and propagate the change in variable map - config.UpdateChannelMap(map2); + // // get the updated FairMQMap + // auto map2 = config.GetFairMQMap(); - // print values stored in variable map - PrintMQParam(map2,config); + // // modify one channel value + // map2.at("data").at(0).UpdateSndBufSize(500); + + // // update the FairMQMap and propagate the change in variable map + // config.UpdateChannelMap(map2); + + // // print values stored in variable map + // PrintMQParam(map2, config); MyDevice device; device.CatchSignals(); device.SetConfig(config); + // getting as string and conversion helpers - std::string blah = config.GetStringValue("data-rate"); - double blah2 = config.ConvertTo(blah); - LOG(INFO)<<"blah2 "<(dataRateStr); + // LOG(INFO) << "dataRate: " << dataRate; + LOG(INFO) << "Subscribing: (data.0.address)"; + config.Subscribe("data.0.address", [&device](const string& key, const string& value) + { + LOG(INFO) << "[callback] Updating device parameter " << key << " = " << value; + device.fChannels.at("data").at(0).UpdateAddress(value); + }); - LOG(INFO)<<"---- Connect 1"; - config.Subscribe("data.0.address",[&device](const std::string& key, const std::string& value) - { - LOG(INFO) << "[Lambda] Update parameter (0) " << key << " = " << value; - device.fChannels.at("data").at(0).UpdateAddress(value); - }); + LOG(INFO) << "Subscribing: (data.0.rcvBufSize)"; + config.Subscribe("data.0.rcvBufSize", [&device](const string& key, int value) + { + LOG(INFO) << "[callback] Updating device parameter " << key << " = " << value; + device.fChannels.at("data").at(0).UpdateRcvBufSize(value); + }); + LOG(INFO) << "Subscribing: (data-rate)"; + config.Subscribe("data-rate", [&device](const string& key, double value) + { + LOG(INFO) << "[callback] Updating device parameter " << key << " = " << value; + device.SetRate(value); + }); - std::string key1("data.0.address"); - std::string value1("tcp://localhost:4321"); - config.UpdateValue(key1,value1); + LOG(INFO) << "Starting value updates...\n"; - LOG(INFO)<<"device.fChannels.GetAddress = "<("data.0.address"); + LOG(INFO) << "device: " << device.fChannels.at("data").at(0).GetAddress() << endl; + config.UpdateValue("data.0.rcvBufSize", 100); + LOG(INFO) << "config: " << config.GetValue("data.0.rcvBufSize"); + LOG(INFO) << "device: " << device.fChannels.at("data").at(0).GetRcvBufSize() << endl; - - - LOG(INFO)<<"---- Connect 2"; - config.Subscribe("data.0.method",[&device](const std::string& key, const std::string& value) - { - //value="abcd"; - LOG(INFO) << "[Lambda] Update parameter " << key << " = " << value; - device.fChannels.at("data").at(0).UpdateMethod(value); - }); - - LOG(INFO)<<"---- Connect 3"; - config.Subscribe("data.0.rcvBufSize",[&device](const std::string& key, int value) - { - LOG(INFO) << "[Lambda] Update parameter " << key << " = " << value; - device.fChannels.at("data").at(0).UpdateRcvBufSize(value); - }); - - LOG(INFO)<<"---- Connect 4"; - config.Subscribe("data-rate",[&device](const std::string& key, double value) - { - LOG(INFO) << "[Lambda] Update parameter " << key << " = " << value; - device.SetRate(value); - }); - - - std::string key2("data.0.rcvBufSize"); - int value2(100); - - std::string key3("data.0.method"); - std::string value3("bind"); - - - - LOG(INFO)<<"-------------------- start update"; - - //config.EmitUpdate(key,value); - config.UpdateValue(key1,value1); - - LOG(INFO)<<"device.fChannels.GetAddress = "<("data-rate"); + LOG(INFO) << "device: " << device.GetRate() << endl; + // device.Print(); // advanced commands - LOG(INFO)<<"-------------------- start custom 1"; - - config.Connect("myNewKey",[](MyDevice& d, double val) - { - d.SetRate(val); - d.Print(); - }); - - double value4=0.123; - config.Emit("myNewKey",device,value4); - - - LOG(INFO)<<"-------------------- start custom 2 with function"; - config.Connect("function example",&MyCallBack); - - value4=6.66; - config.Emit("function example",device,value4); + // LOG(INFO) << "-------------------- start custom 1"; + // config.Connect("myNewKey", [](MyDevice& d, double val) + // { + // d.SetRate(val); + // d.Print(); + // }); + // config.Emit("myNewKey", device, 0.123); + // LOG(INFO) << "-------------------- start custom 2 with function"; + // config.Connect("function example", &MyCallBack); + // config.Emit("function example", device, 6.66); } - catch (std::exception& e) + catch (exception& e) { LOG(ERROR) << "Unhandled Exception reached the top of main: " << e.what() << ", application will now exit"; @@ -219,5 +192,3 @@ int main(int argc, char** argv) } return 0; } - - diff --git a/fairmq/plugins/config/CMakeLists.txt b/fairmq/plugins/config/CMakeLists.txt new file mode 100644 index 00000000..19c81ab8 --- /dev/null +++ b/fairmq/plugins/config/CMakeLists.txt @@ -0,0 +1,45 @@ + ################################################################################ + # Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # + # # + # This software is distributed under the terms of the # + # GNU Lesser General Public Licence version 3 (LGPL) version 3, # + # copied verbatim in the file "LICENSE" # + ################################################################################ + +set(INCLUDE_DIRECTORIES + ${CMAKE_SOURCE_DIR}/fairmq + ${CMAKE_SOURCE_DIR}/fairmq/plugins/config + ) + +set(SYSTEM_INCLUDE_DIRECTORIES + ${SYSTEM_INCLUDE_DIRECTORIES} + ${Boost_INCLUDE_DIR} + ${DDS_INCLUDE_DIR} +) + +include_directories(${INCLUDE_DIRECTORIES}) +include_directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES}) + +set(LINK_DIRECTORIES + ${LINK_DIRECTORIES} + ${Boost_LIBRARY_DIRS} +) + +link_directories(${LINK_DIRECTORIES}) + +set(SRCS + "FairMQDDSConfigPlugin.cxx" +) + +set(LIBRARY_NAME FairMQDDSConfigPlugin) + +set(DEPENDENCIES + ${DEPENDENCIES} + FairMQ + pthread + ${DDS_INTERCOM_LIBRARY_SHARED} + ${DDS_PROTOCOL_LIBRARY_SHARED} + ${DDS_USER_DEFAULTS_LIBRARY_SHARED} +) + +GENERATE_LIBRARY() diff --git a/fairmq/plugins/config/FairMQDDSConfigPlugin.cxx b/fairmq/plugins/config/FairMQDDSConfigPlugin.cxx new file mode 100644 index 00000000..12955dd7 --- /dev/null +++ b/fairmq/plugins/config/FairMQDDSConfigPlugin.cxx @@ -0,0 +1,186 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "dds_intercom.h" + +#include "FairMQConfigPlugin.h" + +#include "FairMQLogger.h" +#include "FairMQDevice.h" +#include "FairMQChannel.h" + +#include +#include +#include +#include +#include + +using namespace std; +using namespace dds::intercom_api; + +// container to hold channel config and corresponding dds key values +struct DDSConfig +{ + DDSConfig() + : subChannels() + , ddsValues() + {} + + // container of sub channels, e.g. 'i' in data[i] + vector subChannels; + // dds values for the channel + unordered_map ddsValues; +}; + +class FairMQConfigPluginDDS +{ + public: + static FairMQConfigPluginDDS* GetInstance() + { + if (fInstance == NULL) + { + fInstance = new FairMQConfigPluginDDS(); + } + return fInstance; + } + + static void ResetInstance() + { + try + { + delete fInstance; + fInstance = NULL; + } + catch (exception& e) + { + LOG(ERROR) << "Error: " << e.what() << endl; + return; + } + } + + void Init(FairMQDevice& device) + { + for (auto& mi : device.fChannels) + { + if ((mi.second).at(0).GetMethod() == "bind") + { + for (auto& vi : mi.second) + { + fBindingChans.push_back(&vi); + } + } + else if ((mi.second).at(0).GetMethod() == "connect") + { + // try some trickery with forwarding emplacing values into map + fConnectingChans.emplace(piecewise_construct, forward_as_tuple(mi.first), forward_as_tuple()); + LOG(DEBUG) << "preparing to connect: " << (mi.second).at(0).GetChannelPrefix() << " with " << mi.second.size() << " sockets."; + for (auto& vi : mi.second) + { + fConnectingChans.at(mi.first).subChannels.push_back(&vi); + } + } + else + { + LOG(ERROR) << "Cannot update address configuration. Socket method (bind/connect) not specified."; + return; + } + } + + if (fConnectingChans.size() > 0) + { + LOG(DEBUG) << "Subscribing for DDS properties."; + + fDDSKeyValue.subscribe([&] (const string& propertyId, const string& key, const string& value) + { + LOG(DEBUG) << "Received update for " << propertyId << ": key=" << key << " value=" << value; + fConnectingChans.at(propertyId).ddsValues.insert(make_pair(key.c_str(), value.c_str())); + + // update channels and remove them from unfinished container + for (auto mi = fConnectingChans.begin(); mi != fConnectingChans.end(); /* no increment */) + { + if (mi->second.subChannels.size() == mi->second.ddsValues.size()) + { + auto it = mi->second.ddsValues.begin(); + for (unsigned int i = 0; i < mi->second.subChannels.size(); ++i) + { + mi->second.subChannels.at(i)->UpdateAddress(it->second); + ++it; + } + // when multiple subChannels are used, their order on every device should be the same, irregardless of arrival order from DDS. + device.SortChannel(mi->first); + fConnectingChans.erase(mi++); + } + else + { + ++mi; + } + } + }); + } + } + + void Run(FairMQDevice& device) + { + // start DDS intercom service + fService.start(); + + // publish bound addresses via DDS at keys corresponding to the channel prefixes, e.g. 'data' in data[i] + for (const auto& i : fBindingChans) + { + LOG(DEBUG) << "Publishing " << i->GetChannelPrefix() << " address to DDS under '" << i->GetChannelPrefix() << "' property name."; + fDDSKeyValue.putValue(i->GetChannelPrefix(), i->GetAddress()); + } + } + + private: + FairMQConfigPluginDDS() + : fService() + , fDDSKeyValue(fService) + , fBindingChans() + , fConnectingChans() + { + fService.subscribeOnError([](EErrorCode errorCode, const string& msg) { + LOG(ERROR) << "DDS key-value error code: " << errorCode << ", message: " << msg; + }); + } + + static FairMQConfigPluginDDS* fInstance; + + CIntercomService fService; + CKeyValue fDDSKeyValue; + + // container for binding channels + vector fBindingChans; + // container for connecting channels + map fConnectingChans; +}; + +FairMQConfigPluginDDS* FairMQConfigPluginDDS::fInstance = NULL; + +void initConfig(FairMQDevice& device) +{ + FairMQConfigPluginDDS::GetInstance()->Init(device); +} + +/// Handles channels addresses of the device with configuration from DDS +/// Addresses of binding channels are published via DDS using channels names as keys +/// Addresses of connecting channels are collected from DDS using channels names as keys +/// \param device Reference to FairMQDevice whose channels to handle +void handleInitialConfig(FairMQDevice& device) +{ + FairMQConfigPluginDDS::GetInstance()->Run(device); +} + +void stopConfig() +{ + LOG(DEBUG) << "[FairMQConfigPluginDDS]: " << "Resetting instance."; + FairMQConfigPluginDDS::ResetInstance(); + LOG(DEBUG) << "[FairMQConfigPluginDDS]: " << "Instance has been reset."; +} + +FairMQConfigPlugin fairmqConfigPlugin = { initConfig, handleInitialConfig, stopConfig }; diff --git a/fairmq/plugins/config/FairMQDDSConfigPlugin.h b/fairmq/plugins/config/FairMQDDSConfigPlugin.h new file mode 100644 index 00000000..ccd972be --- /dev/null +++ b/fairmq/plugins/config/FairMQDDSConfigPlugin.h @@ -0,0 +1,7 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ diff --git a/fairmq/plugins/control/CMakeLists.txt b/fairmq/plugins/control/CMakeLists.txt new file mode 100644 index 00000000..5b9d3838 --- /dev/null +++ b/fairmq/plugins/control/CMakeLists.txt @@ -0,0 +1,65 @@ + ################################################################################ + # Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # + # # + # This software is distributed under the terms of the # + # GNU Lesser General Public Licence version 3 (LGPL) version 3, # + # copied verbatim in the file "LICENSE" # + ################################################################################ + +set(INCLUDE_DIRECTORIES + ${CMAKE_SOURCE_DIR}/fairmq + ${CMAKE_SOURCE_DIR}/fairmq/plugins/control + ) + +set(SYSTEM_INCLUDE_DIRECTORIES + ${SYSTEM_INCLUDE_DIRECTORIES} + ${Boost_INCLUDE_DIR} + ${DDS_INCLUDE_DIR} +) + +include_directories(${INCLUDE_DIRECTORIES}) +include_directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES}) + +set(LINK_DIRECTORIES + ${LINK_DIRECTORIES} + ${Boost_LIBRARY_DIRS} +) + +link_directories(${LINK_DIRECTORIES}) + +set(SRCS + "FairMQDDSControlPlugin.cxx" +) + +set(LIBRARY_NAME FairMQDDSControlPlugin) + +set(DEPENDENCIES + ${DEPENDENCIES} + FairMQ + pthread + ${DDS_INTERCOM_LIBRARY_SHARED} + ${DDS_PROTOCOL_LIBRARY_SHARED} + ${DDS_USER_DEFAULTS_LIBRARY_SHARED} +) + +GENERATE_LIBRARY() + +set(Exe_Names + fairmq-dds-command-ui +) + +set(Exe_Source + ../../run/runDDSCommandUI.cxx +) + +list(LENGTH Exe_Names _length) +math(EXPR _length ${_length}-1) + +foreach(_file RANGE 0 ${_length}) + list(GET Exe_Names ${_file} _name) + list(GET Exe_Source ${_file} _src) + set(EXE_NAME ${_name}) + set(SRCS ${_src}) + set(DEPENDENCIES FairMQ pthread ${DDS_INTERCOM_LIBRARY_SHARED} ${DDS_PROTOCOL_LIBRARY_SHARED} ${DDS_USER_DEFAULTS_LIBRARY_SHARED}) + GENERATE_EXECUTABLE() +endforeach(_file RANGE 0 ${_length}) diff --git a/fairmq/plugins/control/FairMQDDSControlPlugin.cxx b/fairmq/plugins/control/FairMQDDSControlPlugin.cxx new file mode 100644 index 00000000..e06f520e --- /dev/null +++ b/fairmq/plugins/control/FairMQDDSControlPlugin.cxx @@ -0,0 +1,163 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "dds_intercom.h" + +#include "FairMQControlPlugin.h" + +#include "FairMQLogger.h" +#include "FairMQDevice.h" + +#include +#include +#include +#include +#include + +using namespace std; +using namespace dds::intercom_api; + +class FairMQControlPluginDDS +{ + public: + static FairMQControlPluginDDS* GetInstance() + { + if (fInstance == NULL) + { + fInstance = new FairMQControlPluginDDS(); + } + return fInstance; + } + + static void ResetInstance() + { + try + { + delete fInstance; + fInstance = NULL; + } + catch (exception& e) + { + LOG(ERROR) << "Error: " << e.what() << endl; + return; + } + } + + void Init(FairMQDevice& device) + { + string id = device.GetProperty(FairMQDevice::Id, ""); + string pid(to_string(getpid())); + + try + { + fDDSCustomCmd.subscribe([id, pid, &device, this](const string& cmd, const string& cond, uint64_t senderId) + { + LOG(INFO) << "Received command: " << cmd; + + if (cmd == "check-state") + { + fDDSCustomCmd.send(id + ": " + device.GetCurrentStateName() + " (pid: " + pid + ")", to_string(senderId)); + } + else if (fEvents.find(cmd) != fEvents.end()) + { + fDDSCustomCmd.send(id + ": attempting to " + cmd, to_string(senderId)); + device.ChangeState(cmd); + } + else if (cmd == "END") + { + fDDSCustomCmd.send(id + ": attempting to " + cmd, to_string(senderId)); + device.ChangeState(cmd); + fDDSCustomCmd.send(id + ": " + device.GetCurrentStateName(), to_string(senderId)); + if (device.GetCurrentStateName() == "EXITING") + { + unique_lock lock(fMtx); + fStopCondition.notify_one(); + } + } + else + { + LOG(WARN) << "Unknown command: " << cmd; + LOG(WARN) << "Origin: " << senderId; + LOG(WARN) << "Destination: " << cond; + } + }); + } + catch (exception& e) + { + LOG(ERROR) << "Error: " << e.what() << endl; + return; + } + } + + void Run(FairMQDevice& device) + { + try + { + fService.start(); + + LOG(INFO) << "Listening for commands from DDS..."; + unique_lock lock(fMtx); + while (!device.Terminated()) + { + fStopCondition.wait_for(lock, chrono::seconds(1)); + } + LOG(DEBUG) << "Stopping DDS control plugin"; + } + catch (exception& e) + { + LOG(ERROR) << "Error: " << e.what() << endl; + return; + } + } + + private: + FairMQControlPluginDDS() + : fService() + , fDDSCustomCmd(fService) + , fMtx() + , fStopCondition() + , fEvents({ "INIT_DEVICE", "INIT_TASK", "PAUSE", "RUN", "STOP", "RESET_TASK", "RESET_DEVICE" }) + { + fService.subscribeOnError([](const EErrorCode errorCode, const string& errorMsg) { + LOG(ERROR) << "Error received: error code: " << errorCode << ", error message: " << errorMsg << endl; + }); + } + + static FairMQControlPluginDDS* fInstance; + + CIntercomService fService; + CCustomCmd fDDSCustomCmd; + + mutex fMtx; + condition_variable fStopCondition; + + const set fEvents; +}; + +FairMQControlPluginDDS* FairMQControlPluginDDS::fInstance = NULL; + +void initControl(FairMQDevice& device) +{ + FairMQControlPluginDDS::GetInstance()->Init(device); +} + +/// Controls device state via DDS custom commands interface +/// \param device Reference to FairMQDevice whose state to control +void handleStateChanges(FairMQDevice& device) +{ + FairMQControlPluginDDS::GetInstance()->Run(device); +} + +void stopControl() +{ + LOG(DEBUG) << "[FairMQControlPluginDDS]: " << "Resetting instance."; + FairMQControlPluginDDS::ResetInstance(); + LOG(DEBUG) << "[FairMQControlPluginDDS]: " << "Instance has been reset."; +} + +FairMQControlPlugin fairmqControlPlugin = { initControl, handleStateChanges, stopControl }; diff --git a/fairmq/plugins/control/FairMQDDSControlPlugin.h b/fairmq/plugins/control/FairMQDDSControlPlugin.h new file mode 100644 index 00000000..ccd972be --- /dev/null +++ b/fairmq/plugins/control/FairMQDDSControlPlugin.h @@ -0,0 +1,7 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ diff --git a/fairmq/run/runDDSCommandUI.cxx b/fairmq/run/runDDSCommandUI.cxx index d9796a5d..fe563960 100644 --- a/fairmq/run/runDDSCommandUI.cxx +++ b/fairmq/run/runDDSCommandUI.cxx @@ -23,7 +23,12 @@ int main(int argc, char* argv[]) { try { - CCustomCmd ddsCustomCmd; + CIntercomService service; + CCustomCmd ddsCustomCmd(service); + + service.subscribeOnError([](const EErrorCode errorCode, const string& errorMsg) { + cout << "DDS error received: error code: " << errorCode << ", error message: " << errorMsg << endl; + }); // subscribe to receive messages from DDS ddsCustomCmd.subscribe([](const string& msg, const string& condition, uint64_t senderId) @@ -31,6 +36,8 @@ int main(int argc, char* argv[]) cout << "Received: \"" << msg << "\"" << endl; }); + service.start(); + char c; // setup reading from cin (enable raw mode) @@ -46,41 +53,39 @@ int main(int argc, char* argv[]) while (cin >> c) { - int result = 0; // result of the dds send - switch (c) { case 'c': cout << " > checking state of the devices" << endl; - result = ddsCustomCmd.send("check-state", ""); + ddsCustomCmd.send("check-state", ""); break; case 'i': cout << " > init devices" << endl; - result = ddsCustomCmd.send("INIT_DEVICE", ""); + ddsCustomCmd.send("INIT_DEVICE", ""); break; case 'j': cout << " > init tasks" << endl; - result = ddsCustomCmd.send("INIT_TASK", ""); + ddsCustomCmd.send("INIT_TASK", ""); break; case 'p': cout << " > pause devices" << endl; - result = ddsCustomCmd.send("PAUSE", ""); + ddsCustomCmd.send("PAUSE", ""); break; case 'r': cout << " > run tasks" << endl; - result = ddsCustomCmd.send("RUN", ""); + ddsCustomCmd.send("RUN", ""); break; case 's': cout << " > stop devices" << endl; - result = ddsCustomCmd.send("STOP", ""); + ddsCustomCmd.send("STOP", ""); break; case 't': cout << " > reset tasks" << endl; - result = ddsCustomCmd.send("RESET_TASK", ""); + ddsCustomCmd.send("RESET_TASK", ""); break; case 'd': cout << " > reset devices" << endl; - result = ddsCustomCmd.send("RESET_DEVICE", ""); + ddsCustomCmd.send("RESET_DEVICE", ""); break; case 'h': cout << " > help" << endl; @@ -88,7 +93,7 @@ int main(int argc, char* argv[]) break; case 'q': cout << " > end" << endl; - result = ddsCustomCmd.send("END", ""); + ddsCustomCmd.send("END", ""); break; default: cout << "Invalid input: [" << c << "]" << endl; @@ -96,11 +101,8 @@ int main(int argc, char* argv[]) break; } - if (result == 1) + if (argc == 2) { - cerr << "Error sending custom command" << endl; - } - if ( argc == 2 ) { usleep(50000); return EXIT_SUCCESS; } diff --git a/fairmq/runFairMQDevice.h b/fairmq/runFairMQDevice.h index fde9b411..a5c4a0c1 100644 --- a/fairmq/runFairMQDevice.h +++ b/fairmq/runFairMQDevice.h @@ -59,7 +59,7 @@ int main(int argc, char** argv) } catch (std::exception& e) { - LOG(ERROR) << "Unhandled Exception reached the top of main: " << e.what() << ", application will now exit"; + LOG(ERROR) << "Unhandled exception reached the top of main: " << e.what() << ", application will now exit"; return 1; } diff --git a/fairmq/tools/FairMQDDSTools.h b/fairmq/tools/FairMQDDSTools.h deleted file mode 100644 index 8ac8b521..00000000 --- a/fairmq/tools/FairMQDDSTools.h +++ /dev/null @@ -1,203 +0,0 @@ -#ifndef FAIRMQDDSTOOLS_H_ -#define FAIRMQDDSTOOLS_H_ - -#include "FairMQLogger.h" -#include "FairMQDevice.h" -#include "FairMQChannel.h" - -#include -#include -#include -#include -#include -#include -#include - -#include "dds_intercom.h" // DDS - -using namespace std; -using namespace dds::intercom_api; - -// container to hold channel config and corresponding dds key values -struct DDSConfig -{ - DDSConfig() - : subChannels() - , ddsValues() - {} - - // container of sub channels, e.g. 'i' in data[i] - vector subChannels; - // dds values for the channel - CKeyValue::valuesMap_t ddsValues; -}; - -/// Handles channels addresses of the device with configuration from DDS -/// Addresses of binding channels are published via DDS using channels names as keys -/// Addresses of connecting channels are collected from DDS using channels names as keys -/// \param device Reference to FairMQDevice whose channels to handle -template -void HandleConfigViaDDS(TMQDevice& device) -{ - // container for binding channels - vector bindingChans; - // container for connecting channels - map connectingChans; - - // fill the containers - for (auto& mi : device.fChannels) - { - if ((mi.second).at(0).GetMethod() == "bind") - { - for (auto& vi : mi.second) - { - bindingChans.push_back(&vi); - } - } - else if ((mi.second).at(0).GetMethod() == "connect") - { - // try some trickery with forwarding emplacing values into map - connectingChans.emplace(piecewise_construct, forward_as_tuple(mi.first), forward_as_tuple()); - for (auto& vi : mi.second) - { - connectingChans.at(mi.first).subChannels.push_back(&vi); - } - } - else - { - LOG(ERROR) << "Cannot update address configuration. Socket method (bind/connect) not specified."; - return; - } - } - - // Wait for the binding channels to bind - device.WaitForInitialValidation(); - - // DDS key value store - CKeyValue ddsKV; - - // publish bound addresses via DDS at keys corresponding to the channel prefixes, e.g. 'data' in data[i] - for (const auto& i : bindingChans) - { - // LOG(INFO) << "Publishing " << i->GetChannelPrefix() << " address to DDS under '" << i->GetProperty() << "' property name."; - ddsKV.putValue(i->GetProperty(), i->GetAddress()); - } - - // receive connect addresses from DDS via keys corresponding to channel prefixes, e.g. 'data' in data[i] - if (connectingChans.size() > 0) - { - mutex keyMutex; - condition_variable keyCV; - - LOG(DEBUG) << "Subscribing for DDS properties."; - ddsKV.subscribe([&] (const string& /*key*/, const string& /*value*/) - { - keyCV.notify_all(); - }); - - // scope based locking - { - unique_lock lock(keyMutex); - keyCV.wait_for(lock, chrono::milliseconds(1000), [&] () - { - // receive new properties - for (auto& mi : connectingChans) - { - for (auto& vi : mi.second.subChannels) - { - // LOG(INFO) << "Waiting for " << vi->GetChannelPrefix() << " address from DDS."; - ddsKV.getValues(vi->GetProperty(), &(mi.second.ddsValues)); - } - } - - // update channels and remove them from unfinished container - for (auto mi = connectingChans.begin(); mi != connectingChans.end(); /* no increment */) - { - if (mi->second.subChannels.size() == mi->second.ddsValues.size()) - { - auto it = mi->second.ddsValues.begin(); - for (unsigned int i = 0; i < mi->second.subChannels.size(); ++i) - { - mi->second.subChannels.at(i)->UpdateAddress(it->second); - ++it; - } - // when multiple subChannels are used, their order on every device should be the same, irregardless of arrival order from DDS. - device.SortChannel(mi->first); - connectingChans.erase(mi++); - } - else - { - ++mi; - } - } - - if (connectingChans.empty()) - { - LOG(DEBUG) << "Successfully received all required DDS properties!"; - } - return connectingChans.empty(); - }); - } - } -} - -/// Controls device state via DDS custom commands interface -/// \param device Reference to FairMQDevice whose state to control -void runDDSStateHandler(FairMQDevice& device) -{ - mutex mtx; - condition_variable stopCondition; - - string id = device.GetProperty(FairMQDevice::Id, ""); - string pid(to_string(getpid())); - - try - { - const set events = { "INIT_DEVICE", "INIT_TASK", "PAUSE", "RUN", "STOP", "RESET_TASK", "RESET_DEVICE" }; - - CCustomCmd ddsCustomCmd; - - ddsCustomCmd.subscribe([&](const string& cmd, const string& cond, uint64_t senderId) - { - LOG(INFO) << "Received command: " << cmd; - - if (cmd == "check-state") - { - ddsCustomCmd.send(id + ": " + device.GetCurrentStateName() + " (pid: " + pid + ")", to_string(senderId)); - } - else if (events.find(cmd) != events.end()) - { - ddsCustomCmd.send(id + ": attempting to " + cmd, to_string(senderId)); - device.ChangeState(cmd); - } - else if (cmd == "END") - { - ddsCustomCmd.send(id + ": attempting to " + cmd, to_string(senderId)); - device.ChangeState(cmd); - ddsCustomCmd.send(id + ": " + device.GetCurrentStateName(), to_string(senderId)); - if (device.GetCurrentStateName() == "EXITING") - { - unique_lock lock(mtx); - stopCondition.notify_one(); - } - } - else - { - LOG(WARN) << "Unknown command: " << cmd; - LOG(WARN) << "Origin: " << senderId; - LOG(WARN) << "Destination: " << cond; - } - }); - - LOG(INFO) << "Listening for commands from DDS..."; - unique_lock lock(mtx); - stopCondition.wait(lock); - } - catch (exception& e) - { - cerr << "Error: " << e.what() << endl; - return; - } -} - -#endif /* FAIRMQDDSTOOLS_H_ */ diff --git a/fairmq/tools/runSimpleMQStateMachine.h b/fairmq/tools/runSimpleMQStateMachine.h index c659daba..eb52202a 100644 --- a/fairmq/tools/runSimpleMQStateMachine.h +++ b/fairmq/tools/runSimpleMQStateMachine.h @@ -9,23 +9,22 @@ #define RUNSIMPLEMQSTATEMACHINE_H #include "FairMQLogger.h" +#include "FairMQConfigPlugin.h" +#include "FairMQControlPlugin.h" #include "FairMQParser.h" #include "FairMQProgOptions.h" #include #include +#include #include -#ifdef DDS_FOUND -#include "FairMQDDSTools.h" -#endif /*DDS_FOUND*/ - // template function that takes any device -// and runs a simple MQ state machine configured from a JSON file and/or (optionally) DDS. +// and runs a simple MQ state machine configured from a JSON file and/or a plugin. template -inline int runStateMachine(TMQDevice& device, FairMQProgOptions& config) +inline int runStateMachine(TMQDevice& device, FairMQProgOptions& cfg) { - if (config.GetValue("catch-signals") > 0) + if (cfg.GetValue("catch-signals") > 0) { device.CatchSignals(); } @@ -34,20 +33,91 @@ inline int runStateMachine(TMQDevice& device, FairMQProgOptions& config) LOG(WARN) << "Signal handling (e.g. ctrl+C) has been deactivated via command line argument"; } - device.SetConfig(config); - std::string control = config.GetValue("control"); + device.SetConfig(cfg); + std::string config = cfg.GetValue("config"); + std::string control = cfg.GetValue("control"); + + // plugin objects + void* ldConfigHandle = nullptr; + void* ldControlHandle = nullptr; + FairMQConfigPlugin* fairmqConfigPlugin = nullptr; + FairMQControlPlugin* fairmqControlPlugin = nullptr; + + std::clock_t c_start = std::clock(); + auto t_start = std::chrono::high_resolution_clock::now(); device.ChangeState(TMQDevice::INIT_DEVICE); + // Wait for the binding channels to bind + device.WaitForInitialValidation(); -#ifdef DDS_FOUND - if (control == "dds") + if (config != "static") { - HandleConfigViaDDS(device); + LOG(DEBUG) << "Opening config plugin: " << config; + ldConfigHandle = dlopen(config.c_str(), RTLD_LAZY); + + if (!ldConfigHandle) + { + LOG(ERROR) << "Cannot open library: " << dlerror(); + return 1; + } + + // load the fairmqConfigPlugin + dlerror(); + fairmqConfigPlugin = static_cast(dlsym(ldConfigHandle, "fairmqConfigPlugin")); + const char* dlsymError = dlerror(); + if (dlsymError) + { + LOG(ERROR) << "Cannot load fairmqConfigPlugin() from: " << dlsymError; + fairmqConfigPlugin = nullptr; + dlclose(ldConfigHandle); + return 1; + } + + fairmqConfigPlugin->initConfig(device); + } + + if (control != "interactive" && control != "static") + { + LOG(DEBUG) << "Opening control plugin: " << control; + ldControlHandle = dlopen(control.c_str(), RTLD_LAZY); + + if (!ldControlHandle) + { + LOG(ERROR) << "Cannot open library: " << dlerror(); + return 1; + } + + // load the fairmqControlPlugin + dlerror(); + fairmqControlPlugin = static_cast(dlsym(ldControlHandle, "fairmqControlPlugin")); + const char* dlsymError = dlerror(); + if (dlsymError) + { + LOG(ERROR) << "Cannot load fairmqControlPlugin(): " << dlsymError; + fairmqControlPlugin = nullptr; + dlclose(ldControlHandle); + return 1; + } + + fairmqControlPlugin->initControl(device); + } + + if (config != "static") + { + if (fairmqConfigPlugin) + { + fairmqConfigPlugin->handleInitialConfig(device); + } } -#endif /*DDS_FOUND*/ device.WaitForEndOfState(TMQDevice::INIT_DEVICE); + std::clock_t c_end = std::clock(); + auto t_end = std::chrono::high_resolution_clock::now(); + + LOG(DEBUG) << "Init time (CPU) : " << std::fixed << std::setprecision(2) << 1000.0 * (c_end - c_start) / CLOCKS_PER_SEC << " ms"; + LOG(DEBUG) << "Init time (Wall): " << std::chrono::duration(t_end - t_start).count() << " ms"; + device.ChangeState(TMQDevice::INIT_TASK); device.WaitForEndOfState(TMQDevice::INIT_TASK); @@ -72,23 +142,32 @@ inline int runStateMachine(TMQDevice& device, FairMQProgOptions& config) device.ChangeState(TMQDevice::END); } } -#ifdef DDS_FOUND - else if (control == "dds") - { - runDDSStateHandler(device); - } -#endif /*DDS_FOUND*/ else { - LOG(ERROR) << "Requested control mechanism '" << control << "' is not available."; - LOG(ERROR) << "Currently available are:" - << " 'interactive'" - << ", 'static'" -#ifdef DDS_FOUND - << ", 'dds'" -#endif /*DDS_FOUND*/ - << ". Exiting."; - return 1; + if (fairmqControlPlugin) + { + fairmqControlPlugin->handleStateChanges(device); + } + } + + if (config != "static") + { + if (fairmqConfigPlugin) + { + LOG(DEBUG) << "Closing FairMQConfigPlugin..."; + fairmqConfigPlugin->stopConfig(); + dlclose(ldConfigHandle); + } + } + + if (control != "interactive" && control != "static") + { + if (fairmqControlPlugin) + { + LOG(DEBUG) << "Closing FairMQControlPlugin..."; + fairmqControlPlugin->stopControl(); + dlclose(ldControlHandle); + } } return 0;