FairMQ: Implement Take/ReleaseDeviceControl plugin API

The main function of this API is to make sure only one plugin is
controlling the device state transitions at a time. The
ReleaseDeviceControl() signal is used to implement a function
WaitForPluginsToReleaseDeviceControl() which is used to block the main
thread if a control plugin is used.
This commit is contained in:
Dennis Klein 2017-09-13 21:34:50 +02:00 committed by Mohammad Al-Turany
parent dfb2bac4bc
commit 2af3ae99eb
5 changed files with 143 additions and 16 deletions

View File

@ -67,12 +67,11 @@ class Plugin
auto ToDeviceState(const std::string& state) const -> DeviceState { return fPluginServices->ToDeviceState(state); }
auto ToStr(DeviceState state) const -> std::string { return fPluginServices->ToStr(state); }
auto GetCurrentDeviceState() const -> DeviceState { return fPluginServices->GetCurrentDeviceState(); }
auto ChangeDeviceState(const DeviceStateTransition next) -> void { fPluginServices->ChangeDeviceState(next); }
auto TakeDeviceControl() -> void { fPluginServices->TakeDeviceControl(fkName); };
auto ReleaseDeviceControl() -> void { fPluginServices->ReleaseDeviceControl(fkName); };
auto ChangeDeviceState(const DeviceStateTransition next) -> void { fPluginServices->ChangeDeviceState(fkName, next); }
auto SubscribeToDeviceStateChange(std::function<void(DeviceState)> callback) -> void { fPluginServices->SubscribeToDeviceStateChange(fkName, callback); }
auto UnsubscribeFromDeviceStateChange() -> void { fPluginServices->UnsubscribeFromDeviceStateChange(fkName); }
auto TakeControl() -> void { fPluginServices->TakeControl(fkName); };
auto ReleaseControl() -> void { fPluginServices->ReleaseControl(fkName); };
// device config API
// see <fairmq/PluginServices.h> for docs
template<typename T>

View File

@ -77,6 +77,8 @@ class PluginManager
template<typename... Args>
auto EmplacePluginServices(Args&&... args) -> void { fPluginServices = fair::mq::tools::make_unique<PluginServices>(std::forward<Args>(args)...); };
auto WaitForPluginsToReleaseDeviceControl() -> void { fPluginServices->WaitForReleaseDeviceControl(); }
private:
static auto ValidateSearchPath(const boost::filesystem::path&) -> void;

View File

@ -9,6 +9,7 @@
#include <fairmq/PluginServices.h>
using namespace fair::mq;
using namespace std;
const std::unordered_map<std::string, PluginServices::DeviceState> PluginServices::fkDeviceStateStrMap = {
{"OK", DeviceState::Ok},
@ -85,3 +86,78 @@ const std::unordered_map<PluginServices::DeviceStateTransition, FairMQDevice::Ev
{DeviceStateTransition::End, FairMQDevice::END},
{DeviceStateTransition::ErrorFound, FairMQDevice::ERROR_FOUND}
};
auto PluginServices::ChangeDeviceState(const std::string& controller, const DeviceStateTransition next) -> void
{
lock_guard<mutex> lock{fDeviceControllerMutex};
if(!fDeviceController) fDeviceController = controller;
if(fDeviceController == controller)
{
fDevice->ChangeState(fkDeviceStateTransitionMap.at(next));
}
else
{
throw DeviceControlError{tools::ToString(
"Plugin ", controller, " is not allowed to change device states. ",
"Currently, plugin ", fDeviceController, " has taken control."
)};
}
}
auto PluginServices::TakeDeviceControl(const std::string& controller) -> void
{
lock_guard<mutex> lock{fDeviceControllerMutex};
if(!fDeviceController)
{
fDeviceController = controller;
}
else if(fDeviceController == controller)
{
// nothing to do
}
else
{
throw DeviceControlError{tools::ToString(
"Plugin ", controller, " is not allowed to take over control. ",
"Currently, plugin ", fDeviceController, " has taken control."
)};
}
}
auto PluginServices::ReleaseDeviceControl(const std::string& controller) -> void
{
{
lock_guard<mutex> lock{fDeviceControllerMutex};
if(fDeviceController == controller)
{
fDeviceController = boost::none;
}
else
{
throw DeviceControlError{tools::ToString(
"Plugin ", controller, " cannot release control because it has not taken over control."
)};
}
}
fReleaseDeviceControlCondition.notify_one();
}
auto PluginServices::GetDeviceController() const -> boost::optional<std::string>
{
lock_guard<mutex> lock{fDeviceControllerMutex};
return fDeviceController;
}
auto PluginServices::WaitForReleaseDeviceControl() -> void
{
unique_lock<mutex> lock{fDeviceControllerMutex};
fReleaseDeviceControlCondition.wait(lock, [&]{ return !GetDeviceController(); });
}

View File

