mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
FairMQ: Rewrite event manager to support multiple subscribers
This commit is contained in:
parent
88b3b8ef18
commit
6ecd0e9085
|
@ -63,6 +63,7 @@ set(FAIRMQ_DEPRECATED_HEADER_FILES
|
||||||
)
|
)
|
||||||
set(FAIRMQ_HEADER_FILES
|
set(FAIRMQ_HEADER_FILES
|
||||||
${FAIRMQ_DEPRECATED_HEADER_FILES}
|
${FAIRMQ_DEPRECATED_HEADER_FILES}
|
||||||
|
EventManager.h
|
||||||
FairMQChannel.h
|
FairMQChannel.h
|
||||||
FairMQConfigurable.h
|
FairMQConfigurable.h
|
||||||
FairMQDevice.h
|
FairMQDevice.h
|
||||||
|
|
142
fairmq/EventManager.h
Normal file
142
fairmq/EventManager.h
Normal file
|
@ -0,0 +1,142 @@
|
||||||
|
/********************************************************************************
|
||||||
|
* Copyright (C) 2014-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||||
|
* *
|
||||||
|
* This software is distributed under the terms of the *
|
||||||
|
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||||
|
* copied verbatim in the file "LICENSE" *
|
||||||
|
********************************************************************************/
|
||||||
|
|
||||||
|
#ifndef FAIR_MQ_EVENTMANAGER_H
|
||||||
|
#define FAIR_MQ_EVENTMANAGER_H
|
||||||
|
|
||||||
|
#include <memory>
|
||||||
|
#include <mutex>
|
||||||
|
#include <string>
|
||||||
|
#include <tuple>
|
||||||
|
#include <typeindex>
|
||||||
|
#include <typeinfo>
|
||||||
|
#include <unordered_map>
|
||||||
|
#include <utility>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
#include <boost/any.hpp>
|
||||||
|
#include <boost/functional/hash.hpp>
|
||||||
|
#include <boost/signals2.hpp>
|
||||||
|
|
||||||
|
namespace fair
|
||||||
|
{
|
||||||
|
namespace mq
|
||||||
|
{
|
||||||
|
|
||||||
|
// Inherit from this base event type to create custom event types
|
||||||
|
template<typename K>
|
||||||
|
struct Event
|
||||||
|
{
|
||||||
|
using KeyType = const K;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @class EventManager EventManager.h <fairmq/EventManager.h>
|
||||||
|
* @brief Manages event callbacks from different subscribers
|
||||||
|
*
|
||||||
|
* The event manager stores a set of callbacks and associates them with
|
||||||
|
* events depending on the callback signature. The first callback
|
||||||
|
* argument must be of a special key type determined by the event type.
|
||||||
|
*
|
||||||
|
* Callbacks can be subscribed/unsubscribed based on a subscriber id,
|
||||||
|
* the event type, and the callback signature.
|
||||||
|
*
|
||||||
|
* Events can be emitted based on event type and callback signature.
|
||||||
|
*
|
||||||
|
* The event manager is thread-safe.
|
||||||
|
*/
|
||||||
|
class EventManager
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
template<typename E, typename ...Args>
|
||||||
|
using Callback = std::function<void(typename E::KeyType, Args...)>;
|
||||||
|
template<typename E, typename ...Args>
|
||||||
|
using Signal = boost::signals2::signal<void(typename E::KeyType, Args...)>;
|
||||||
|
|
||||||
|
template<typename E, typename ...Args>
|
||||||
|
auto Subscribe(const std::string& subscriber, Callback<E, Args...> callback) -> void
|
||||||
|
{
|
||||||
|
const std::type_index event_type_index{typeid(E)};
|
||||||
|
const std::type_index callback_type_index{typeid(Callback<E, Args...>)};
|
||||||
|
const auto signalsKey = std::make_pair(event_type_index, callback_type_index);
|
||||||
|
const auto connectionsKey = std::make_pair(subscriber, signalsKey);
|
||||||
|
|
||||||
|
const auto connection = GetSignal<E, Args...>(signalsKey)->connect(callback);
|
||||||
|
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock{fMutex};
|
||||||
|
|
||||||
|
if (fConnections.find(connectionsKey) != fConnections.end())
|
||||||
|
{
|
||||||
|
fConnections.at(connectionsKey).disconnect();
|
||||||
|
fConnections.erase(connectionsKey);
|
||||||
|
}
|
||||||
|
fConnections.insert({connectionsKey, connection});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
template<typename E, typename ...Args>
|
||||||
|
auto Unsubscribe(const std::string& subscriber) -> void
|
||||||
|
{
|
||||||
|
const std::type_index event_type_index{typeid(E)};
|
||||||
|
const std::type_index callback_type_index{typeid(Callback<E, Args...>)};
|
||||||
|
const auto signalsKey = std::make_pair(event_type_index, callback_type_index);
|
||||||
|
const auto connectionsKey = std::make_pair(subscriber, signalsKey);
|
||||||
|
|
||||||
|
std::lock_guard<std::mutex> lock{fMutex};
|
||||||
|
|
||||||
|
fConnections.at(connectionsKey).disconnect();
|
||||||
|
fConnections.erase(connectionsKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
template<typename E, typename ...Args>
|
||||||
|
auto Emit(typename E::KeyType& key, Args&&... args) const -> void
|
||||||
|
{
|
||||||
|
const std::type_index event_type_index{typeid(E)};
|
||||||
|
const std::type_index callback_type_index{typeid(Callback<E, Args...>)};
|
||||||
|
const auto signalsKey = std::make_pair(event_type_index, callback_type_index);
|
||||||
|
|
||||||
|
(*GetSignal<E, Args...>(signalsKey))(key, std::forward<Args>(args)...);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
using SignalsKey = std::pair<std::type_index, std::type_index>;
|
||||||
|
// event , callback
|
||||||
|
using SignalsValue = boost::any;
|
||||||
|
using SignalsMap = std::unordered_map<SignalsKey, SignalsValue, boost::hash<SignalsKey>>;
|
||||||
|
mutable SignalsMap fSignals;
|
||||||
|
|
||||||
|
using ConnectionsKey = std::pair<std::string, SignalsKey>;
|
||||||
|
// subscriber , event/callback
|
||||||
|
using ConnectionsValue = boost::signals2::connection;
|
||||||
|
using ConnectionsMap = std::unordered_map<ConnectionsKey, ConnectionsValue, boost::hash<ConnectionsKey>>;
|
||||||
|
ConnectionsMap fConnections;
|
||||||
|
|
||||||
|
mutable std::mutex fMutex;
|
||||||
|
|
||||||
|
template<typename E, typename ...Args>
|
||||||
|
auto GetSignal(const SignalsKey& key) const -> std::shared_ptr<Signal<E, Args...>>
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock{fMutex};
|
||||||
|
|
||||||
|
if (fSignals.find(key) == fSignals.end())
|
||||||
|
{
|
||||||
|
// wrapper is needed because boost::signals2::signal is neither copyable nor movable
|
||||||
|
// and I don't know how else to insert it into the map
|
||||||
|
auto signal = std::make_shared<Signal<E, Args...>>();
|
||||||
|
fSignals.insert(std::make_pair(key, signal));
|
||||||
|
}
|
||||||
|
|
||||||
|
return boost::any_cast<std::shared_ptr<Signal<E, Args...>>>(fSignals.at(key));
|
||||||
|
}
|
||||||
|
}; /* class EventManager */
|
||||||
|
|
||||||
|
} /* namespace mq */
|
||||||
|
} /* namespace fair */
|
||||||
|
|
||||||
|
#endif /* FAIR_MQ_EVENTMANAGER_H */
|
|
@ -211,7 +211,7 @@ class PluginServices
|
||||||
/// @param callback function
|
/// @param callback function
|
||||||
///
|
///
|
||||||
/// While PluginServices provides the SetProperty method which can update properties only during certain device states, there are
|
/// While PluginServices provides the SetProperty method which can update properties only during certain device states, there are
|
||||||
/// other methods in a FairMQ device that can update properties at any time. Therefore, the callback implementation should expect to be called in any
|
/// other APIs in a FairMQ device that can update properties at any time. Therefore, the callback implementation should expect to be called in any
|
||||||
/// device state.
|
/// device state.
|
||||||
// template<typename T>
|
// template<typename T>
|
||||||
// auto SubscribeToPropertyChange(
|
// auto SubscribeToPropertyChange(
|
||||||
|
|
|
@ -142,6 +142,15 @@ add_testsuite(FairMQ.PluginServices
|
||||||
TIMEOUT 10
|
TIMEOUT 10
|
||||||
)
|
)
|
||||||
|
|
||||||
|
add_testsuite(FairMQ.EventManager
|
||||||
|
SOURCES
|
||||||
|
event_manager/runner.cxx
|
||||||
|
event_manager/_event_manager.cxx
|
||||||
|
|
||||||
|
LINKS FairMQ
|
||||||
|
TIMEOUT 10
|
||||||
|
)
|
||||||
|
|
||||||
##############################
|
##############################
|
||||||
# Aggregate all test targets #
|
# Aggregate all test targets #
|
||||||
##############################
|
##############################
|
||||||
|
|
64
fairmq/test/event_manager/_event_manager.cxx
Normal file
64
fairmq/test/event_manager/_event_manager.cxx
Normal file
|
@ -0,0 +1,64 @@
|
||||||
|
/********************************************************************************
|
||||||
|
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||||
|
* *
|
||||||
|
* This software is distributed under the terms of the *
|
||||||
|
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||||
|
* copied verbatim in the file "LICENSE" *
|
||||||
|
********************************************************************************/
|
||||||
|
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
#include <fairmq/EventManager.h>
|
||||||
|
#include <string>
|
||||||
|
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
|
||||||
|
using namespace std;
|
||||||
|
using namespace fair::mq;
|
||||||
|
|
||||||
|
struct TestEvent : fair::mq::Event<std::string> {};
|
||||||
|
|
||||||
|
TEST(EventManager, Basics)
|
||||||
|
{
|
||||||
|
EventManager mgr{};
|
||||||
|
int call_counter = 0;
|
||||||
|
int call_counter2 = 0;
|
||||||
|
int value = 0;
|
||||||
|
string value2;
|
||||||
|
|
||||||
|
EventManager::Callback<TestEvent, int> callback{
|
||||||
|
[&](TestEvent::KeyType& key, int newValue){
|
||||||
|
++call_counter;
|
||||||
|
if (key == "test") value = newValue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
EventManager::Callback<TestEvent, string> callback2{
|
||||||
|
[&](TestEvent::KeyType& key, string newValue){
|
||||||
|
++call_counter2;
|
||||||
|
if (key == "test") value2 = newValue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
mgr.Subscribe<TestEvent, int>("foo_subscriber", callback);
|
||||||
|
// double subscription will automatically unsubscribe first
|
||||||
|
mgr.Subscribe<TestEvent, int>("foo_subscriber", callback);
|
||||||
|
mgr.Emit<TestEvent>(TestEvent::KeyType{"test"}, 42);
|
||||||
|
ASSERT_EQ(call_counter, 1);
|
||||||
|
ASSERT_EQ(value, 42);
|
||||||
|
|
||||||
|
mgr.Unsubscribe<TestEvent, int>("foo_subscriber");
|
||||||
|
mgr.Emit<TestEvent>(TestEvent::KeyType{"test"}, 13);
|
||||||
|
ASSERT_EQ(call_counter, 1);
|
||||||
|
|
||||||
|
mgr.Subscribe<TestEvent, int>("foo_subscriber", callback);
|
||||||
|
mgr.Subscribe<TestEvent, string>("foo_subscriber", callback2);
|
||||||
|
// two different callbacks:
|
||||||
|
mgr.Emit<TestEvent>(TestEvent::KeyType{"test"}, 9000);
|
||||||
|
mgr.Emit<TestEvent>(TestEvent::KeyType{"test"}, string{"over 9000"});
|
||||||
|
ASSERT_EQ(call_counter, 2);
|
||||||
|
ASSERT_EQ(value, 9000);
|
||||||
|
ASSERT_EQ(call_counter2, 1);
|
||||||
|
ASSERT_EQ(value2, "over 9000");
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace
|
16
fairmq/test/event_manager/runner.cxx
Normal file
16
fairmq/test/event_manager/runner.cxx
Normal file
|
@ -0,0 +1,16 @@
|
||||||
|
/********************************************************************************
|
||||||
|
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||||
|
* *
|
||||||
|
* This software is distributed under the terms of the *
|
||||||
|
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||||
|
* copied verbatim in the file "LICENSE" *
|
||||||
|
********************************************************************************/
|
||||||
|
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
|
auto main(int argc, char** argv) -> int
|
||||||
|
{
|
||||||
|
::testing::InitGoogleTest(&argc, argv);
|
||||||
|
::testing::FLAGS_gtest_death_test_style = "threadsafe";
|
||||||
|
return RUN_ALL_TESTS();
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user