mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
Use TransitionTo in Plugins
This commit is contained in:
parent
857ef0c9d3
commit
8bb6a9518a
|
@ -35,16 +35,12 @@ echo "TOPOLOGY FILE: ${topologyFile}"
|
|||
dds-topology --disable-validation --activate ${topologyFile}
|
||||
|
||||
echo "------------------------"
|
||||
echo "Waiting for Topology to finish ..."
|
||||
echo "...waiting for Topology to finish..."
|
||||
sampler_and_sink="main/(Sampler|Sink)"
|
||||
fairmq-dds-command-ui -p $sampler_and_sink --wait-for-state "RUNNING->READY"
|
||||
echo "..."
|
||||
fairmq-dds-command-ui -c s -w READY
|
||||
fairmq-dds-command-ui -c t -w "DEVICE READY"
|
||||
fairmq-dds-command-ui -c d -w IDLE
|
||||
fairmq-dds-command-ui -c q -w EXITING
|
||||
# fairmq-dds-command-ui -c q! -w EXITING
|
||||
echo "..."
|
||||
fairmq-dds-command-ui -p $sampler_and_sink -w "RUNNING->READY"
|
||||
echo "...$sampler_and_sink are READY, sending shutdown..."
|
||||
fairmq-dds-command-ui -c q! -w "EXITING"
|
||||
echo "...waiting for ${requiredNofAgents} idle agents..."
|
||||
dds-info --wait-for-idle-agents ${requiredNofAgents}
|
||||
echo "------------------------"
|
||||
|
||||
|
|
|
@ -232,7 +232,14 @@ void FairMQDevice::TransitionTo(const fair::mq::State s)
|
|||
case State::Running:
|
||||
ChangeState(Transition::Stop);
|
||||
break;
|
||||
default: // Binding, Connecting, InitializingTask, ResettingDevice, ResettingTask
|
||||
case State::Binding:
|
||||
case State::Connecting:
|
||||
case State::InitializingTask:
|
||||
case State::ResettingDevice:
|
||||
case State::ResettingTask:
|
||||
LOG(debug) << "TransitionTo ignoring state: " << currentState << " (expected, automatic transition).";
|
||||
break;
|
||||
default:
|
||||
LOG(debug) << "TransitionTo ignoring state: " << currentState;
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -79,6 +79,7 @@ class Plugin
|
|||
auto StealDeviceControl() -> void { fPluginServices->StealDeviceControl(fkName); };
|
||||
auto ReleaseDeviceControl() -> void { fPluginServices->ReleaseDeviceControl(fkName); };
|
||||
auto ChangeDeviceState(const DeviceStateTransition next) -> bool { return fPluginServices->ChangeDeviceState(fkName, next); }
|
||||
void TransitionDeviceStateTo(const DeviceState state) { return fPluginServices->TransitionDeviceStateTo(fkName, state); }
|
||||
auto SubscribeToDeviceStateChange(std::function<void(DeviceState)> callback) -> void { fPluginServices->SubscribeToDeviceStateChange(fkName, callback); }
|
||||
auto UnsubscribeFromDeviceStateChange() -> void { fPluginServices->UnsubscribeFromDeviceStateChange(fkName); }
|
||||
|
||||
|
|
|
@ -90,6 +90,23 @@ const unordered_map<State, PluginServices::DeviceState, tools::HashEnum<State>>
|
|||
{State::ResettingDevice, DeviceState::ResettingDevice},
|
||||
{State::Exiting, DeviceState::Exiting}
|
||||
};
|
||||
const unordered_map<PluginServices::DeviceState, State> PluginServices::fkStateMap = {
|
||||
{DeviceState::Ok, State::Ok},
|
||||
{DeviceState::Error, State::Error},
|
||||
{DeviceState::Idle, State::Idle},
|
||||
{DeviceState::InitializingDevice, State::InitializingDevice},
|
||||
{DeviceState::Initialized, State::Initialized},
|
||||
{DeviceState::Binding, State::Binding},
|
||||
{DeviceState::Bound, State::Bound},
|
||||
{DeviceState::Connecting, State::Connecting},
|
||||
{DeviceState::DeviceReady, State::DeviceReady},
|
||||
{DeviceState::InitializingTask, State::InitializingTask},
|
||||
{DeviceState::Ready, State::Ready},
|
||||
{DeviceState::Running, State::Running},
|
||||
{DeviceState::ResettingTask, State::ResettingTask},
|
||||
{DeviceState::ResettingDevice, State::ResettingDevice},
|
||||
{DeviceState::Exiting, State::Exiting}
|
||||
};
|
||||
const unordered_map<PluginServices::DeviceStateTransition, Transition, tools::HashEnum<PluginServices::DeviceStateTransition>> PluginServices::fkDeviceStateTransitionMap = {
|
||||
{DeviceStateTransition::Auto, Transition::Auto},
|
||||
{DeviceStateTransition::InitDevice, Transition::InitDevice},
|
||||
|
@ -125,6 +142,22 @@ auto PluginServices::ChangeDeviceState(const string& controller, const DeviceSta
|
|||
return result;
|
||||
}
|
||||
|
||||
void PluginServices::TransitionDeviceStateTo(const std::string& controller, DeviceState state)
|
||||
{
|
||||
lock_guard<mutex> lock{fDeviceControllerMutex};
|
||||
|
||||
if (!fDeviceController) fDeviceController = controller;
|
||||
|
||||
if (fDeviceController == controller) {
|
||||
fDevice.TransitionTo(fkStateMap.at(state));
|
||||
} else {
|
||||
throw DeviceControlError{tools::ToString(
|
||||
"Plugin '", controller, "' is not allowed to change device states. ",
|
||||
"Currently, plugin '", *fDeviceController, "' has taken control."
|
||||
)};
|
||||
}
|
||||
}
|
||||
|
||||
auto PluginServices::TakeDeviceControl(const string& controller) -> void
|
||||
{
|
||||
lock_guard<mutex> lock{fDeviceControllerMutex};
|
||||
|
|
|
@ -160,6 +160,8 @@ class PluginServices
|
|||
/// If the device control role has not been taken yet, calling this function will take over control implicitely.
|
||||
auto ChangeDeviceState(const std::string& controller, const DeviceStateTransition next) -> bool;
|
||||
|
||||
void TransitionDeviceStateTo(const std::string& controller, DeviceState state);
|
||||
|
||||
/// @brief Subscribe with a callback to device state changes
|
||||
/// @param subscriber id
|
||||
/// @param callback
|
||||
|
@ -313,6 +315,7 @@ class PluginServices
|
|||
static const std::unordered_map<std::string, DeviceStateTransition> fkDeviceStateTransitionStrMap;
|
||||
static const std::unordered_map<DeviceStateTransition, std::string, tools::HashEnum<DeviceStateTransition>> fkStrDeviceStateTransitionMap;
|
||||
static const std::unordered_map<fair::mq::State, DeviceState, tools::HashEnum<fair::mq::State>> fkDeviceStateMap;
|
||||
static const std::unordered_map<DeviceState, fair::mq::State> fkStateMap;
|
||||
static const std::unordered_map<DeviceStateTransition, fair::mq::Transition, tools::HashEnum<DeviceStateTransition>> fkDeviceStateTransitionMap;
|
||||
|
||||
private:
|
||||
|
|
|
@ -379,6 +379,10 @@ auto DDS::SubscribeForCustomCommands() -> void
|
|||
fStateChangeSubscribers.erase(senderId);
|
||||
}
|
||||
fDDSCustomCmd.send("state-changes-unsubscription: " + id + ",OK", to_string(senderId));
|
||||
} else if (cmd == "SHUTDOWN") {
|
||||
TransitionDeviceStateTo(DeviceState::Exiting);
|
||||
} else if (cmd == "STARTUP") {
|
||||
TransitionDeviceStateTo(DeviceState::Ready);
|
||||
} else {
|
||||
LOG(warn) << "Unknown command: " << cmd;
|
||||
LOG(warn) << "Origin: " << senderId;
|
||||
|
|
|
@ -87,47 +87,49 @@ void commandMode(const string& commandIn, const string& topologyPath, CCustomCmd
|
|||
|
||||
while (true) {
|
||||
if (command == "c") {
|
||||
cout << "\033[01;32m > checking state of the devices\033[0m" << endl;
|
||||
cout << "> checking state of the devices" << endl;
|
||||
ddsCustomCmd.send("check-state", topologyPath);
|
||||
} else if (command == "o") {
|
||||
cout << "\033[01;32m > dumping config of the devices\033[0m" << endl;
|
||||
cout << "> dumping config of the devices" << endl;
|
||||
ddsCustomCmd.send("dump-config", topologyPath);
|
||||
} else if (command == "i") {
|
||||
cout << "\033[01;32m > init devices\033[0m" << endl;
|
||||
cout << "> init devices" << endl;
|
||||
ddsCustomCmd.send("INIT DEVICE", topologyPath);
|
||||
} else if (command == "b") {
|
||||
cout << "\033[01;32m > bind devices\033[0m" << endl;
|
||||
cout << "> bind devices" << endl;
|
||||
ddsCustomCmd.send("BIND", topologyPath);
|
||||
} else if (command == "x") {
|
||||
cout << "\033[01;32m > connect devices\033[0m" << endl;
|
||||
cout << "> connect devices" << endl;
|
||||
ddsCustomCmd.send("CONNECT", topologyPath);
|
||||
} else if (command == "j") {
|
||||
cout << "\033[01;32m > init tasks\033[0m" << endl;
|
||||
cout << "> init tasks" << endl;
|
||||
ddsCustomCmd.send("INIT TASK", topologyPath);
|
||||
} else if (command == "p") {
|
||||
cout << "\033[01;32m > pause devices\033[0m" << endl;
|
||||
cout << "> pause devices" << endl;
|
||||
ddsCustomCmd.send("PAUSE", topologyPath);
|
||||
} else if (command == "r") {
|
||||
cout << "\033[01;32m > run tasks\033[0m" << endl;
|
||||
cout << "> run tasks" << endl;
|
||||
ddsCustomCmd.send("RUN", topologyPath);
|
||||
} else if (command == "s") {
|
||||
cout << "\033[01;32m > stop devices\033[0m" << endl;
|
||||
cout << "> stop devices" << endl;
|
||||
ddsCustomCmd.send("STOP", topologyPath);
|
||||
} else if (command == "t") {
|
||||
cout << "\033[01;32m > reset tasks\033[0m" << endl;
|
||||
cout << "> reset tasks" << endl;
|
||||
ddsCustomCmd.send("RESET TASK", topologyPath);
|
||||
} else if (command == "d") {
|
||||
cout << "\033[01;32m > reset devices\033[0m" << endl;
|
||||
cout << "> reset devices" << endl;
|
||||
ddsCustomCmd.send("RESET DEVICE", topologyPath);
|
||||
} else if (command == "h") {
|
||||
cout << "\033[01;32m > help\033[0m" << endl;
|
||||
cout << "> help" << endl;
|
||||
printControlsHelp();
|
||||
} else if (command == "q") {
|
||||
cout << "\033[01;32m > end\033[0m" << endl;
|
||||
cout << "> end" << endl;
|
||||
ddsCustomCmd.send("END", topologyPath);
|
||||
} else if (command == "q!") {
|
||||
cout << "> shutdown" << endl;
|
||||
ddsCustomCmd.send("SHUTDOWN", topologyPath);
|
||||
} else if (command == "r!") {
|
||||
cout << "> startup" << endl;
|
||||
ddsCustomCmd.send("STARTUP", topologyPath);
|
||||
} else {
|
||||
cout << "\033[01;32mInvalid input: [" << c << "]\033[0m" << endl;
|
||||
|
@ -150,10 +152,14 @@ struct WaitMode
|
|||
: fTargetState(targetState)
|
||||
{}
|
||||
|
||||
void Run(const chrono::milliseconds& timeout, const string& topologyPath, CCustomCmd& ddsCustomCmd)
|
||||
void Run(const chrono::milliseconds& timeout, const string& topologyPath, CCustomCmd& ddsCustomCmd, const string& command = "")
|
||||
{
|
||||
StateSubscription stateSubscription(topologyPath, ddsCustomCmd);
|
||||
|
||||
if (command != "") {
|
||||
commandMode(command, topologyPath, ddsCustomCmd);
|
||||
}
|
||||
|
||||
// TODO once DDS provides an API to retrieve actual number of tasks, use it here
|
||||
auto condition = [&] { return !fTargetStates.empty() && all_of(fTargetStates.cbegin(),
|
||||
fTargetStates.cend(),
|
||||
|
@ -256,73 +262,10 @@ int main(int argc, char* argv[])
|
|||
|
||||
service.start(sessionID);
|
||||
|
||||
|
||||
if (targetState == "") {
|
||||
commandMode(command, topologyPath, ddsCustomCmd);
|
||||
} else {
|
||||
PrintControlsHelp();
|
||||
}
|
||||
|
||||
while (cin >> c) {
|
||||
switch (c) {
|
||||
case 'c':
|
||||
cout << " > checking state of the devices" << endl;
|
||||
ddsCustomCmd.send("check-state", topologyPath);
|
||||
break;
|
||||
case 'o':
|
||||
cout << " > dumping config of the devices" << endl;
|
||||
ddsCustomCmd.send("dump-config", topologyPath);
|
||||
break;
|
||||
case 'i':
|
||||
cout << " > init devices" << endl;
|
||||
ddsCustomCmd.send("INIT DEVICE", topologyPath);
|
||||
break;
|
||||
case 'b':
|
||||
cout << " > bind" << endl;
|
||||
ddsCustomCmd.send("BIND", topologyPath);
|
||||
break;
|
||||
case 'x':
|
||||
cout << " > connect" << endl;
|
||||
ddsCustomCmd.send("CONNECT", topologyPath);
|
||||
break;
|
||||
case 'j':
|
||||
cout << " > init tasks" << endl;
|
||||
ddsCustomCmd.send("INIT TASK", topologyPath);
|
||||
break;
|
||||
case 'r':
|
||||
cout << " > run tasks" << endl;
|
||||
ddsCustomCmd.send("RUN", topologyPath);
|
||||
break;
|
||||
case 's':
|
||||
cout << " > stop devices" << endl;
|
||||
ddsCustomCmd.send("STOP", topologyPath);
|
||||
break;
|
||||
case 't':
|
||||
cout << " > reset tasks" << endl;
|
||||
ddsCustomCmd.send("RESET TASK", topologyPath);
|
||||
break;
|
||||
case 'd':
|
||||
cout << " > reset devices" << endl;
|
||||
ddsCustomCmd.send("RESET DEVICE", topologyPath);
|
||||
break;
|
||||
case 'h':
|
||||
cout << " > help" << endl;
|
||||
PrintControlsHelp();
|
||||
break;
|
||||
case 'q':
|
||||
cout << " > end" << endl;
|
||||
ddsCustomCmd.send("END", topologyPath);
|
||||
break;
|
||||
default:
|
||||
cout << "Invalid input: [" << c << "]" << endl;
|
||||
PrintControlsHelp();
|
||||
break;
|
||||
}
|
||||
|
||||
if (command != "") {
|
||||
commandMode(command, topologyPath, ddsCustomCmd);
|
||||
}
|
||||
waitMode.Run(chrono::milliseconds(timeout), topologyPath, ddsCustomCmd);
|
||||
waitMode.Run(chrono::milliseconds(timeout), topologyPath, ddsCustomCmd, command);
|
||||
}
|
||||
} catch (exception& e) {
|
||||
cerr << "Error: " << e.what() << endl;
|
||||
|
|
Loading…
Reference in New Issue
Block a user