FairMQ: Implement property change event config API

Replaced the old event manager implementation, which changed the
subscription semantics to bulk event subscriptions.
This commit is contained in:
Dennis Klein 2017-09-28 21:51:42 +02:00 committed by Mohammad Al-Turany
parent 8c8ee45914
commit 7f23a70670
7 changed files with 95 additions and 213 deletions

View File

@ -91,10 +91,10 @@ class Plugin
auto GetPropertyAsString(const std::string& key) const -> std::string { return fPluginServices->GetPropertyAsString(key); } auto GetPropertyAsString(const std::string& key) const -> std::string { return fPluginServices->GetPropertyAsString(key); }
auto GetChannelInfo() const -> std::unordered_map<std::string, int> { return fPluginServices->GetChannelInfo(); } auto GetChannelInfo() const -> std::unordered_map<std::string, int> { return fPluginServices->GetChannelInfo(); }
auto GetPropertyKeys() const -> std::vector<std::string> { return fPluginServices->GetPropertyKeys(); } auto GetPropertyKeys() const -> std::vector<std::string> { return fPluginServices->GetPropertyKeys(); }
// template<typename T> template<typename T>
// auto SubscribeToPropertyChange(std::function<void(const std::string& [>key*/, const T /*newValue<])> callback) const -> void { fPluginServices.SubscribeToPropertyChange(fkName, callback); } auto SubscribeToPropertyChange(std::function<void(const std::string& key, T newValue)> callback) -> void { fPluginServices->SubscribeToPropertyChange<T>(fkName, callback); }
// template<typename T> template<typename T>
// auto UnsubscribeFromPropertyChange() -> void { fPluginServices.UnsubscribeFromPropertyChange<T>(fkName); } auto UnsubscribeFromPropertyChange() -> void { fPluginServices->UnsubscribeFromPropertyChange<T>(fkName); }
private: private:
const std::string fkName; const std::string fkName;

View File

@ -206,30 +206,25 @@ class PluginServices
auto GetChannelInfo() const -> std::unordered_map<std::string, int> { return fConfig->GetChannelInfo(); } auto GetChannelInfo() const -> std::unordered_map<std::string, int> { return fConfig->GetChannelInfo(); }
/// @brief Discover the list of property keys
/// @return list of property keys
auto GetPropertyKeys() const -> std::vector<std::string> { return fConfig->GetPropertyKeys(); } auto GetPropertyKeys() const -> std::vector<std::string> { return fConfig->GetPropertyKeys(); }
/// @brief Subscribe to property updates of type T /// @brief Subscribe to property updates of type T
/// @param subscriber /// @param subscriber
/// @param callback function /// @param callback function
/// ///
/// While PluginServices provides the SetProperty method which can update properties only during certain device states, there are /// Subscribe to property changes with a callback to monitor property changes in an event based fashion.
/// other APIs in a FairMQ device that can update properties at any time. Therefore, the callback implementation should expect to be called in any template<typename T>
/// device state. auto SubscribeToPropertyChange(const std::string& subscriber, std::function<void(const std::string& key, T)> callback) const -> void
// template<typename T> {
// auto SubscribeToPropertyChange( fConfig->Subscribe<T>(subscriber, callback);
// const std::string& subscriber, }
// std::function<void(const std::string& [>key*/, const T /*newValue<])> callback
// ) const -> void /// @brief Unsubscribe from property updates of type T
// { /// @param subscriber
// fConfig->Subscribe(subscriber, callback); template<typename T>
// } auto UnsubscribeFromPropertyChange(const std::string& subscriber) -> void { fConfig->Unsubscribe<T>(subscriber); }
//
// /// @brief Unsubscribe from property updates of type T
// /// @param subscriber
// template<typename T>
// auto UnsubscribeFromPropertyChange(const std::string& subscriber) -> void { fConfig->Unsubscribe<T>(subscriber); }
//
// TODO Fix property subscription
static const std::unordered_map<std::string, DeviceState> fkDeviceStateStrMap; static const std::unordered_map<std::string, DeviceState> fkDeviceStateStrMap;
static const std::unordered_map<DeviceState, std::string, tools::HashEnum<DeviceState>> fkStrDeviceStateMap; static const std::unordered_map<DeviceState, std::string, tools::HashEnum<DeviceState>> fkStrDeviceStateMap;

View File