@ -15,6 +15,10 @@
#include <functional>
#include <string>
#include <unordered_map>
#include <boost/optional.hpp>
#include <boost/optional/optional_io.hpp>
#include <mutex>
#include <condition_variable>
namespace fair
{
@ -99,12 +103,34 @@ class PluginServices
/// @return current device state
auto GetCurrentDeviceState() const -> DeviceState { return fkDeviceStateMap.at(static_cast<FairMQDevice::State>(fDevice->GetCurrentState())); }
/// @brief Become device controller
/// @param controller id
/// @throws fair::mq::PluginServices::DeviceControlError if there is already a device controller.
///
/// Only one plugin can succeed to take control over device state transitions at a time.
auto TakeDeviceControl(const std::string& controller) -> void;
struct DeviceControlError : std::runtime_error { using std::runtime_error::runtime_error; };
/// @brief Release device controller role
/// @param controller id
/// @throws fair::mq::PluginServices::DeviceControlError if passed controller id is not the current device controller.
auto ReleaseDeviceControl(const std::string& controller) -> void;
/// @brief Get current device controller
auto GetDeviceController() const -> boost::optional<std::string>;
/// @brief Block until control is released
auto WaitForReleaseDeviceControl() -> void;
/// @brief Request a device state transition
/// @param controller id
/// @param next state transition
/// @throws fair::mq::PluginServices::DeviceControlError if control role is not currently owned by passed controller id.
///
/// The state transition may not happen immediately, but when the current state evaluates the
/// pending transition event and terminates. In other words, the device states are scheduled cooperatively.
auto ChangeDeviceState(const DeviceStateTransition next) -> void { fDevice->ChangeState(fkDeviceStateTransitionMap.at(next)); }
/// If the device control role has not been taken yet, calling this function will take over control implicitely.
auto ChangeDeviceState(const std::string& controller, const DeviceStateTransition next) -> void;
/// @brief Subscribe with a callback to device state changes
/// @param subscriber id
@ -119,9 +145,6 @@ class PluginServices
});
}
auto TakeControl(const std::string& controller) -> void { };
auto ReleaseControl(const std::string& controller) -> void { };
/// @brief Unsubscribe from device state changes
/// @param subscriber id
auto UnsubscribeFromDeviceStateChange(const std::string& subscriber) -> void { fDevice->UnsubscribeFromStateChange(subscriber); }
@ -202,6 +225,9 @@ class PluginServices
FairMQProgOptions* fConfig; // TODO make it a shared pointer, once old AliceO2 code is cleaned up
std::shared_ptr<FairMQDevice> fDevice;
boost::optional<std::string> fDeviceController;
mutable std::mutex fDeviceControllerMutex;
std::condition_variable fReleaseDeviceControlCondition;
}; /* class PluginServices */
} /* namespace mq */

View File

@ -9,7 +9,6 @@
#include "Fixture.h"
#include <condition_variable>
#include <mutex>
// #include <thread>
namespace
{
@ -19,10 +18,36 @@ using fair::mq::test::PluginServices;
using DeviceState = fair::mq::PluginServices::DeviceState;
using DeviceStateTransition = fair::mq::PluginServices::DeviceStateTransition;
TEST_F(PluginServices, OnlySingleController)
{
ASSERT_NO_THROW(mServices.TakeDeviceControl("foo"));
ASSERT_NO_THROW(mServices.TakeDeviceControl("foo")); // noop
ASSERT_THROW( // no control for bar
mServices.ChangeDeviceState("bar", DeviceStateTransition::InitDevice),
fair::mq::PluginServices::DeviceControlError
);
ASSERT_THROW( // no control for bar
mServices.ReleaseDeviceControl("bar"),
fair::mq::PluginServices::DeviceControlError
);
ASSERT_NO_THROW(mServices.ReleaseDeviceControl("foo"));
ASSERT_FALSE(mServices.GetDeviceController());
// take control implicitely
ASSERT_NO_THROW(mServices.ChangeDeviceState("foo", DeviceStateTransition::InitDevice));
EXPECT_EQ(mServices.GetDeviceController(), string{"foo"});
// park device
mDevice->WaitForEndOfState(FairMQDevice::DEVICE_READY);
mServices.ChangeDeviceState("foo", DeviceStateTransition::ResetDevice);
mDevice->WaitForEndOfState(FairMQDevice::RESET_DEVICE);
mServices.ChangeDeviceState("foo", DeviceStateTransition::End);
}
TEST_F(PluginServices, Control)
{
ASSERT_EQ(mServices.GetCurrentDeviceState(), DeviceState::Idle);
ASSERT_NO_THROW(mServices.ChangeDeviceState(DeviceStateTransition::InitDevice));
ASSERT_NO_THROW(mServices.ChangeDeviceState("foo", DeviceStateTransition::InitDevice));
DeviceState nextState;
condition_variable cv;
@ -38,16 +63,15 @@ TEST_F(PluginServices, Control)
mServices.UnsubscribeFromDeviceStateChange("test");
}
});
{
unique_lock<mutex> lock{cv_m};
cv.wait(lock);
}
unique_lock<mutex> lock{cv_m};
cv.wait(lock);
ASSERT_EQ(mServices.GetCurrentDeviceState(), DeviceState::DeviceReady);
mServices.ChangeDeviceState(DeviceStateTransition::ResetDevice);
mServices.ChangeDeviceState("foo", DeviceStateTransition::ResetDevice);
mDevice->WaitForEndOfState(FairMQDevice::RESET_DEVICE);
mServices.ChangeDeviceState(DeviceStateTransition::End);
mServices.ChangeDeviceState("foo", DeviceStateTransition::End);
}
TEST_F(PluginServices, ControlStateConversions)