FairMQ: Move static and interactive control modes to plugin (2)

This commit is contained in:
Alexey Rybalchenko 2017-09-14 12:42:07 +02:00 committed by Mohammad Al-Turany
parent 10f67e4c72
commit 334b91785b
15 changed files with 246 additions and 138 deletions

View File

@ -1265,11 +1265,6 @@ void FairMQDevice::Reset()
}
}
bool FairMQDevice::Terminated()
{
return fTerminationRequested;
}
const FairMQChannel& FairMQDevice::GetChannel(const std::string& channelName, const int index) const
{
return fChannels.at(channelName).at(index);

View File

@ -383,7 +383,6 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
}
}
template<class T>
void OnData(const std::string& channelName, bool (T::* memberFunction)(FairMQParts& parts, int index))
{
@ -410,7 +409,10 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
}
}
bool Terminated();
bool Terminated()
{
return fTerminationRequested;
}
const FairMQChannel& GetChannel(const std::string& channelName, const int index = 0) const;
@ -572,7 +574,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
/// Signal handler
void SignalHandler(int signal);
bool fCatchingSignals;
bool fTerminationRequested;
std::atomic<bool> fTerminationRequested;
// Interactive state loop helper
std::atomic<bool> fInteractiveRunning;

View File

@ -35,7 +35,6 @@ namespace mq
class Plugin
{
public:
using ProgOptions = boost::optional<boost::program_options::options_description>;
using Version = tools::Version;
@ -72,6 +71,7 @@ class Plugin
auto ChangeDeviceState(const DeviceStateTransition next) -> void { fPluginServices->ChangeDeviceState(fkName, next); }
auto SubscribeToDeviceStateChange(std::function<void(DeviceState)> callback) -> void { fPluginServices->SubscribeToDeviceStateChange(fkName, callback); }
auto UnsubscribeFromDeviceStateChange() -> void { fPluginServices->UnsubscribeFromDeviceStateChange(fkName); }
auto DeviceTerminated() const -> bool { return fPluginServices->DeviceTerminated(); }
// device config API
// see <fairmq/PluginServices.h> for docs
template<typename T>
@ -85,7 +85,6 @@ class Plugin
// auto UnsubscribeFromPropertyChange() -> void { fPluginServices.UnsubscribeFromPropertyChange<T>(fkName); }
private:
const std::string fkName;
const Version fkVersion;
const std::string fkMaintainer;

View File

@ -48,7 +48,6 @@ namespace mq
class PluginManager
{
public:
using PluginFactory = std::shared_ptr<fair::mq::Plugin>(PluginServices&);
PluginManager();
@ -80,7 +79,6 @@ class PluginManager
auto WaitForPluginsToReleaseDeviceControl() -> void { fPluginServices->WaitForReleaseDeviceControl(); }
private:
static auto ValidateSearchPath(const boost::filesystem::path&) -> void;
auto LoadPluginPrelinkedDynamic(const std::string& pluginName) -> void;
@ -118,7 +116,6 @@ class PluginManager
std::vector<std::string> fPluginOrder;
std::map<std::string, boost::program_options::options_description> fPluginProgOptions;
std::unique_ptr<PluginServices> fPluginServices;
}; /* class PluginManager */
} /* namespace mq */

View File

