mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 00:31:14 +00:00
FairMQ: Implement PluginServices - Control
This commit is contained in:
parent
9b61b924b2
commit
739460b2fe
|
@ -227,7 +227,6 @@ void FairMQDevice::InitWrapper()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!fStateChangeCallback.empty())
|
if (!fStateChangeCallback.empty())
|
||||||
{
|
{
|
||||||
fStateChangeCallback(INITIALIZING_DEVICE);
|
fStateChangeCallback(INITIALIZING_DEVICE);
|
||||||
|
|
|
@ -226,8 +226,13 @@ bool FairMQStateMachine::WaitForEndOfStateForMs(std::string event, int durationI
|
||||||
return WaitForEndOfStateForMs(GetEventNumber(event), durationInMs);
|
return WaitForEndOfStateForMs(GetEventNumber(event), durationInMs);
|
||||||
}
|
}
|
||||||
|
|
||||||
void FairMQStateMachine::OnStateChange(std::function<void(const State)> callback)
|
void FairMQStateMachine::OnStateChange(const std::string& key, std::function<void(const State)> callback)
|
||||||
{
|
{
|
||||||
fStateChangeCallback.connect(callback);
|
fStateChangeCallbacksMap.insert({key, fStateChangeCallback.connect(callback)});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void FairMQStateMachine::UnsubscribeFromStateChange(const std::string& key)
|
||||||
|
{
|
||||||
|
fStateChangeCallbacksMap.at(key).disconnect();
|
||||||
|
//fStateChangeCallbacksMap.erase(key);
|
||||||
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
|
#include <unordered_map>
|
||||||
|
|
||||||
// Increase maximum number of boost::msm states (default is 10)
|
// Increase maximum number of boost::msm states (default is 10)
|
||||||
// This #define has to be before any msm header includes
|
// This #define has to be before any msm header includes
|
||||||
|
@ -85,6 +86,7 @@ struct FairMQFSM_ : public msmf::state_machine_def<FairMQFSM_>
|
||||||
, fState()
|
, fState()
|
||||||
, fChangeStateMutex()
|
, fChangeStateMutex()
|
||||||
, fStateChangeCallback()
|
, fStateChangeCallback()
|
||||||
|
, fStateChangeCallbacksMap()
|
||||||
{}
|
{}
|
||||||
|
|
||||||
// Destructor
|
// Destructor
|
||||||
|
@ -568,6 +570,7 @@ struct FairMQFSM_ : public msmf::state_machine_def<FairMQFSM_>
|
||||||
std::mutex fChangeStateMutex;
|
std::mutex fChangeStateMutex;
|
||||||
|
|
||||||
boost::signals2::signal<void(const State)> fStateChangeCallback;
|
boost::signals2::signal<void(const State)> fStateChangeCallback;
|
||||||
|
std::unordered_map<std::string, boost::signals2::connection> fStateChangeCallbacksMap;
|
||||||
};
|
};
|
||||||
|
|
||||||
// reactivate the warning for non-virtual destructor
|
// reactivate the warning for non-virtual destructor
|
||||||
|
@ -616,7 +619,8 @@ class FairMQStateMachine : public FairMQFSM::FairMQFSM
|
||||||
bool WaitForEndOfStateForMs(int state, int durationInMs);
|
bool WaitForEndOfStateForMs(int state, int durationInMs);
|
||||||
bool WaitForEndOfStateForMs(std::string state, int durationInMs);
|
bool WaitForEndOfStateForMs(std::string state, int durationInMs);
|
||||||
|
|
||||||
void OnStateChange(std::function<void(const State)> callback);
|
void OnStateChange(const std::string&, std::function<void(const State)> callback);
|
||||||
|
void UnsubscribeFromStateChange(const std::string&);
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif /* FAIRMQSTATEMACHINE_H_ */
|
#endif /* FAIRMQSTATEMACHINE_H_ */
|
||||||
|
|
|
@ -69,6 +69,16 @@ class Plugin
|
||||||
}
|
}
|
||||||
static auto NoProgramOptions() -> const boost::optional<boost::program_options::options_description> { return boost::none; }
|
static auto NoProgramOptions() -> const boost::optional<boost::program_options::options_description> { return boost::none; }
|
||||||
|
|
||||||
|
// device control API
|
||||||
|
using DeviceState = fair::mq::PluginServices::DeviceState;
|
||||||
|
using DeviceStateTransition = fair::mq::PluginServices::DeviceStateTransition;
|
||||||
|
auto ToDeviceState(const std::string& state) const -> fair::mq::PluginServices::DeviceState { return fPluginServices.ToDeviceState(state); }
|
||||||
|
auto ToStr(fair::mq::PluginServices::DeviceState state) const -> std::string { return fPluginServices.ToStr(state); }
|
||||||
|
auto GetCurrentDeviceState() const -> fair::mq::PluginServices::DeviceState { return fPluginServices.GetCurrentDeviceState(); }
|
||||||
|
auto ChangeDeviceState(const fair::mq::PluginServices::DeviceStateTransition next) -> void { fPluginServices.ChangeDeviceState(next); }
|
||||||
|
auto SubscribeToDeviceStateChange(std::function<void(fair::mq::PluginServices::DeviceState)> callback) -> void { fPluginServices.SubscribeToDeviceStateChange(fkName, callback); }
|
||||||
|
auto UnsubscribeFromDeviceStateChange() -> void { fPluginServices.UnsubscribeFromDeviceStateChange(fkName); }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
const std::string fkName;
|
const std::string fkName;
|
||||||
|
|
|
@ -11,8 +11,10 @@
|
||||||
|
|
||||||
#include <FairMQDevice.h>
|
#include <FairMQDevice.h>
|
||||||
#include <options/FairMQProgOptions.h>
|
#include <options/FairMQProgOptions.h>
|
||||||
|
#include <atomic>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <unordered_map>
|
||||||
|
|
||||||
namespace fair
|
namespace fair
|
||||||
{
|
{
|
||||||
|
@ -30,56 +32,175 @@ class PluginServices
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
|
||||||
|
/// See https://github.com/FairRootGroup/FairRoot/blob/dev/fairmq/docs/Device.md#13-state-machine
|
||||||
|
enum class DeviceState
|
||||||
|
{
|
||||||
|
Ok,
|
||||||
|
Error,
|
||||||
|
Idle,
|
||||||
|
InitializingDevice,
|
||||||
|
DeviceReady,
|
||||||
|
InitializingTask,
|
||||||
|
Ready,
|
||||||
|
Running,
|
||||||
|
Paused,
|
||||||
|
ResettingTask,
|
||||||
|
ResettingDevice,
|
||||||
|
Exiting
|
||||||
|
};
|
||||||
|
enum class DeviceStateTransition // transition event between DeviceStates
|
||||||
|
{
|
||||||
|
InitDevice,
|
||||||
|
InitTask,
|
||||||
|
Run,
|
||||||
|
Pause,
|
||||||
|
Stop,
|
||||||
|
ResetTask,
|
||||||
|
ResetDevice,
|
||||||
|
End,
|
||||||
|
ErrorFound
|
||||||
|
};
|
||||||
|
|
||||||
PluginServices() = delete;
|
PluginServices() = delete;
|
||||||
PluginServices(FairMQProgOptions& config, FairMQDevice& device)
|
PluginServices(FairMQProgOptions& config, FairMQDevice& device)
|
||||||
: fDevice{device}
|
: fDevice{device}
|
||||||
, fConfig{config}
|
, fConfig{config}
|
||||||
{}
|
, fConfigEnabled{false}
|
||||||
|
, fkDeviceStateStrMap{
|
||||||
|
{"Ok", DeviceState::Ok},
|
||||||
|
{"Error", DeviceState::Error},
|
||||||
|
{"Idle", DeviceState::Idle},
|
||||||
|
{"InitializingDevice", DeviceState::InitializingDevice},
|
||||||
|
{"DeviceReady", DeviceState::DeviceReady},
|
||||||
|
{"InitializingTask", DeviceState::InitializingTask},
|
||||||
|
{"Ready", DeviceState::Ready},
|
||||||
|
{"Running", DeviceState::Running},
|
||||||
|
{"Paused", DeviceState::Paused},
|
||||||
|
{"ResettingTask", DeviceState::ResettingTask},
|
||||||
|
{"ResettingDevice", DeviceState::ResettingDevice},
|
||||||
|
{"Exiting", DeviceState::Exiting}
|
||||||
|
}
|
||||||
|
, fkStrDeviceStateMap{
|
||||||
|
{DeviceState::Ok, "Ok"},
|
||||||
|
{DeviceState::Error, "Error"},
|
||||||
|
{DeviceState::Idle, "Idle"},
|
||||||
|
{DeviceState::InitializingDevice, "InitializingDevice"},
|
||||||
|
{DeviceState::DeviceReady, "DeviceReady"},
|
||||||
|
{DeviceState::InitializingTask, "InitializingTask"},
|
||||||
|
{DeviceState::Ready, "Ready"},
|
||||||
|
{DeviceState::Running, "Running"},
|
||||||
|
{DeviceState::Paused, "Paused"},
|
||||||
|
{DeviceState::ResettingTask, "ResettingTask"},
|
||||||
|
{DeviceState::ResettingDevice, "ResettingDevice"},
|
||||||
|
{DeviceState::Exiting, "Exiting"}
|
||||||
|
}
|
||||||
|
, fkDeviceStateMap{
|
||||||
|
{FairMQDevice::OK, DeviceState::Ok},
|
||||||
|
{FairMQDevice::ERROR, DeviceState::Error},
|
||||||
|
{FairMQDevice::IDLE, DeviceState::Idle},
|
||||||
|
{FairMQDevice::INITIALIZING_DEVICE, DeviceState::InitializingDevice},
|
||||||
|
{FairMQDevice::DEVICE_READY, DeviceState::DeviceReady},
|
||||||
|
{FairMQDevice::INITIALIZING_TASK, DeviceState::InitializingTask},
|
||||||
|
{FairMQDevice::READY, DeviceState::Ready},
|
||||||
|
{FairMQDevice::RUNNING, DeviceState::Running},
|
||||||
|
{FairMQDevice::PAUSED, DeviceState::Paused},
|
||||||
|
{FairMQDevice::RESETTING_TASK, DeviceState::ResettingTask},
|
||||||
|
{FairMQDevice::RESETTING_DEVICE, DeviceState::ResettingDevice},
|
||||||
|
{FairMQDevice::EXITING, DeviceState::Exiting}
|
||||||
|
}
|
||||||
|
, fkDeviceStateTransitionMap{
|
||||||
|
{DeviceStateTransition::InitDevice, FairMQDevice::INIT_DEVICE},
|
||||||
|
{DeviceStateTransition::InitTask, FairMQDevice::INIT_TASK},
|
||||||
|
{DeviceStateTransition::Run, FairMQDevice::RUN},
|
||||||
|
{DeviceStateTransition::Pause, FairMQDevice::PAUSE},
|
||||||
|
{DeviceStateTransition::Stop, FairMQDevice::STOP},
|
||||||
|
{DeviceStateTransition::ResetTask, FairMQDevice::RESET_TASK},
|
||||||
|
{DeviceStateTransition::ResetDevice, FairMQDevice::RESET_DEVICE},
|
||||||
|
{DeviceStateTransition::End, FairMQDevice::END},
|
||||||
|
{DeviceStateTransition::ErrorFound, FairMQDevice::ERROR_FOUND}
|
||||||
|
}
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
// Control
|
// Control
|
||||||
//enum class DeviceState
|
|
||||||
//{
|
|
||||||
//Error,
|
|
||||||
//Idle,
|
|
||||||
//Initializing_device,
|
|
||||||
//Device_ready,
|
|
||||||
//Initializing_task,
|
|
||||||
//Ready,
|
|
||||||
//Running,
|
|
||||||
//Paused,
|
|
||||||
//Resetting_task,
|
|
||||||
//Resetting_device,
|
|
||||||
//Exiting
|
|
||||||
//}
|
|
||||||
|
|
||||||
//auto ToDeviceState(std::string state) const -> DeviceState;
|
/// @brief Convert string to DeviceState
|
||||||
|
/// @param state to convert
|
||||||
|
/// @return DeviceState enum entry
|
||||||
|
/// @throw std::out_of_range if a string cannot be resolved to a DeviceState
|
||||||
|
auto ToDeviceState(const std::string& state) const -> DeviceState
|
||||||
|
{
|
||||||
|
return fkDeviceStateStrMap.at(state);
|
||||||
|
}
|
||||||
|
|
||||||
//auto ChangeDeviceState(DeviceState next) -> void;
|
/// @brief Convert DeviceState to string
|
||||||
|
/// @param string to convert
|
||||||
|
/// @return string representation of DeviceState enum entry
|
||||||
|
auto ToStr(DeviceState state) const -> std::string
|
||||||
|
{
|
||||||
|
return fkStrDeviceStateMap.at(state);
|
||||||
|
}
|
||||||
|
|
||||||
//auto SubscribeToDeviceStateChange(std::function<void(DeviceState [>new<])> callback) -> void;
|
/// @return current device state
|
||||||
//auto UnsubscribeFromDeviceChange() -> void;
|
auto GetCurrentDeviceState() const -> DeviceState
|
||||||
|
{
|
||||||
|
return fkDeviceStateMap.at(static_cast<FairMQDevice::State>(fDevice.GetCurrentState()));
|
||||||
|
}
|
||||||
|
|
||||||
//// Configuration
|
/// @brief Trigger a device state transition
|
||||||
|
/// @param next state transition
|
||||||
|
///
|
||||||
|
/// The state transition may not happen immediately, but when the current state evaluates the
|
||||||
|
/// pending transition event and terminates. In other words, the device states are scheduled cooperatively.
|
||||||
|
auto ChangeDeviceState(const DeviceStateTransition next) -> void
|
||||||
|
{
|
||||||
|
fDevice.ChangeState(fkDeviceStateTransitionMap.at(next));
|
||||||
|
}
|
||||||
|
|
||||||
//// Writing only works during Initializing_device state
|
/// @brief Subscribe with a callback to device state changes
|
||||||
//template<typename T>
|
/// @param InputMsgCallback
|
||||||
//auto SetProperty(const std::string& key, T val) -> void;
|
///
|
||||||
|
/// The callback will be called at the beginning of a new state. The callback is called from the thread
|
||||||
|
/// the state is running in.
|
||||||
|
auto SubscribeToDeviceStateChange(const std::string& key, std::function<void(DeviceState /*newState*/)> callback) -> void
|
||||||
|
{
|
||||||
|
fDevice.OnStateChange(key, [&,callback](FairMQDevice::State newState){
|
||||||
|
callback(fkDeviceStateMap.at(newState));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
//template<typename T>
|
auto UnsubscribeFromDeviceStateChange(const std::string& key) -> void
|
||||||
//auto GetProperty(const std::string& key) const -> T;
|
{
|
||||||
//auto GetPropertyAsString(const std::string& key) const -> std::string;
|
fDevice.UnsubscribeFromStateChange(key);
|
||||||
|
}
|
||||||
|
|
||||||
//template<typename T>
|
|
||||||
//auto SubscribeToPropertyChange(
|
//// Configuration
|
||||||
//const std::string& key,
|
|
||||||
//std::function<void(const std::string& [>key*/, const T /*newValue<])> callback
|
//// Writing only works during Initializing_device state
|
||||||
//) const -> void;
|
//template<typename T>
|
||||||
//auto UnsubscribeFromPropertyChange(const std::string& key) -> void;
|
//auto SetProperty(const std::string& key, T val) -> void;
|
||||||
|
|
||||||
|
//template<typename T>
|
||||||
|
//auto GetProperty(const std::string& key) const -> T;
|
||||||
|
//auto GetPropertyAsString(const std::string& key) const -> std::string;
|
||||||
|
|
||||||
|
//template<typename T>
|
||||||
|
//auto SubscribeToPropertyChange(
|
||||||
|
//const std::string& key,
|
||||||
|
//std::function<void(const std::string& [>key*/, const T /*newValue<])> callback
|
||||||
|
//) const -> void;
|
||||||
|
//auto UnsubscribeFromPropertyChange(const std::string& key) -> void;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
FairMQDevice& fDevice;
|
|
||||||
FairMQProgOptions& fConfig;
|
FairMQProgOptions& fConfig;
|
||||||
|
FairMQDevice& fDevice;
|
||||||
|
std::atomic<bool> fConfigEnabled;
|
||||||
|
const std::unordered_map<std::string, DeviceState> fkDeviceStateStrMap;
|
||||||
|
const std::unordered_map<DeviceState, std::string> fkStrDeviceStateMap;
|
||||||
|
const std::unordered_map<FairMQDevice::State, DeviceState> fkDeviceStateMap;
|
||||||
|
const std::unordered_map<DeviceStateTransition, FairMQDevice::Event> fkDeviceStateTransitionMap;
|
||||||
}; /* class PluginServices */
|
}; /* class PluginServices */
|
||||||
|
|
||||||
} /* namespace mq */
|
} /* namespace mq */
|
||||||
|
|
|
@ -26,22 +26,33 @@ class DummyPlugin : public fair::mq::Plugin
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
|
||||||
DummyPlugin(const std::string name, const Version version, const std::string maintainer, const std::string homepage, PluginServices& pluginServices)
|
DummyPlugin(
|
||||||
|
const std::string name,
|
||||||
|
const Version version,
|
||||||
|
const std::string maintainer,
|
||||||
|
const std::string homepage,
|
||||||
|
PluginServices& pluginServices)
|
||||||
: Plugin(name, version, maintainer, homepage, pluginServices)
|
: Plugin(name, version, maintainer, homepage, pluginServices)
|
||||||
{
|
{
|
||||||
|
SubscribeToDeviceStateChange(
|
||||||
|
[&](DeviceState newState){
|
||||||
|
switch (newState)
|
||||||
|
{
|
||||||
|
case DeviceState::Exiting:
|
||||||
|
UnsubscribeFromDeviceStateChange();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
}; /* class DummyPlugin */
|
}; /* class DummyPlugin */
|
||||||
|
|
||||||
auto DummyPluginProgramOptions() -> const boost::optional<boost::program_options::options_description>
|
auto DummyPluginProgramOptions() -> const boost::optional<boost::program_options::options_description>
|
||||||
{
|
{
|
||||||
using namespace boost::program_options;
|
auto plugin_options = boost::program_options::options_description{"Dummy Plugin"};
|
||||||
using std::string;
|
|
||||||
|
|
||||||
auto plugin_options = options_description{"Dummy Plugin"};
|
|
||||||
plugin_options.add_options()
|
plugin_options.add_options()
|
||||||
("custom-dummy-option", value<string>(), "Cool custom option.");
|
("custom-dummy-option", value<std::string>(), "Cool custom option.");
|
||||||
("custom-dummy-option2", value<string>(), "Another cool custom option.");
|
("custom-dummy-option2", value<std::string>(), "Another cool custom option.");
|
||||||
return plugin_options;
|
return plugin_options;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -36,9 +36,9 @@ auto control(FairMQDevice& device) -> void
|
||||||
|
|
||||||
TEST(Plugin, Operators)
|
TEST(Plugin, Operators)
|
||||||
{
|
{
|
||||||
auto config = FairMQProgOptions{};
|
FairMQProgOptions config{};
|
||||||
FairMQDevice device{};
|
FairMQDevice device{};
|
||||||
auto services = PluginServices{config, device};
|
PluginServices services{config, device};
|
||||||
auto p1 = Plugin{"dds", {1, 0, 0}, "Foo Bar <foo.bar@test.net>", "https://git.test.net/dds.git", services};
|
auto p1 = Plugin{"dds", {1, 0, 0}, "Foo Bar <foo.bar@test.net>", "https://git.test.net/dds.git", services};
|
||||||
auto p2 = Plugin{"dds", {1, 0, 0}, "Foo Bar <foo.bar@test.net>", "https://git.test.net/dds.git", services};
|
auto p2 = Plugin{"dds", {1, 0, 0}, "Foo Bar <foo.bar@test.net>", "https://git.test.net/dds.git", services};
|
||||||
auto p3 = Plugin{"file", {1, 0, 0}, "Foo Bar <foo.bar@test.net>", "https://git.test.net/file.git", services};
|
auto p3 = Plugin{"file", {1, 0, 0}, "Foo Bar <foo.bar@test.net>", "https://git.test.net/file.git", services};
|
||||||
|
@ -49,9 +49,9 @@ TEST(Plugin, Operators)
|
||||||
|
|
||||||
TEST(Plugin, OstreamOperators)
|
TEST(Plugin, OstreamOperators)
|
||||||
{
|
{
|
||||||
auto config = FairMQProgOptions{};
|
FairMQProgOptions config{};
|
||||||
FairMQDevice device{};
|
FairMQDevice device{};
|
||||||
auto services = PluginServices{config, device};
|
PluginServices services{config, device};
|
||||||
auto p1 = Plugin{"dds", {1, 0, 0}, "Foo Bar <foo.bar@test.net>", "https://git.test.net/dds.git", services};
|
auto p1 = Plugin{"dds", {1, 0, 0}, "Foo Bar <foo.bar@test.net>", "https://git.test.net/dds.git", services};
|
||||||
stringstream ss;
|
stringstream ss;
|
||||||
ss << p1;
|
ss << p1;
|
||||||
|
|
|
@ -38,7 +38,7 @@ auto control(FairMQDevice& device) -> void
|
||||||
|
|
||||||
TEST(PluginManager, LoadPlugin)
|
TEST(PluginManager, LoadPlugin)
|
||||||
{
|
{
|
||||||
auto config = FairMQProgOptions{};
|
FairMQProgOptions config{};
|
||||||
FairMQDevice device{};
|
FairMQDevice device{};
|
||||||
auto mgr = PluginManager{};
|
auto mgr = PluginManager{};
|
||||||
mgr.EmplacePluginServices(config, device);
|
mgr.EmplacePluginServices(config, device);
|
||||||
|
|
Loading…
Reference in New Issue
Block a user