@ -1,148 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/*
* File: FairMQEventManager.h
* Author: winckler
*
* Created on August 12, 2016, 13:50 PM
*/
#ifndef FAIRMQEVENTMANAGER_H
#define FAIRMQEVENTMANAGER_H
#include <FairMQLogger.h>
#include <map>
#include <utility>
#include <string>
#include <boost/any.hpp>
#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/signals2.hpp>
#include <boost/signals2/signal.hpp>
enum class EventId : uint32_t
{
// Place your new EventManager events here
UpdateParam = 0,
Custom = 1,
};
namespace Events
{
template <EventId,typename ...Args>
struct Traits;
template <typename T>
struct Traits<EventId::UpdateParam, T>
{
using signal_type = boost::signals2::signal<void(const std::string&, T)>;
};
template <typename T>
struct Traits<EventId::UpdateParam, std::vector<T>>
{
using signal_type = boost::signals2::signal<void(const std::string&, const std::vector<T>& )>;
};
template <>
struct Traits<EventId::UpdateParam, std::string>
{
using signal_type = boost::signals2::signal<void(const std::string&, const std::string&)>;
};
template<std::size_t N>
struct Traits<EventId::UpdateParam, const char[N]>
{
using signal_type = boost::signals2::signal<void(const std::string&, const std::string&)>;
};
template <typename ...T>
struct Traits<EventId::Custom,T...>
{
using signal_type = boost::signals2::signal<void(T...)>;
};
/*
template <EventId, typename ...Args> struct Traits2;
template <> struct Traits2<EventId::UpdateParam> { using signal_type = boost::signals2::signal<void(const std::string&, const std::string&)>; } ;
template <typename ...T> struct Traits2<EventId::UpdateParam,T...> { using signal_type = boost::signals2::signal<void(const std::string&, T...)>; } ;
template <> struct Traits2<EventId::UpdateParamInt> { using signal_type = boost::signals2::signal<void(const std::string&, int)>; } ;
// */
} // Events namespace
class FairMQEventManager
{
public:
using EventKey = std::pair<EventId, std::string>;
FairMQEventManager() :
fEventMap()
{}
virtual ~FairMQEventManager()
{}
template <EventId event, typename... ValueType, typename F>
void Connect(const std::string& key, F&& func) const
{
GetSlot<event, ValueType...>(key).connect(std::forward<F>(func));
}
template <EventId event, typename... ValueType>
void Disconnect(const std::string& key)
{
GetSlot<event, ValueType...>(key).disconnect();
}
template <EventId event, typename... ValueType, typename... Args>
void Emit(const std::string& key, Args&&... args)
{
GetSlot<event,ValueType...>(key)(std::forward<Args>(args)...);
}
template <EventId event>
bool EventKeyFound(const std::string& key)
{
return fEventMap.find(std::pair<EventId, std::string>(event, key)) != fEventMap.end();
}
private:
mutable std::map<EventKey, boost::any> fEventMap;
template <EventId event, typename... T, typename Slot = typename Events::Traits<event,T...>::signal_type,
typename SlotPtr = boost::shared_ptr<Slot>>
Slot& GetSlot(const std::string& key) const
{
try
{
EventKey eventKey = std::make_pair(event, key);
//static_assert(std::is_same<decltype(boost::make_shared<Slot>()),SlotPtr>::value, "");
if (fEventMap.find(eventKey) == fEventMap.end())
{
fEventMap.emplace(eventKey, boost::make_shared<Slot>());
}
return *boost::any_cast<SlotPtr>(fEventMap.at(eventKey));
// auto &&tmp = boost::any_cast<SlotPtr>(fEventMap.at(eventKey));
// return *tmp;
}
catch (boost::bad_any_cast const &e)
{
LOG(ERROR) << "Caught instance of boost::bad_any_cast: "
<< e.what() << " on event #" << static_cast<uint32_t>(event) << " and key" << key;
abort();
}
}
};
#endif /* FAIRMQEVENTMANAGER_H */

View File

@ -22,7 +22,7 @@
using namespace std; using namespace std;
FairMQProgOptions::FairMQProgOptions() FairMQProgOptions::FairMQProgOptions()
: FairProgOptions(), FairMQEventManager() : FairProgOptions()
, fMQParserOptions("MQ-Device parser options") , fMQParserOptions("MQ-Device parser options")
, fMQOptionsInCfg("MQ-Device options") , fMQOptionsInCfg("MQ-Device options")
, fMQOptionsInCmd("MQ-Device options") , fMQOptionsInCmd("MQ-Device options")

View File

