Fix race in plugin manager/services

This commit is contained in:
Alexey Rybalchenko 2018-07-25 15:16:23 +02:00 committed by Dennis Klein
parent a53ef79552
commit ee8afd7d2b
14 changed files with 204 additions and 250 deletions

View File

@ -223,6 +223,7 @@ target_link_libraries(FairMQ
INTERFACE # only consumers link against interface dependencies
PUBLIC # libFairMQ AND consumers of libFairMQ link aginst public dependencies
pthread
dl
Boost::boost
Boost::program_options

View File

@ -13,10 +13,10 @@
using namespace fair::mq;
DeviceRunner::DeviceRunner(int argc, char* const argv[])
: fRawCmdLineArgs(tools::ToStrVector(argc, argv, false))
, fPluginManager(PluginManager::MakeFromCommandLineOptions(fRawCmdLineArgs))
: fDevice(nullptr)
, fRawCmdLineArgs(tools::ToStrVector(argc, argv, false))
, fConfig()
, fDevice(nullptr)
, fPluginManager(fRawCmdLineArgs)
, fEvents()
{}
@ -27,16 +27,16 @@ auto DeviceRunner::Run() -> int
////////////////////////
// Load builtin plugins last
fPluginManager->LoadPlugin("s:control");
fPluginManager.LoadPlugin("s:control");
////// CALL HOOK ///////
fEvents.Emit<hooks::SetCustomCmdLineOptions>(*this);
////////////////////////
fPluginManager->ForEachPluginProgOptions([&](boost::program_options::options_description options){
fPluginManager.ForEachPluginProgOptions([&](boost::program_options::options_description options){
fConfig.AddToCmdLineOptions(options);
});
fConfig.AddToCmdLineOptions(fPluginManager->ProgramOptions());
fConfig.AddToCmdLineOptions(fPluginManager.ProgramOptions());
////// CALL HOOK ///////
fEvents.Emit<hooks::ModifyRawCmdLineArgs>(*this);
@ -83,16 +83,16 @@ auto DeviceRunner::Run() -> int
fDevice->SetConfig(fConfig);
// Initialize plugin services
fPluginManager->EmplacePluginServices(&fConfig, fDevice);
fPluginManager.EmplacePluginServices(&fConfig, *fDevice);
// Instantiate and run plugins
fPluginManager->InstantiatePlugins();
fPluginManager.InstantiatePlugins();
// Run the device
fDevice->RunStateMachine();
// Wait for control plugin to release device control
fPluginManager->WaitForPluginsToReleaseDeviceControl();
fPluginManager.WaitForPluginsToReleaseDeviceControl();
return 0;
}

View File

@ -61,10 +61,10 @@ class DeviceRunner
template<typename H>
auto RemoveHook() -> void { fEvents.Unsubscribe<H>("runner"); }
std::unique_ptr<FairMQDevice> fDevice;
std::vector<std::string> fRawCmdLineArgs;
std::shared_ptr<PluginManager> fPluginManager;
FairMQProgOptions fConfig;
std::shared_ptr<FairMQDevice> fDevice;
PluginManager fPluginManager;
private:
EventManager fEvents;

View File

@ -119,34 +119,34 @@ static array<string, 12> eventNames =
static map<string, int> stateNumbers =
{
{ "OK", FairMQStateMachine::State::OK },
{ "Error", FairMQStateMachine::State::Error },
{ "IDLE", FairMQStateMachine::State::IDLE },
{ "OK", FairMQStateMachine::State::OK },
{ "Error", FairMQStateMachine::State::Error },
{ "IDLE", FairMQStateMachine::State::IDLE },
{ "INITIALIZING_DEVICE", FairMQStateMachine::State::INITIALIZING_DEVICE },
{ "DEVICE_READY", FairMQStateMachine::State::DEVICE_READY },
{ "INITIALIZING_TASK", FairMQStateMachine::State::INITIALIZING_TASK },
{ "READY", FairMQStateMachine::State::READY },
{ "RUNNING", FairMQStateMachine::State::RUNNING },
{ "PAUSED", FairMQStateMachine::State::PAUSED },
{ "RESETTING_TASK", FairMQStateMachine::State::RESETTING_TASK },
{ "RESETTING_DEVICE", FairMQStateMachine::State::RESETTING_DEVICE },
{ "EXITING", FairMQStateMachine::State::EXITING }
{ "DEVICE_READY", FairMQStateMachine::State::DEVICE_READY },
{ "INITIALIZING_TASK", FairMQStateMachine::State::INITIALIZING_TASK },
{ "READY", FairMQStateMachine::State::READY },
{ "RUNNING", FairMQStateMachine::State::RUNNING },
{ "PAUSED", FairMQStateMachine::State::PAUSED },
{ "RESETTING_TASK", FairMQStateMachine::State::RESETTING_TASK },
{ "RESETTING_DEVICE", FairMQStateMachine::State::RESETTING_DEVICE },
{ "EXITING", FairMQStateMachine::State::EXITING }
};
static map<string, int> eventNumbers =
{
{ "INIT_DEVICE", FairMQStateMachine::Event::INIT_DEVICE },
{ "INIT_DEVICE", FairMQStateMachine::Event::INIT_DEVICE },
{ "internal_DEVICE_READY", FairMQStateMachine::Event::internal_DEVICE_READY },
{ "INIT_TASK", FairMQStateMachine::Event::INIT_TASK },
{ "internal_READY", FairMQStateMachine::Event::internal_READY },
{ "RUN", FairMQStateMachine::Event::RUN },
{ "PAUSE", FairMQStateMachine::Event::PAUSE },
{ "STOP", FairMQStateMachine::Event::STOP },
{ "RESET_TASK", FairMQStateMachine::Event::RESET_TASK },
{ "RESET_DEVICE", FairMQStateMachine::Event::RESET_DEVICE },
{ "internal_IDLE", FairMQStateMachine::Event::internal_IDLE },
{ "END", FairMQStateMachine::Event::END },
{ "ERROR_FOUND", FairMQStateMachine::Event::ERROR_FOUND }
{ "INIT_TASK", FairMQStateMachine::Event::INIT_TASK },
{ "internal_READY", FairMQStateMachine::Event::internal_READY },
{ "RUN", FairMQStateMachine::Event::RUN },
{ "PAUSE", FairMQStateMachine::Event::PAUSE },
{ "STOP", FairMQStateMachine::Event::STOP },
{ "RESET_TASK", FairMQStateMachine::Event::RESET_TASK },
{ "RESET_DEVICE", FairMQStateMachine::Event::RESET_DEVICE },
{ "internal_IDLE", FairMQStateMachine::Event::internal_IDLE },
{ "END", FairMQStateMachine::Event::END },
{ "ERROR_FOUND", FairMQStateMachine::Event::ERROR_FOUND }
};
// defining the boost MSM state machine
@ -154,7 +154,9 @@ struct Machine_ : public state_machine_def<Machine_>
{
public:
Machine_()
: fWork()
: fUnblockHandler()
, fStateHandlers()
, fWork()
, fWorkAvailableCondition()
, fWorkDoneCondition()
, fWorkMutex()
@ -198,10 +200,10 @@ struct Machine_ : public state_machine_def<Machine_>
}
};
struct InitDeviceFct
struct DefaultFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts)
void operator()(EVT const& e, FSM& fsm, SourceState& /* ss */, TargetState& ts)
{
fsm.fState = ts.Type();
@ -212,45 +214,7 @@ struct Machine_ : public state_machine_def<Machine_>
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering " << ts.Name() << " state";
fsm.fWork = fsm.fInitWrapperHandler;
fsm.fWorkAvailableCondition.notify_one();
}
};
struct InitTaskFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts)
{
fsm.fState = ts.Type();
unique_lock<mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering " << ts.Name() << " state";
fsm.fWork = fsm.fInitTaskWrapperHandler;
fsm.fWorkAvailableCondition.notify_one();
}
};
struct RunFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts)
{
fsm.fState = ts.Type();
unique_lock<mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering " << ts.Name() << " state";
fsm.fWork = fsm.fRunWrapperHandler;
fsm.fWork = fsm.fStateHandlers.at(e.Type());
fsm.fWorkAvailableCondition.notify_one();
}
};
@ -303,48 +267,10 @@ struct Machine_ : public state_machine_def<Machine_>
}
};
struct ResetTaskFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts)
{
fsm.fState = ts.Type();
unique_lock<mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering " << ts.Name() << " state";
fsm.fWork = fsm.fResetTaskWrapperHandler;
fsm.fWorkAvailableCondition.notify_one();
}
};
struct ResetDeviceFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts)
{
fsm.fState = ts.Type();
unique_lock<mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering " << ts.Name() << " state";
fsm.fWork = fsm.fResetWrapperHandler;
fsm.fWorkAvailableCondition.notify_one();
}
};
struct ExitingFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts)
void operator()(EVT const& e, FSM& fsm, SourceState& /* ss */, TargetState& ts)
{
LOG(state) << "Entering " << ts.Name() << " state";
fsm.fState = ts.Type();
@ -357,7 +283,7 @@ struct Machine_ : public state_machine_def<Machine_>
fsm.fWorkAvailableCondition.notify_one();
}
fsm.fExitHandler();
fsm.fStateHandlers.at(e.Type())();
}
};
@ -375,18 +301,18 @@ struct Machine_ : public state_machine_def<Machine_>
// Transition table for Machine_
struct transition_table : boost::mpl::vector<
// Start Event Next Action Guard
Row<IDLE_FSM_STATE, INIT_DEVICE_FSM_EVENT, INITIALIZING_DEVICE_FSM_STATE, InitDeviceFct, none>,
Row<IDLE_FSM_STATE, INIT_DEVICE_FSM_EVENT, INITIALIZING_DEVICE_FSM_STATE, DefaultFct, none>,
Row<IDLE_FSM_STATE, END_FSM_EVENT, EXITING_FSM_STATE, ExitingFct, none>,
Row<INITIALIZING_DEVICE_FSM_STATE, internal_DEVICE_READY_FSM_EVENT, DEVICE_READY_FSM_STATE, AutomaticFct, none>,
Row<DEVICE_READY_FSM_STATE, INIT_TASK_FSM_EVENT, INITIALIZING_TASK_FSM_STATE, InitTaskFct, none>,
Row<DEVICE_READY_FSM_STATE, RESET_DEVICE_FSM_EVENT, RESETTING_DEVICE_FSM_STATE, ResetDeviceFct, none>,
Row<DEVICE_READY_FSM_STATE, INIT_TASK_FSM_EVENT, INITIALIZING_TASK_FSM_STATE, DefaultFct, none>,
Row<DEVICE_READY_FSM_STATE, RESET_DEVICE_FSM_EVENT, RESETTING_DEVICE_FSM_STATE, DefaultFct, none>,
Row<INITIALIZING_TASK_FSM_STATE, internal_READY_FSM_EVENT, READY_FSM_STATE, AutomaticFct, none>,
Row<READY_FSM_STATE, RUN_FSM_EVENT, RUNNING_FSM_STATE, RunFct, none>,
Row<READY_FSM_STATE, RESET_TASK_FSM_EVENT, RESETTING_TASK_FSM_STATE, ResetTaskFct, none>,
Row<RUNNING_FSM_STATE, PAUSE_FSM_EVENT, PAUSED_FSM_STATE, PauseFct, none>,
Row<READY_FSM_STATE, RUN_FSM_EVENT, RUNNING_FSM_STATE, DefaultFct, none>,
Row<READY_FSM_STATE, RESET_TASK_FSM_EVENT, RESETTING_TASK_FSM_STATE, DefaultFct, none>,
Row<RUNNING_FSM_STATE, PAUSE_FSM_EVENT, PAUSED_FSM_STATE, DefaultFct, none>,
Row<RUNNING_FSM_STATE, STOP_FSM_EVENT, READY_FSM_STATE, StopFct, none>,
Row<RUNNING_FSM_STATE, internal_READY_FSM_EVENT, READY_FSM_STATE, InternalStopFct, none>,
Row<PAUSED_FSM_STATE, RUN_FSM_EVENT, RUNNING_FSM_STATE, RunFct, none>,
Row<PAUSED_FSM_STATE, RUN_FSM_EVENT, RUNNING_FSM_STATE, DefaultFct, none>,
Row<RESETTING_TASK_FSM_STATE, internal_DEVICE_READY_FSM_EVENT, DEVICE_READY_FSM_STATE, AutomaticFct, none>,
Row<RESETTING_DEVICE_FSM_STATE, internal_IDLE_FSM_EVENT, IDLE_FSM_STATE, AutomaticFct, none>,
Row<OK_FSM_STATE, ERROR_FOUND_FSM_EVENT, ERROR_FSM_STATE, ErrorFoundFct, none>>
@ -425,14 +351,8 @@ struct Machine_ : public state_machine_def<Machine_>
}
}
function<void(void)> fInitWrapperHandler;
function<void(void)> fInitTaskWrapperHandler;
function<void(void)> fRunWrapperHandler;
function<void(void)> fPauseWrapperHandler;
function<void(void)> fResetWrapperHandler;
function<void(void)> fResetTaskWrapperHandler;
function<void(void)> fExitHandler;
function<void(void)> fUnblockHandler;
unordered_map<FairMQStateMachine::Event, function<void(void)>> fStateHandlers;
// function to execute user states in a worker thread
function<void(void)> fWork;
@ -457,7 +377,7 @@ struct Machine_ : public state_machine_def<Machine_>
// Wait for work to be done.
while (!fWorkAvailable && !fWorkerTerminated)
{
fWorkAvailableCondition.wait_for(lock, chrono::milliseconds(300));
fWorkAvailableCondition.wait_for(lock, chrono::milliseconds(100));
}
if (fWorkerTerminated)
@ -493,13 +413,13 @@ FairMQStateMachine::FairMQStateMachine()
: fChangeStateMutex()
, fFsm(new FairMQFSM)
{
static_pointer_cast<FairMQFSM>(fFsm)->fInitWrapperHandler = bind(&FairMQStateMachine::InitWrapper, this);
static_pointer_cast<FairMQFSM>(fFsm)->fInitTaskWrapperHandler = bind(&FairMQStateMachine::InitTaskWrapper, this);
static_pointer_cast<FairMQFSM>(fFsm)->fRunWrapperHandler = bind(&FairMQStateMachine::RunWrapper, this);
static_pointer_cast<FairMQFSM>(fFsm)->fPauseWrapperHandler = bind(&FairMQStateMachine::PauseWrapper, this);
static_pointer_cast<FairMQFSM>(fFsm)->fResetWrapperHandler = bind(&FairMQStateMachine::ResetWrapper, this);
static_pointer_cast<FairMQFSM>(fFsm)->fResetTaskWrapperHandler = bind(&FairMQStateMachine::ResetTaskWrapper, this);
static_pointer_cast<FairMQFSM>(fFsm)->fExitHandler = bind(&FairMQStateMachine::Exit, this);
static_pointer_cast<FairMQFSM>(fFsm)->fStateHandlers.emplace(INIT_DEVICE, bind(&FairMQStateMachine::InitWrapper, this));
static_pointer_cast<FairMQFSM>(fFsm)->fStateHandlers.emplace(INIT_TASK, bind(&FairMQStateMachine::InitTaskWrapper, this));
static_pointer_cast<FairMQFSM>(fFsm)->fStateHandlers.emplace(RUN, bind(&FairMQStateMachine::RunWrapper, this));
static_pointer_cast<FairMQFSM>(fFsm)->fStateHandlers.emplace(PAUSE, bind(&FairMQStateMachine::PauseWrapper, this));
static_pointer_cast<FairMQFSM>(fFsm)->fStateHandlers.emplace(RESET_TASK, bind(&FairMQStateMachine::ResetTaskWrapper, this));
static_pointer_cast<FairMQFSM>(fFsm)->fStateHandlers.emplace(RESET_DEVICE, bind(&FairMQStateMachine::ResetWrapper, this));
static_pointer_cast<FairMQFSM>(fFsm)->fStateHandlers.emplace(END, bind(&FairMQStateMachine::Exit, this));
static_pointer_cast<FairMQFSM>(fFsm)->fUnblockHandler = bind(&FairMQStateMachine::Unblock, this);
static_pointer_cast<FairMQFSM>(fFsm)->start();

View File

@ -31,13 +31,55 @@ const std::string fair::mq::PluginManager::fgkLibPrefix = "FairMQPlugin_";
fair::mq::PluginManager::PluginManager()
: fSearchPaths{{"."}}
, fPluginFactories()
, fPluginServices()
, fPlugins()
, fPluginOrder()
, fPluginProgOptions()
, fPluginServices()
{
}
fair::mq::PluginManager::PluginManager(const vector<string> args)
: fSearchPaths{{"."}}
, fPluginFactories()
, fPluginServices()
, fPlugins()
, fPluginOrder()
, fPluginProgOptions()
{
// Parse command line options
auto options = ProgramOptions();
auto vm = po::variables_map{};
try
{
auto parsed = po::command_line_parser(args).options(options).allow_unregistered().run();
po::store(parsed, vm);
po::notify(vm);
} catch (const po::error& e)
{
throw ProgramOptionsParseError{ToString("Error occured while parsing the 'Plugin Manager' program options: ", e.what())};
}
// Process plugin search paths
auto append = vector<fs::path>{};
auto prepend = vector<fs::path>{};
auto searchPaths = vector<fs::path>{};
if (vm.count("plugin-search-path"))
{
for (const auto& path : vm["plugin-search-path"].as<vector<string>>())
{
if (path.substr(0, 1) == "<") { prepend.emplace_back(path.substr(1)); }
else if (path.substr(0, 1) == ">") { append.emplace_back(path.substr(1)); }
else { searchPaths.emplace_back(path); }
}
}
// Set supplied options
SetSearchPaths(searchPaths);
for(const auto& path : prepend) { PrependSearchPath(path); }
for(const auto& path : append) { AppendSearchPath(path); }
if (vm.count("plugin")) { LoadPlugins(vm["plugin"].as<vector<string>>()); }
}
auto fair::mq::PluginManager::ValidateSearchPath(const fs::path& path) -> void
{
if (path.empty()) throw BadSearchPath{"Specified path is empty."};
@ -81,46 +123,6 @@ auto fair::mq::PluginManager::ProgramOptions() -> po::options_description
return plugin_options;
}
auto fair::mq::PluginManager::MakeFromCommandLineOptions(const vector<string> args) -> shared_ptr<PluginManager>
{
// Parse command line options
auto options = ProgramOptions();
auto vm = po::variables_map{};
try
{
auto parsed = po::command_line_parser(args).options(options).allow_unregistered().run();
po::store(parsed, vm);
po::notify(vm);
} catch (const po::error& e)
{
throw ProgramOptionsParseError{ToString("Error occured while parsing the 'Plugin Manager' program options: ", e.what())};
}
// Process plugin search paths
auto append = vector<fs::path>{};
auto prepend = vector<fs::path>{};
auto searchPaths = vector<fs::path>{};
if (vm.count("plugin-search-path"))
{
for (const auto& path : vm["plugin-search-path"].as<vector<string>>())
{
if (path.substr(0, 1) == "<") { prepend.emplace_back(path.substr(1)); }
else if (path.substr(0, 1) == ">") { append.emplace_back(path.substr(1)); }
else { searchPaths.emplace_back(path); }
}
}
// Create PluginManager with supplied options
auto mgr = make_shared<PluginManager>();
mgr->SetSearchPaths(searchPaths);
for(const auto& path : prepend) { mgr->PrependSearchPath(path); }
for(const auto& path : append) { mgr->AppendSearchPath(path); }
if (vm.count("plugin")) { mgr->LoadPlugins(vm["plugin"].as<vector<string>>()); }
// Return the plugin manager and command line options, that have not been recognized.
return mgr;
}
auto fair::mq::PluginManager::LoadPlugin(const string& pluginName) -> void
{
if (pluginName.substr(0,2) == "p:")

View File

@ -47,9 +47,10 @@ namespace mq
class PluginManager
{
public:
using PluginFactory = std::shared_ptr<fair::mq::Plugin>(PluginServices&);
using PluginFactory = std::unique_ptr<fair::mq::Plugin>(PluginServices&);
PluginManager();
PluginManager(const std::vector<std::string> args);
~PluginManager()
{
@ -69,7 +70,7 @@ class PluginManager
struct PluginInstantiationError : std::runtime_error { using std::runtime_error::runtime_error; };
static auto ProgramOptions() -> boost::program_options::options_description;
static auto MakeFromCommandLineOptions(const std::vector<std::string>) -> std::shared_ptr<PluginManager>;
static auto MakeFromCommandLineOptions(const std::vector<std::string>) -> PluginManager;
struct ProgramOptionsParseError : std::runtime_error { using std::runtime_error::runtime_error; };
static auto LibPrefix() -> const std::string& { return fgkLibPrefix; }
@ -116,10 +117,10 @@ class PluginManager
static const std::string fgkLibPrefix;
std::vector<boost::filesystem::path> fSearchPaths;
std::map<std::string, std::function<PluginFactory>> fPluginFactories;
std::map<std::string, std::shared_ptr<Plugin>> fPlugins;
std::unique_ptr<PluginServices> fPluginServices;
std::map<std::string, std::unique_ptr<Plugin>> fPlugins;
std::vector<std::string> fPluginOrder;
std::map<std::string, boost::program_options::options_description> fPluginProgOptions;
std::unique_ptr<PluginServices> fPluginServices;
}; /* class PluginManager */
} /* namespace mq */

View File

@ -98,7 +98,7 @@ auto PluginServices::ChangeDeviceState(const std::string& controller, const Devi
if (fDeviceController == controller)
{
fDevice->ChangeState(fkDeviceStateTransitionMap.at(next));
fDevice.ChangeState(fkDeviceStateTransitionMap.at(next));
}
else
{

View File

@ -38,9 +38,9 @@ class PluginServices
{
public:
PluginServices() = delete;
PluginServices(FairMQProgOptions* config, std::shared_ptr<FairMQDevice> device)
: fConfig{config}
, fDevice{device}
PluginServices(FairMQProgOptions* config, FairMQDevice& device)
: fConfig(config)
, fDevice(device)
, fDeviceController()
, fDeviceControllerMutex()
, fReleaseDeviceControlCondition()
@ -114,7 +114,7 @@ class PluginServices
friend auto operator<<(std::ostream& os, const DeviceStateTransition& transition) -> std::ostream& { return os << ToStr(transition); }
/// @return current device state
auto GetCurrentDeviceState() const -> DeviceState { return fkDeviceStateMap.at(static_cast<FairMQDevice::State>(fDevice->GetCurrentState())); }
auto GetCurrentDeviceState() const -> DeviceState { return fkDeviceStateMap.at(static_cast<FairMQDevice::State>(fDevice.GetCurrentState())); }
/// @brief Become device controller
/// @param controller id
@ -160,14 +160,14 @@ class PluginServices
/// the state is running in.
auto SubscribeToDeviceStateChange(const std::string& subscriber, std::function<void(DeviceState /*newState*/)> callback) -> void
{
fDevice->SubscribeToStateChange(subscriber, [&,callback](FairMQDevice::State newState){
fDevice.SubscribeToStateChange(subscriber, [&,callback](FairMQDevice::State newState){
callback(fkDeviceStateMap.at(newState));
});
}
/// @brief Unsubscribe from device state changes
/// @param subscriber id
auto UnsubscribeFromDeviceStateChange(const std::string& subscriber) -> void { fDevice->UnsubscribeFromStateChange(subscriber); }
auto UnsubscribeFromDeviceStateChange(const std::string& subscriber) -> void { fDevice.UnsubscribeFromStateChange(subscriber); }
// Config API
struct PropertyNotFoundError : std::runtime_error { using std::runtime_error::runtime_error; };
@ -272,7 +272,7 @@ class PluginServices
private:
FairMQProgOptions* fConfig; // TODO make it a shared pointer, once old AliceO2 code is cleaned up
std::shared_ptr<FairMQDevice> fDevice;
FairMQDevice& fDevice;
boost::optional<std::string> fDeviceController;
mutable std::mutex fDeviceControllerMutex;
std::condition_variable fReleaseDeviceControlCondition;

View File

@ -17,6 +17,7 @@
#include <string>
#include <thread>
#include <atomic>
#include "FairMQDevice.h"
@ -38,7 +39,7 @@ class FairMQBenchmarkSampler : public FairMQDevice
protected:
bool fSameMessage;
int fMsgSize;
int fMsgCounter;
std::atomic<int> fMsgCounter;
int fMsgRate;
uint64_t fNumIterations;
uint64_t fMaxIterations;

View File

@ -17,12 +17,11 @@ using namespace std;
namespace
{
// ugly global state, but std::signal gives us no other choice
std::function<void(int)> gSignalHandlerClosure;
volatile sig_atomic_t gSignalStatus = 0;
extern "C" auto signal_handler(int signal) -> void
{
gSignalHandlerClosure(signal);
gSignalStatus = signal;
}
}
@ -37,10 +36,13 @@ Control::Control(const string name, const Plugin::Version version, const string
: Plugin(name, version, maintainer, homepage, pluginServices)
, fControllerThread()
, fSignalHandlerThread()
, fShutdownThread()
, fEvents()
, fEventsMutex()
, fShutdownMutex()
, fNewEvent()
, fDeviceTerminationRequested{false}
, fDeviceTerminationRequested(false)
, fHasShutdown(false)
{
try
{
@ -73,7 +75,7 @@ Control::Control(const string name, const Plugin::Version version, const string
LOG(debug) << "catch-signals: " << GetProperty<int>("catch-signals");
if (GetProperty<int>("catch-signals") > 0)
{
gSignalHandlerClosure = bind(&Control::SignalHandler, this, placeholders::_1);
fSignalHandlerThread = thread(&Control::SignalHandler, this);
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
}
@ -263,69 +265,92 @@ auto Control::StaticMode() -> void
}
}
auto Control::SignalHandler(int signal) -> void
auto Control::SignalHandler() -> void
{
if (!fDeviceTerminationRequested)
while (true)
{
fDeviceTerminationRequested = true;
StealDeviceControl();
LOG(info) << "Received device shutdown request (signal " << signal << ").";
LOG(info) << "Waiting for graceful device shutdown. Hit Ctrl-C again to abort immediately.";
UnsubscribeFromDeviceStateChange(); // In case, static or interactive mode have subscribed already
SubscribeToDeviceStateChange([&](DeviceState newState)
if (gSignalStatus != 0 && !fHasShutdown)
{
{
lock_guard<mutex> lock{fEventsMutex};
fEvents.push(newState);
}
fNewEvent.notify_one();
});
LOG(info) << "Received device shutdown request (signal " << gSignalStatus << ").";
LOG(info) << "Waiting for graceful device shutdown. Hit Ctrl-C again to abort immediately.";
fSignalHandlerThread = thread(&Control::RunShutdownSequence, this);
if (!fDeviceTerminationRequested)
{
fDeviceTerminationRequested = true;
gSignalStatus = 0;
fShutdownThread = thread(&Control::HandleShutdownSignal, this);
}
else
{
LOG(warn) << "Received 2nd device shutdown request (signal " << gSignalStatus << ").";
LOG(warn) << "Aborting immediately!";
abort();
}
}
else if (fHasShutdown)
{
break;
}
this_thread::sleep_for(chrono::milliseconds(100));
}
else
}
auto Control::HandleShutdownSignal() -> void
{
StealDeviceControl();
UnsubscribeFromDeviceStateChange(); // In case, static or interactive mode have subscribed already
SubscribeToDeviceStateChange([&](DeviceState newState)
{
LOG(warn) << "Received 2nd device shutdown request (signal " << signal << ").";
LOG(warn) << "Aborting immediately !";
abort();
}
{
lock_guard<mutex> lock{fEventsMutex};
fEvents.push(newState);
}
fNewEvent.notify_one();
});
RunShutdownSequence();
}
auto Control::RunShutdownSequence() -> void
{
auto nextState = GetCurrentDeviceState();
EmptyEventQueue();
while (nextState != DeviceState::Exiting)
lock_guard<mutex> lock(fShutdownMutex);
if (!fHasShutdown)
{
switch (nextState)
auto nextState = GetCurrentDeviceState();
EmptyEventQueue();
while (nextState != DeviceState::Exiting)
{
case DeviceState::Idle:
ChangeDeviceState(DeviceStateTransition::End);
break;
case DeviceState::DeviceReady:
ChangeDeviceState(DeviceStateTransition::ResetDevice);
break;
case DeviceState::Ready:
ChangeDeviceState(DeviceStateTransition::ResetTask);
break;
case DeviceState::Running:
ChangeDeviceState(DeviceStateTransition::Stop);
break;
case DeviceState::Paused:
ChangeDeviceState(DeviceStateTransition::Resume);
break;
default:
break;
switch (nextState)
{
case DeviceState::Idle:
ChangeDeviceState(DeviceStateTransition::End);
break;
case DeviceState::DeviceReady:
ChangeDeviceState(DeviceStateTransition::ResetDevice);
break;
case DeviceState::Ready:
ChangeDeviceState(DeviceStateTransition::ResetTask);
break;
case DeviceState::Running:
ChangeDeviceState(DeviceStateTransition::Stop);
break;
case DeviceState::Paused:
ChangeDeviceState(DeviceStateTransition::Resume);
break;
default:
// ignore other states
break;
}
nextState = WaitForNextState();
}
nextState = WaitForNextState();
fHasShutdown = true;
UnsubscribeFromDeviceStateChange();
ReleaseDeviceControl();
}
UnsubscribeFromDeviceStateChange();
ReleaseDeviceControl();
}
auto Control::RunStartupSequence() -> void
@ -357,6 +382,7 @@ Control::~Control()
{
if (fControllerThread.joinable()) fControllerThread.join();
if (fSignalHandlerThread.joinable()) fSignalHandlerThread.join();
if (fShutdownThread.joinable()) fShutdownThread.join();
}
} /* namespace plugins */

View File

@ -37,17 +37,21 @@ class Control : public Plugin
auto PrintInteractiveHelp() -> void;
auto StaticMode() -> void;
auto WaitForNextState() -> DeviceState;
auto SignalHandler(int signal) -> void;
auto SignalHandler() -> void;
auto HandleShutdownSignal() -> void;
auto RunShutdownSequence() -> void;
auto RunStartupSequence() -> void;
auto EmptyEventQueue() -> void;
std::thread fControllerThread;
std::thread fSignalHandlerThread;
std::thread fShutdownThread;
std::queue<DeviceState> fEvents;
std::mutex fEventsMutex;
std::mutex fShutdownMutex;
std::condition_variable fNewEvent;
std::atomic<bool> fDeviceTerminationRequested;
std::atomic<bool> fHasShutdown;
}; /* class Control */
auto ControlPluginProgramOptions() -> Plugin::ProgOptions;

View File

@ -46,7 +46,7 @@ int main(int argc, char* argv[])
// });
runner.AddHook<InstantiateDevice>([](DeviceRunner& r){
r.fDevice = std::shared_ptr<FairMQDevice>{getDevice(r.fConfig)};
r.fDevice = std::unique_ptr<FairMQDevice>{getDevice(r.fConfig)};
});
return runner.Run();

View File

@ -44,7 +44,6 @@ struct PluginServices : ::testing::Test {
{
fRunStateMachineThread = std::thread(&FairMQDevice::RunStateMachine, mDevice.get());
mDevice->SetTransport("zeromq");
}
~PluginServices()

View File

@ -104,14 +104,14 @@ TEST(PluginManager, LoadPluginStatic)
TEST(PluginManager, Factory)
{
const auto args = vector<string>{"-l", "debug", "--help", "-S", ">/lib", "</home/user/lib", "/usr/local/lib", "/usr/lib"};
auto mgr = PluginManager::MakeFromCommandLineOptions(args);
PluginManager mgr(args);
const auto path1 = path{"/home/user/lib"};
const auto path2 = path{"/usr/local/lib"};
const auto path3 = path{"/usr/lib"};
const auto path4 = path{"/lib"};
const auto expected = vector<path>{path1, path2, path3, path4};
ASSERT_TRUE(static_cast<bool>(mgr));
ASSERT_TRUE(mgr->SearchPaths() == expected);
// ASSERT_TRUE(static_cast<bool>(mgr));
ASSERT_TRUE(mgr.SearchPaths() == expected);
}
TEST(PluginManager, SearchPathValidation)