@ -139,9 +139,7 @@ auto PluginServices::ReleaseDeviceControl(const std::string& controller) -> void
}
else
{
throw DeviceControlError{tools::ToString(
"Plugin ", controller, " cannot release control because it has not taken over control."
)};
throw DeviceControlError{tools::ToString("Plugin ", controller, " cannot release control because it has not taken over control.")};
}
}
@ -159,7 +157,7 @@ auto PluginServices::WaitForReleaseDeviceControl() -> void
{
unique_lock<mutex> lock{fDeviceControllerMutex};
while(GetDeviceController())
while (fDeviceController)
{
fReleaseDeviceControlCondition.wait(lock);
}

View File

@ -35,7 +35,6 @@ namespace mq
class PluginServices
{
public:
PluginServices() = delete;
PluginServices(FairMQProgOptions* config, std::shared_ptr<FairMQDevice> device)
: fDevice{device}
@ -149,6 +148,7 @@ class PluginServices
/// @param subscriber id
auto UnsubscribeFromDeviceStateChange(const std::string& subscriber) -> void { fDevice->UnsubscribeFromStateChange(subscriber); }
auto DeviceTerminated() const -> bool { return fDevice->Terminated(); }
// Config API
@ -222,7 +222,6 @@ class PluginServices
static const std::unordered_map<DeviceStateTransition, FairMQDevice::Event, tools::HashEnum<DeviceStateTransition>> fkDeviceStateTransitionMap;
private:
FairMQProgOptions* fConfig; // TODO make it a shared pointer, once old AliceO2 code is cleaned up
std::shared_ptr<FairMQDevice> fDevice;
boost::optional<std::string> fDeviceController;

View File

@ -16,7 +16,6 @@
#include <vector>
#include <chrono>
#include <thread>
#include "../FairMQLogger.h"
#include "../options/FairMQProgOptions.h"
@ -30,6 +29,7 @@ FairMQBenchmarkSampler::FairMQBenchmarkSampler()
, fMsgRate(1)
, fNumMsgs(0)
, fOutChannelName()
, fResetMsgCounter()
{
}
@ -46,10 +46,13 @@ void FairMQBenchmarkSampler::InitTask()
fOutChannelName = fConfig->GetValue<string>("out-channel");
}
void FairMQBenchmarkSampler::PreRun()
{
fResetMsgCounter = std::thread(&FairMQBenchmarkSampler::ResetMsgCounter, this);
}
void FairMQBenchmarkSampler::Run()
{
std::thread resetMsgCounter(&FairMQBenchmarkSampler::ResetMsgCounter, this);
uint64_t numSentMsgs = 0;
// store the channel reference to avoid traversing the map on every loop iteration
@ -108,7 +111,11 @@ void FairMQBenchmarkSampler::Run()
LOG(INFO) << "Leaving RUNNING state. Sent " << numSentMsgs << " messages in " << chrono::duration<double, milli>(tEnd - tStart).count() << "ms.";
resetMsgCounter.join();
}
void FairMQBenchmarkSampler::PostRun()
{
fResetMsgCounter.join();
}
void FairMQBenchmarkSampler::ResetMsgCounter()

View File

@ -16,6 +16,7 @@
#define FAIRMQBENCHMARKSAMPLER_H_
#include <string>
#include <thread>
#include "FairMQDevice.h"
@ -29,6 +30,9 @@ class FairMQBenchmarkSampler : public FairMQDevice
FairMQBenchmarkSampler();
virtual ~FairMQBenchmarkSampler();
void PreRun() override;
void PostRun() override;
void ResetMsgCounter();
protected:
@ -38,6 +42,7 @@ class FairMQBenchmarkSampler : public FairMQDevice
int fMsgRate;
uint64_t fNumMsgs;
std::string fOutChannelName;
std::thread fResetMsgCounter;
virtual void InitTask();
virtual void Run();

View File

@ -7,9 +7,11 @@
********************************************************************************/
#include "Control.h"
#include <chrono>
#include <thread>
#include <chrono>
#include <termios.h> // for the interactive mode
#include <poll.h> // for the interactive mode
using namespace std;
@ -20,13 +22,12 @@ namespace mq
namespace plugins
{
Control::Control(
const string name,
const Plugin::Version version,
const string maintainer,
const string homepage,
PluginServices* pluginServices)
Control::Control(const string name, const Plugin::Version version, const string maintainer, const string homepage, PluginServices* pluginServices)
: Plugin(name, version, maintainer, homepage, pluginServices)
, fControllerThread()
, fEvents()
, fEventsMutex()
, fNewEvent()
{
try
{
@ -37,21 +38,17 @@ Control::Control(
if (control == "static")
{
LOG(DEBUG) << "Running builtin controller: static";
thread t(&Control::StaticMode, this);
t.detach();
fControllerThread = thread(&Control::StaticMode, this);
}
else if (control == "interactive")
{
LOG(DEBUG) << "Running builtin controller: interactive";
thread t(&Control::InteractiveMode, this);
t.detach();
fControllerThread = thread(&Control::InteractiveMode, this);
}
else
{
LOG(ERROR) << "Unrecognized control mode '" << control << "' requested via command line. "
<< "Ignoring and falling back to interactive control mode.";
thread t(&Control::InteractiveMode, this);
t.detach();
LOG(ERROR) << "Unrecognized control mode '" << control << "' requested via command line. " << "Ignoring and falling back to static control mode.";
fControllerThread = thread(&Control::StaticMode, this);
}
}
catch (PluginServices::DeviceControlError& e)
@ -62,19 +59,142 @@ Control::Control(
auto ControlPluginProgramOptions() -> Plugin::ProgOptions
{
auto plugin_options = boost::program_options::options_description{"Control (builtin) Plugin"};
plugin_options.add_options()
auto pluginOptions = boost::program_options::options_description{"Control (builtin) Plugin"};
pluginOptions.add_options()
("ctrlmode", boost::program_options::value<string>(), "Control mode, 'static' or 'interactive'");
// should rename to --control and remove control from device options ?
return plugin_options;
return pluginOptions;
}
auto Control::InteractiveMode() -> void
{
LOG(ERROR) << "NOT YET IMPLEMENTED";
SubscribeToDeviceStateChange([&](DeviceState newState)
{
{
lock_guard<mutex> lock{fEventsMutex};
fEvents.push(newState);
}
fNewEvent.notify_one();
});
ChangeDeviceState(DeviceStateTransition::InitDevice);
while (WaitForNextState() != DeviceState::DeviceReady) {}
ChangeDeviceState(DeviceStateTransition::InitTask);
while (WaitForNextState() != DeviceState::Ready) {}
ChangeDeviceState(DeviceStateTransition::Run);
char input; // hold the user console input
pollfd cinfd[1];
cinfd[0].fd = fileno(stdin);
cinfd[0].events = POLLIN;
struct termios t;
tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure
t.c_lflag &= ~ICANON; // disable canonical input
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
PrintInteractiveHelp();
bool keepRunning = true;
while (keepRunning)
{
if (poll(cinfd, 1, 500))
{
if (DeviceTerminated())
{
keepRunning = false;
break;
}
cin >> input;
switch (input)
{
case 'i':
LOG(INFO) << "[i] init device";
ChangeDeviceState(DeviceStateTransition::InitDevice);
break;
case 'j':
LOG(INFO) << "[j] init task";
ChangeDeviceState(DeviceStateTransition::InitTask);
break;
case 'p':
LOG(INFO) << "[p] pause";
ChangeDeviceState(DeviceStateTransition::Pause);
break;
case 'r':
LOG(INFO) << "[r] run";
ChangeDeviceState(DeviceStateTransition::Run);
break;
case 's':
LOG(INFO) << "[s] stop";
ChangeDeviceState(DeviceStateTransition::Stop);
break;
case 't':
LOG(INFO) << "[t] reset task";
ChangeDeviceState(DeviceStateTransition::ResetTask);
break;
case 'd':
LOG(INFO) << "[d] reset device";
ChangeDeviceState(DeviceStateTransition::ResetDevice);
break;
case 'h':
LOG(INFO) << "[h] help";
PrintInteractiveHelp();
break;
// case 'x':
// LOG(INFO) << "[x] ERROR";
// ChangeDeviceState(DeviceStateTransition::ERROR_FOUND);
// break;
case 'q':
LOG(INFO) << "[q] end";
ChangeDeviceState(DeviceStateTransition::Stop);
ChangeDeviceState(DeviceStateTransition::ResetTask);
while (WaitForNextState() != DeviceState::DeviceReady) {}
ChangeDeviceState(DeviceStateTransition::ResetDevice);
while (WaitForNextState() != DeviceState::Idle) {}
ChangeDeviceState(DeviceStateTransition::End);
if (GetCurrentDeviceState() == DeviceState::Exiting)
{
keepRunning = false;
}
LOG(INFO) << "Exiting.";
break;
default:
LOG(INFO) << "Invalid input: [" << input << "]";
PrintInteractiveHelp();
break;
}
}
if (DeviceTerminated())
{
keepRunning = false;
}
}
tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure
t.c_lflag |= ICANON; // re-enable canonical input
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
UnsubscribeFromDeviceStateChange();
ReleaseDeviceControl();
}
auto Control::PrintInteractiveHelp() -> void
{
LOG(INFO) << "Use keys to control the state machine:";
LOG(INFO) << "[h] help, [p] pause, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device";
}
auto Control::WaitForNextState() -> DeviceState
{
unique_lock<mutex> lock{fEventsMutex};
@ -82,7 +202,7 @@ auto Control::WaitForNextState() -> DeviceState
{
fNewEvent.wait(lock);
}
// lock.lock();
auto result = fEvents.front();
fEvents.pop();
return result;
@ -90,46 +210,42 @@ auto Control::WaitForNextState() -> DeviceState
auto Control::StaticMode() -> void
{
clock_t cStart = clock();
auto tStart = chrono::high_resolution_clock::now();
SubscribeToDeviceStateChange(
[&](DeviceState newState){
SubscribeToDeviceStateChange([&](DeviceState newState)
{
{
lock_guard<mutex> lock{fEventsMutex};
fEvents.push(newState);
}
fNewEvent.notify_one();
}
);
});
ChangeDeviceState(DeviceStateTransition::InitDevice);
while(WaitForNextState() != DeviceState::DeviceReady) {};
clock_t cEnd = std::clock();
auto tEnd = chrono::high_resolution_clock::now();
LOG(DEBUG) << "Init time (CPU) : " << fixed << setprecision(2) << 1000.0 * (cEnd - cStart) / CLOCKS_PER_SEC << " ms";
LOG(DEBUG) << "Init time (Wall): " << chrono::duration<double, milli>(tEnd - tStart).count() << " ms";
while (WaitForNextState() != DeviceState::DeviceReady) {}
ChangeDeviceState(DeviceStateTransition::InitTask);
while(WaitForNextState() != DeviceState::Ready) {};
while (WaitForNextState() != DeviceState::Ready) {}
ChangeDeviceState(DeviceStateTransition::Run);
// WaitForNextState();
// ChangeDeviceState(DeviceStateTransition::ResetTask);
// WaitForNextState();
// WaitForNextState();
// ChangeDeviceState(DeviceStateTransition::ResetDevice);
// WaitForNextState();
// WaitForNextState();
// ChangeDeviceState(DeviceStateTransition::End);
while(WaitForNextState() != DeviceState::Exiting) {};
LOG(WARN) << "1";
while (WaitForNextState() != DeviceState::Ready) {}
if (!DeviceTerminated())
{
ChangeDeviceState(DeviceStateTransition::ResetTask);
while (WaitForNextState() != DeviceState::DeviceReady) {}
ChangeDeviceState(DeviceStateTransition::ResetDevice);
while (WaitForNextState() != DeviceState::Idle) {}
ChangeDeviceState(DeviceStateTransition::End);
while (WaitForNextState() != DeviceState::Exiting) {}
}
UnsubscribeFromDeviceStateChange();
LOG(WARN) << "2";
ReleaseDeviceControl();
LOG(WARN) << "3";
}
Control::~Control()
{
if (fControllerThread.joinable())
{
fControllerThread.join();
}
}
} /* namespace plugins */

View File

@ -10,10 +10,12 @@
#define FAIR_MQ_PLUGINS_CONTROL
#include <fairmq/Plugin.h>
#include <condition_variable>
#include <mutex>
#include <string>
#include <queue>
#include <thread>
namespace fair
{
@ -25,25 +27,20 @@ namespace plugins
class Control : public Plugin
{
public:
Control(const std::string name, const Plugin::Version version, const std::string maintainer, const std::string homepage, PluginServices* pluginServices);
Control(
const std::string name,
const Plugin::Version version,
const std::string maintainer,
const std::string homepage,
PluginServices* pluginServices
);
~Control();
private:
auto InteractiveMode() -> void;
auto PrintInteractiveHelp() -> void;
auto StaticMode() -> void;
auto WaitForNextState() -> DeviceState;
std::thread fControllerThread;
std::queue<DeviceState> fEvents;
std::mutex fEventsMutex;
std::condition_variable fNewEvent;
}; /* class Control */
auto ControlPluginProgramOptions() -> Plugin::ProgOptions;

View File

@ -117,9 +117,7 @@ int main(int argc, const char** argv)
pluginManager->InstantiatePlugins();
// Wait for control plugin to release device control
LOG(ERROR) << "1";
pluginManager->WaitForPluginsToReleaseDeviceControl();
LOG(ERROR) << "2";
}
catch (std::exception& e)
{

View File

@ -34,6 +34,7 @@ TEST_F(PluginServices, OnlySingleController)
ASSERT_NO_THROW(mServices.ReleaseDeviceControl("foo"));
ASSERT_FALSE(mServices.GetDeviceController());
// take control implicitely
ASSERT_NO_THROW(mServices.TakeDeviceControl("foo"));
ASSERT_NO_THROW(mServices.ChangeDeviceState("foo", DeviceStateTransition::InitDevice));
EXPECT_EQ(mServices.GetDeviceController(), string{"foo"});
@ -47,6 +48,7 @@ TEST_F(PluginServices, OnlySingleController)
TEST_F(PluginServices, Control)
{
ASSERT_EQ(mServices.GetCurrentDeviceState(), DeviceState::Idle);
ASSERT_NO_THROW(mServices.TakeDeviceControl("foo"));
ASSERT_NO_THROW(mServices.ChangeDeviceState("foo", DeviceStateTransition::InitDevice));
DeviceState nextState;

View File

@ -72,12 +72,14 @@ TEST(PluginManager, LoadPluginStatic)
auto device = make_shared<FairMQDevice>();
mgr.EmplacePluginServices(&config, device);
ASSERT_NO_THROW(mgr.LoadPlugin("s:control_static"));
device->SetTransport("zeromq");
ASSERT_NO_THROW(mgr.LoadPlugin("s:control"));
ASSERT_NO_THROW(mgr.InstantiatePlugins());
// check order
const auto expected = vector<string>{"control_static"};
const auto expected = vector<string>{"control"};
auto actual = vector<string>{};
mgr.ForEachPlugin([&](Plugin& plugin){ actual.push_back(plugin.GetName()); });
ASSERT_TRUE(actual == expected);
@ -87,7 +89,7 @@ TEST(PluginManager, LoadPluginStatic)
mgr.ForEachPluginProgOptions([&count](const options_description& d){ ++count; });
ASSERT_EQ(count, 1);
control(device);
mgr.WaitForPluginsToReleaseDeviceControl();
}
TEST(PluginManager, Factory)

View File

@ -27,21 +27,12 @@ inline int runStateMachine(TMQDevice& device, FairMQProgOptions& cfg)
std::string config = cfg.GetValue<std::string>("config");
std::string control = cfg.GetValue<std::string>("control");
std::clock_t cStart = std::clock();
auto tStart = std::chrono::high_resolution_clock::now();
device.ChangeState(TMQDevice::INIT_DEVICE);
// Wait for the binding channels to bind
device.WaitForInitialValidation();
device.WaitForEndOfState(TMQDevice::INIT_DEVICE);
std::clock_t cEnd = std::clock();
auto tEnd = std::chrono::high_resolution_clock::now();
LOG(DEBUG) << "Init time (CPU) : " << std::fixed << std::setprecision(2) << 1000.0 * (cEnd - cStart) / CLOCKS_PER_SEC << " ms";
LOG(DEBUG) << "Init time (Wall): " << std::chrono::duration<double, std::milli>(tEnd - tStart).count() << " ms";
device.ChangeState(TMQDevice::INIT_TASK);
device.WaitForEndOfState(TMQDevice::INIT_TASK);