@ -16,6 +16,8 @@
#ifndef FAIRMQPROGOPTIONS_H #ifndef FAIRMQPROGOPTIONS_H
#define FAIRMQPROGOPTIONS_H #define FAIRMQPROGOPTIONS_H
#include <fairmq/EventManager.h>
#include <unordered_map> #include <unordered_map>
#include <functional> #include <functional>
#include <map> #include <map>
@ -24,10 +26,19 @@
#include <string> #include <string>
#include "FairProgOptions.h" #include "FairProgOptions.h"
#include "FairMQEventManager.h"
#include "FairMQChannel.h" #include "FairMQChannel.h"
class FairMQProgOptions : public FairProgOptions , public FairMQEventManager namespace fair
{
namespace mq
{
struct PropertyChange : Event<std::string> {};
} /* namespace mq */
} /* namespace fair */
class FairMQProgOptions : public FairProgOptions
{ {
protected: protected:
using FairMQMap = std::unordered_map<std::string, std::vector<FairMQChannel>>; using FairMQMap = std::unordered_map<std::string, std::vector<FairMQChannel>>;
@ -192,12 +203,8 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager
} }
} }
// execute stored function of a given key if exist
//if (std::is_same<T, int>::value || std::is_same<T, std::string>::value)//if one wants to restrict type //if (std::is_same<T, int>::value || std::is_same<T, std::string>::value)//if one wants to restrict type
if (EventKeyFound(key)) fEvents.Emit<fair::mq::PropertyChange, typename std::decay<T>::type>(key, val);
{
EmitUpdate<typename std::decay<T>::type>(key, val);
}
return 0; return 0;
} }
@ -232,36 +239,29 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager
} }
} }
// execute stored function of a given key if exist
//if (std::is_same<T, int>::value || std::is_same<T, std::string>::value)//if one wants to restrict type //if (std::is_same<T, int>::value || std::is_same<T, std::string>::value)//if one wants to restrict type
if (EventKeyFound(key)) fEvents.Emit<fair::mq::PropertyChange, typename std::decay<T>::type>(key, val);
{
EmitUpdate<typename std::decay<T>::type>(key, val);
}
return 0; return 0;
} }
template <typename T, typename F> template <typename T>
void Subscribe(const std::string& key, F&& func) const void Subscribe(const std::string& subscriber, std::function<void(typename fair::mq::PropertyChange::KeyType, T)> func)
{ {
std::unique_lock<std::mutex> lock(fConfigMutex); std::unique_lock<std::mutex> lock(fConfigMutex);
static_assert(!std::is_same<T,const char*>::value || !std::is_same<T, char*>::value, static_assert(!std::is_same<T,const char*>::value || !std::is_same<T, char*>::value,
"In template member FairMQProgOptions::Subscribe<T>(key,Lambda) the types const char* or char* for the calback signatures are not supported."); "In template member FairMQProgOptions::Subscribe<T>(key,Lambda) the types const char* or char* for the calback signatures are not supported.");
if (fVarMap.count(key)) fEvents.Subscribe<fair::mq::PropertyChange, T>(subscriber, func);
{
Connect<EventId::UpdateParam, T>(key, std::forward<F>(func));
}
} }
template <typename T> template <typename T>
void Unsubscribe(const std::string& key) const void Unsubscribe(const std::string& subscriber)
{ {
std::unique_lock<std::mutex> lock(fConfigMutex); std::unique_lock<std::mutex> lock(fConfigMutex);
Disconnect<EventId::UpdateParam, T>(key); fEvents.Unsubscribe<fair::mq::PropertyChange, T>(subscriber);
} }
/* /*
@ -292,18 +292,6 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager
// map of read channel info - channel name - number of subchannels // map of read channel info - channel name - number of subchannels
std::unordered_map<std::string, int> fChannelInfo; std::unordered_map<std::string, int> fChannelInfo;
bool EventKeyFound(const std::string& key)
{
if (FairMQEventManager::EventKeyFound<EventId::UpdateParam>(key))
{
return true;
}
else
{
return false;
}
}
using MQKey = std::tuple<std::string, int, std::string>;//store key info using MQKey = std::tuple<std::string, int, std::string>;//store key info
std::map<std::string, MQKey> fMQKeyMap;// key=full path - val=key info std::map<std::string, MQKey> fMQKeyMap;// key=full path - val=key info
@ -335,7 +323,7 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager
//compile time check whether T is const char* or char*, and in that case a compile time error is thrown. //compile time check whether T is const char* or char*, and in that case a compile time error is thrown.
static_assert(!std::is_same<T,const char*>::value || !std::is_same<T, char*>::value, static_assert(!std::is_same<T,const char*>::value || !std::is_same<T, char*>::value,
"In template member FairMQProgOptions::EmitUpdate<T>(key,val) the types const char* or char* for the calback signatures are not supported."); "In template member FairMQProgOptions::EmitUpdate<T>(key,val) the types const char* or char* for the calback signatures are not supported.");
Emit<EventId::UpdateParam, T>(key, key, val); fEvents.Emit<fair::mq::PropertyChange, T>(key, val);
} }
int UpdateChannelMap(const std::string& channelName, int index, const std::string& member, const std::string& val); int UpdateChannelMap(const std::string& channelName, int index, const std::string& member, const std::string& val);
@ -348,6 +336,8 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager
} }
void UpdateChannelInfo(); void UpdateChannelInfo();
fair::mq::EventManager fEvents;
}; };
#endif /* FAIRMQPROGOPTIONS_H */ #endif /* FAIRMQPROGOPTIONS_H */

