From 6ecd0e90852acfb265b78eb7a628a6c4d4bd4c7c Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Wed, 20 Sep 2017 21:23:21 +0200 Subject: [PATCH] FairMQ: Rewrite event manager to support multiple subscribers --- fairmq/CMakeLists.txt | 1 + fairmq/EventManager.h | 142 +++++++++++++++++++ fairmq/PluginServices.h | 2 +- fairmq/test/CMakeLists.txt | 9 ++ fairmq/test/event_manager/_event_manager.cxx | 64 +++++++++ fairmq/test/event_manager/runner.cxx | 16 +++ 6 files changed, 233 insertions(+), 1 deletion(-) create mode 100644 fairmq/EventManager.h create mode 100644 fairmq/test/event_manager/_event_manager.cxx create mode 100644 fairmq/test/event_manager/runner.cxx diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 6ce822aa..22eebf41 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -63,6 +63,7 @@ set(FAIRMQ_DEPRECATED_HEADER_FILES ) set(FAIRMQ_HEADER_FILES ${FAIRMQ_DEPRECATED_HEADER_FILES} + EventManager.h FairMQChannel.h FairMQConfigurable.h FairMQDevice.h diff --git a/fairmq/EventManager.h b/fairmq/EventManager.h new file mode 100644 index 00000000..62045cbc --- /dev/null +++ b/fairmq/EventManager.h @@ -0,0 +1,142 @@ +/******************************************************************************** + * Copyright (C) 2014-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_EVENTMANAGER_H +#define FAIR_MQ_EVENTMANAGER_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace fair +{ +namespace mq +{ + +// Inherit from this base event type to create custom event types +template +struct Event +{ + using KeyType = const K; +}; + +/** + * @class EventManager EventManager.h + * @brief Manages event callbacks from different subscribers + * + * The event manager stores a set of callbacks and associates them with + * events depending on the callback signature. The first callback + * argument must be of a special key type determined by the event type. + * + * Callbacks can be subscribed/unsubscribed based on a subscriber id, + * the event type, and the callback signature. + * + * Events can be emitted based on event type and callback signature. + * + * The event manager is thread-safe. + */ +class EventManager +{ + public: + template + using Callback = std::function; + template + using Signal = boost::signals2::signal; + + template + auto Subscribe(const std::string& subscriber, Callback callback) -> void + { + const std::type_index event_type_index{typeid(E)}; + const std::type_index callback_type_index{typeid(Callback)}; + const auto signalsKey = std::make_pair(event_type_index, callback_type_index); + const auto connectionsKey = std::make_pair(subscriber, signalsKey); + + const auto connection = GetSignal(signalsKey)->connect(callback); + + { + std::lock_guard lock{fMutex}; + + if (fConnections.find(connectionsKey) != fConnections.end()) + { + fConnections.at(connectionsKey).disconnect(); + fConnections.erase(connectionsKey); + } + fConnections.insert({connectionsKey, connection}); + } + } + + template + auto Unsubscribe(const std::string& subscriber) -> void + { + const std::type_index event_type_index{typeid(E)}; + const std::type_index callback_type_index{typeid(Callback)}; + const auto signalsKey = std::make_pair(event_type_index, callback_type_index); + const auto connectionsKey = std::make_pair(subscriber, signalsKey); + + std::lock_guard lock{fMutex}; + + fConnections.at(connectionsKey).disconnect(); + fConnections.erase(connectionsKey); + } + + template + auto Emit(typename E::KeyType& key, Args&&... args) const -> void + { + const std::type_index event_type_index{typeid(E)}; + const std::type_index callback_type_index{typeid(Callback)}; + const auto signalsKey = std::make_pair(event_type_index, callback_type_index); + + (*GetSignal(signalsKey))(key, std::forward(args)...); + } + + private: + using SignalsKey = std::pair; + // event , callback + using SignalsValue = boost::any; + using SignalsMap = std::unordered_map>; + mutable SignalsMap fSignals; + + using ConnectionsKey = std::pair; + // subscriber , event/callback + using ConnectionsValue = boost::signals2::connection; + using ConnectionsMap = std::unordered_map>; + ConnectionsMap fConnections; + + mutable std::mutex fMutex; + + template + auto GetSignal(const SignalsKey& key) const -> std::shared_ptr> + { + std::lock_guard lock{fMutex}; + + if (fSignals.find(key) == fSignals.end()) + { + // wrapper is needed because boost::signals2::signal is neither copyable nor movable + // and I don't know how else to insert it into the map + auto signal = std::make_shared>(); + fSignals.insert(std::make_pair(key, signal)); + } + + return boost::any_cast>>(fSignals.at(key)); + } +}; /* class EventManager */ + +} /* namespace mq */ +} /* namespace fair */ + +#endif /* FAIR_MQ_EVENTMANAGER_H */ diff --git a/fairmq/PluginServices.h b/fairmq/PluginServices.h index ba6dd042..c3bf6b33 100644 --- a/fairmq/PluginServices.h +++ b/fairmq/PluginServices.h @@ -211,7 +211,7 @@ class PluginServices /// @param callback function /// /// While PluginServices provides the SetProperty method which can update properties only during certain device states, there are - /// other methods in a FairMQ device that can update properties at any time. Therefore, the callback implementation should expect to be called in any + /// other APIs in a FairMQ device that can update properties at any time. Therefore, the callback implementation should expect to be called in any /// device state. // template // auto SubscribeToPropertyChange( diff --git a/fairmq/test/CMakeLists.txt b/fairmq/test/CMakeLists.txt index 6e00cff0..a96828be 100644 --- a/fairmq/test/CMakeLists.txt +++ b/fairmq/test/CMakeLists.txt @@ -142,6 +142,15 @@ add_testsuite(FairMQ.PluginServices TIMEOUT 10 ) +add_testsuite(FairMQ.EventManager + SOURCES + event_manager/runner.cxx + event_manager/_event_manager.cxx + + LINKS FairMQ + TIMEOUT 10 +) + ############################## # Aggregate all test targets # ############################## diff --git a/fairmq/test/event_manager/_event_manager.cxx b/fairmq/test/event_manager/_event_manager.cxx new file mode 100644 index 00000000..1bdf2a9c --- /dev/null +++ b/fairmq/test/event_manager/_event_manager.cxx @@ -0,0 +1,64 @@ +/******************************************************************************** + * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include +#include +#include + +namespace +{ + +using namespace std; +using namespace fair::mq; + +struct TestEvent : fair::mq::Event {}; + +TEST(EventManager, Basics) +{ + EventManager mgr{}; + int call_counter = 0; + int call_counter2 = 0; + int value = 0; + string value2; + + EventManager::Callback callback{ + [&](TestEvent::KeyType& key, int newValue){ + ++call_counter; + if (key == "test") value = newValue; + } + }; + EventManager::Callback callback2{ + [&](TestEvent::KeyType& key, string newValue){ + ++call_counter2; + if (key == "test") value2 = newValue; + } + }; + + mgr.Subscribe("foo_subscriber", callback); + // double subscription will automatically unsubscribe first + mgr.Subscribe("foo_subscriber", callback); + mgr.Emit(TestEvent::KeyType{"test"}, 42); + ASSERT_EQ(call_counter, 1); + ASSERT_EQ(value, 42); + + mgr.Unsubscribe("foo_subscriber"); + mgr.Emit(TestEvent::KeyType{"test"}, 13); + ASSERT_EQ(call_counter, 1); + + mgr.Subscribe("foo_subscriber", callback); + mgr.Subscribe("foo_subscriber", callback2); + // two different callbacks: + mgr.Emit(TestEvent::KeyType{"test"}, 9000); + mgr.Emit(TestEvent::KeyType{"test"}, string{"over 9000"}); + ASSERT_EQ(call_counter, 2); + ASSERT_EQ(value, 9000); + ASSERT_EQ(call_counter2, 1); + ASSERT_EQ(value2, "over 9000"); +} + +} // namespace diff --git a/fairmq/test/event_manager/runner.cxx b/fairmq/test/event_manager/runner.cxx new file mode 100644 index 00000000..5442845e --- /dev/null +++ b/fairmq/test/event_manager/runner.cxx @@ -0,0 +1,16 @@ +/******************************************************************************** + * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include + +auto main(int argc, char** argv) -> int +{ + ::testing::InitGoogleTest(&argc, argv); + ::testing::FLAGS_gtest_death_test_style = "threadsafe"; + return RUN_ALL_TESTS(); +}