View File

@ -99,24 +99,33 @@ int main(int argc, char** argv)
// LOG(INFO) << "dataRate: " << dataRate; // LOG(INFO) << "dataRate: " << dataRate;
LOG(INFO) << "Subscribing: <string>(chans.data.0.address)"; LOG(INFO) << "Subscribing: <string>(chans.data.0.address)";
config.Subscribe<string>("chans.data.0.address", [&device](const string& key, const string& value) config.Subscribe<string>("test", [&device](const string& key, string value)
{ {
LOG(INFO) << "[callback] Updating device parameter " << key << " = " << value; if (key == "chans.data.0.address")
device.fChannels.at("data").at(0).UpdateAddress(value); {
LOG(INFO) << "[callback] Updating device parameter " << key << " = " << value;
device.fChannels.at("data").at(0).UpdateAddress(value);
}
}); });
LOG(INFO) << "Subscribing: <int>(chans.data.0.rcvBufSize)"; LOG(INFO) << "Subscribing: <int>(chans.data.0.rcvBufSize)";
config.Subscribe<int>("chans.data.0.rcvBufSize", [&device](const string& key, int value) config.Subscribe<int>("test", [&device](const string& key, int value)
{ {
LOG(INFO) << "[callback] Updating device parameter " << key << " = " << value; if(key == "chans.data.0.rcvBufSize")
device.fChannels.at("data").at(0).UpdateRcvBufSize(value); {
LOG(INFO) << "[callback] Updating device parameter " << key << " = " << value;
device.fChannels.at("data").at(0).UpdateRcvBufSize(value);
}
}); });
LOG(INFO) << "Subscribing: <double>(data-rate)"; LOG(INFO) << "Subscribing: <double>(data-rate)";
config.Subscribe<double>("data-rate", [&device](const string& key, double value) config.Subscribe<double>("test", [&device](const string& key, double value)
{ {
LOG(INFO) << "[callback] Updating device parameter " << key << " = " << value; if (key == "data-rate")
device.SetRate(value); {
LOG(INFO) << "[callback] Updating device parameter " << key << " = " << value;
device.SetRate(value);
}
}); });
LOG(INFO) << "Starting value updates...\n"; LOG(INFO) << "Starting value updates...\n";
@ -134,6 +143,11 @@ int main(int argc, char** argv)
LOG(INFO) << "device: " << device.GetRate() << endl; LOG(INFO) << "device: " << device.GetRate() << endl;
// device.Print(); // device.Print();
LOG(INFO) << "nase: " << config.GetValue<double>("nase");
config.Unsubscribe<string>("test");
config.Unsubscribe<int>("test");
config.Unsubscribe<double>("test");
// advanced commands // advanced commands
// LOG(INFO) << "-------------------- start custom 1"; // LOG(INFO) << "-------------------- start custom 1";

View File

@ -57,4 +57,35 @@ TEST_F(PluginServices, KeyDiscovery)
EXPECT_TRUE(find(keys.begin(), keys.end(), "foo") != keys.end()); EXPECT_TRUE(find(keys.begin(), keys.end(), "foo") != keys.end());
} }
TEST_F(PluginServices, ConfigCallbacks)
{
mServices.SubscribeToPropertyChange<string>("test", [](const string& key, string value) {
if (key == "chans.data.0.address") { ASSERT_EQ(value, "tcp://localhost:4321"); }
});
mServices.SubscribeToPropertyChange<int>("test", [](const string& key, int value) {
if(key == "chans.data.0.rcvBufSize") {
FAIL(); // should not be called because we unsubscribed
}
});
mServices.SubscribeToPropertyChange<double>("test", [](const string& key, double value) {
if (key == "data-rate") { ASSERT_EQ(value, 0.9); }
});
mServices.SubscribeToDeviceStateChange("test",[&](DeviceState newState){
switch (newState) {
case DeviceState::InitializingDevice:
mServices.SetProperty<string>("chans.data.0.address", "tcp://localhost:4321");
mServices.SetProperty<int>("chans.data.0.rcvBufSize", 100);
mServices.SetProperty<double>("data-rate", 0.9);
break;
default:
break;
}
});
mServices.UnsubscribeFromPropertyChange<int>("test");
}
} /* namespace */ } /* namespace */