diff --git a/examples/dds/fairmq-start-ex-dds.sh.in b/examples/dds/fairmq-start-ex-dds.sh.in index 0168edcb..12c73d43 100755 --- a/examples/dds/fairmq-start-ex-dds.sh.in +++ b/examples/dds/fairmq-start-ex-dds.sh.in @@ -48,13 +48,26 @@ echo "TOPOLOGY FILE: ${topologyFile}" # dds-info --active-topology dds-topology --disable-validation --activate ${topologyFile} # dds-info --active-topology +# dds-info --wait-for-executing-agents ${requiredNofAgents} +sleep 1 echo "------------------------" echo "...waiting for Topology to finish..." +# TODO Retrieve number of devices from DDS topology API instead of having the user pass it explicitely +fairmq-dds-command-ui -w "IDLE" -n ${requiredNofAgents} +fairmq-dds-command-ui -c i -w "INITIALIZING DEVICE" -n ${requiredNofAgents} +fairmq-dds-command-ui -c k -w "INITIALIZED" -n ${requiredNofAgents} +fairmq-dds-command-ui -c b -w "BOUND" -n ${requiredNofAgents} +fairmq-dds-command-ui -c x -w "DEVICE READY" -n ${requiredNofAgents} +fairmq-dds-command-ui -c j -w "READY" -n ${requiredNofAgents} +fairmq-dds-command-ui -c r sampler_and_sink="main/(Sampler|Sink)" -fairmq-dds-command-ui -p $sampler_and_sink -w "RUNNING->READY" +fairmq-dds-command-ui -p $sampler_and_sink -w "RUNNING->READY" -n 2 echo "...$sampler_and_sink are READY, sending shutdown..." -fairmq-dds-command-ui -c q! -w "EXITING" +fairmq-dds-command-ui -c s -w "RUNNING->READY" -n ${requiredNofAgents} +fairmq-dds-command-ui -c t -w "DEVICE READY" -n ${requiredNofAgents} +fairmq-dds-command-ui -c d -w "IDLE" -n ${requiredNofAgents} +fairmq-dds-command-ui -c q echo "...waiting for ${requiredNofAgents} idle agents..." dds-info --wait-for-idle-agents ${requiredNofAgents} echo "------------------------" diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index b4b7d272..cc631fd6 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -31,9 +31,22 @@ namespace mq namespace plugins { -DDS::DDS(const string& name, const Plugin::Version version, const string& maintainer, const string& homepage, PluginServices* pluginServices) +DDS::DDS(const string& name, + const Plugin::Version version, + const string& maintainer, + const string& homepage, + PluginServices* pluginServices) : Plugin(name, version, maintainer, homepage, pluginServices) - , fTransitions({ "BIND", "CONNECT", "INIT TASK", "RUN", "STOP", "RESET TASK", "RESET DEVICE" }) + , fTransitions({"INIT DEVICE", + "COMPLETE INIT", + "BIND", + "CONNECT", + "INIT TASK", + "RUN", + "STOP", + "RESET TASK", + "RESET DEVICE", + "END"}) , fCurrentState(DeviceState::Idle) , fLastState(DeviceState::Idle) , fDeviceTerminationRequested(false) @@ -282,25 +295,13 @@ auto DDS::SubscribeForCustomCommands() -> void if (cmd == "check-state") { fDDS.Send(id + ": " + ToStr(GetCurrentDeviceState()), to_string(senderId)); - } else if (cmd == "INIT DEVICE") { - if (ChangeDeviceState(ToDeviceStateTransition(cmd))) { - fDDS.Send(id + ": queued, " + cmd, to_string(senderId)); - } else { - fDDS.Send(id + ": could not queue, " + cmd, to_string(senderId)); - } } else if (fTransitions.find(cmd) != fTransitions.end()) { if (ChangeDeviceState(ToDeviceStateTransition(cmd))) { fDDS.Send(id + ": queued, " + cmd, to_string(senderId)); } else { fDDS.Send(id + ": could not queue, " + cmd, to_string(senderId)); } - } else if (cmd == "END") { - if (ChangeDeviceState(ToDeviceStateTransition(cmd))) { - fDDS.Send(id + ": queued, " + cmd, to_string(senderId)); - } else { - fDDS.Send(id + ": could not queue, " + cmd, to_string(senderId)); - } - if (ToStr(GetCurrentDeviceState()) == "EXITING") { + if (cmd == "END" && ToStr(GetCurrentDeviceState()) == "EXITING") { unique_lock lock(fStopMutex); fStopCondition.notify_one(); } @@ -343,9 +344,9 @@ auto DDS::SubscribeForCustomCommands() -> void } fDDS.Send("state-changes-unsubscription: " + id + ",OK", to_string(senderId)); } else if (cmd == "SHUTDOWN") { - TransitionDeviceStateTo(DeviceState::Exiting); + TransitionDeviceStateTo(DeviceState::Exiting); } else if (cmd == "STARTUP") { - TransitionDeviceStateTo(DeviceState::Running); + TransitionDeviceStateTo(DeviceState::Running); } else { LOG(warn) << "Unknown command: " << cmd; LOG(warn) << "Origin: " << senderId; diff --git a/fairmq/plugins/DDS/runDDSCommandUI.cxx b/fairmq/plugins/DDS/runDDSCommandUI.cxx index f5f935a9..7ad31829 100644 --- a/fairmq/plugins/DDS/runDDSCommandUI.cxx +++ b/fairmq/plugins/DDS/runDDSCommandUI.cxx @@ -70,7 +70,7 @@ struct StateSubscription { void printControlsHelp() { cout << "Use keys to control the devices:" << endl; - cout << "[c] check states, [o] dump config, [h] help, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device, [b] bind, [x] connect" << endl; + cout << "[c] check states, [o] dump config, [h] help, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device, [k] complete init, [b] bind, [x] connect" << endl; cout << "To quit press Ctrl+C" << endl; } @@ -95,6 +95,9 @@ void commandMode(const string& commandIn, const string& topologyPath, CCustomCmd } else if (command == "i") { cout << "> init devices" << endl; ddsCustomCmd.send("INIT DEVICE", topologyPath); + } else if (command == "k") { + cout << "> complete init" << endl; + ddsCustomCmd.send("COMPLETE INIT", topologyPath); } else if (command == "b") { cout << "> bind devices" << endl; ddsCustomCmd.send("BIND", topologyPath); @@ -152,7 +155,11 @@ struct WaitMode : fTargetState(targetState) {} - void Run(const chrono::milliseconds& timeout, const string& topologyPath, CCustomCmd& ddsCustomCmd, const string& command = "") + void Run(const chrono::milliseconds& timeout, + const string& topologyPath, + CCustomCmd& ddsCustomCmd, + unsigned int numberDevices, + const string& command = "") { StateSubscription stateSubscription(topologyPath, ddsCustomCmd); @@ -161,11 +168,18 @@ struct WaitMode } // TODO once DDS provides an API to retrieve actual number of tasks, use it here - auto condition = [&] { return !fTargetStates.empty() && all_of(fTargetStates.cbegin(), - fTargetStates.cend(), - [&](unordered_map::value_type i) { - return boost::algorithm::ends_with(i.second, fTargetState); - }); + auto condition = [&] { + bool res(!fTargetStates.empty() + && all_of(fTargetStates.cbegin(), + fTargetStates.cend(), + [&](unordered_map::value_type i) { + return boost::algorithm::ends_with(i.second, fTargetState); + })); + if (numberDevices > 0) { + res = res && (fTargetStates.size() == numberDevices); + } + cout << "waiting for " << numberDevices << " devices to reach " << fTargetState << ", condition check: " << res << endl; + return res; }; unique_lock lock(fMtx); @@ -202,6 +216,7 @@ int main(int argc, char* argv[]) string topologyPath; string targetState; unsigned int timeout; + unsigned int numberDevices(0); bpo::options_description options("Common options"); @@ -217,6 +232,7 @@ int main(int argc, char* argv[]) ("path,p", bpo::value (&topologyPath)->default_value(""), "DDS Topology path to send command to (empty - send to all tasks)") ("wait-for-state,w", bpo::value (&targetState)->default_value(""), "Wait until targeted FairMQ devices reach the given state") ("timeout,t", bpo::value (&timeout)->default_value(0), "Timeout in milliseconds when waiting for a device state (0 - wait infinitely)") + ("number-devices,n", bpo::value (&numberDevices)->default_value(0), "Number of devices (will be removed in the future)") ("help,h", "Produce help message"); bpo::variables_map vm; @@ -224,7 +240,7 @@ int main(int argc, char* argv[]) if (vm.count("help")) { cout << "FairMQ DDS Command UI" << endl << options << endl; - cout << "Commands: [c] check state, [o] dump config, [h] help, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device" << endl; + cout << "Commands: [c] check state, [o] dump config, [h] help, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device, [k] complete init, [b] bind, [x] connect" << endl; return EXIT_SUCCESS; } @@ -241,12 +257,13 @@ int main(int argc, char* argv[]) // subscribe to receive messages from DDS ddsCustomCmd.subscribe([&](const string& msg, const string& /*condition*/, uint64_t senderId) { - cerr << "Received: " << msg << endl; + // cerr << "Received: " << msg << endl; vector parts; boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,")); if (parts[0] == "state-change") { + // cerr << "Received: " << msg << endl; boost::trim(parts[2]); - waitMode.AddNewStateEntry(senderId, parts[2]); + waitMode.AddNewStateEntry(senderId, parts[3]); } else if (parts[0] == "state-changes-subscription") { if (parts[2] != "OK") { cerr << "state-changes-subscription failed with return code: " << parts[2]; @@ -265,7 +282,7 @@ int main(int argc, char* argv[]) if (targetState == "") { commandMode(command, topologyPath, ddsCustomCmd); } else { - waitMode.Run(chrono::milliseconds(timeout), topologyPath, ddsCustomCmd, command); + waitMode.Run(chrono::milliseconds(timeout), topologyPath, ddsCustomCmd, numberDevices, command); } } catch (exception& e) { cerr << "Error: " << e.what() << endl; diff --git a/fairmq/sdk/Topology.cxx b/fairmq/sdk/Topology.cxx index 2bf69774..b05a135a 100644 --- a/fairmq/sdk/Topology.cxx +++ b/fairmq/sdk/Topology.cxx @@ -72,7 +72,7 @@ Topology::Topology(DDSTopology topo, DDSSession session) { std::vector deviceList = fDDSTopo.GetDeviceList(); for (const auto& d : deviceList) { - LOG(info) << "Adding device " << d; + // LOG(debug) << "Adding device " << d; fState.emplace(d, DeviceStatus{ false, DeviceState::Ok }); } fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& /* condition */, uint64_t senderId) { @@ -122,7 +122,7 @@ auto Topology::ChangeState(TopologyTransition transition, ChangeStateCallback cb throw std::runtime_error("A state change request is already in progress, concurrent requests are currently not supported"); lock.unlock(); } - LOG(info) << "Initiating ChangeState with " << transition << " to " << expectedState.at(transition); + LOG(debug) << "Initiating ChangeState with " << transition << " to " << expectedState.at(transition); fStateChangeOngoing = true; fChangeStateCallback = cb; fStateChangeTimeout = timeout; @@ -204,7 +204,7 @@ void Topology::AddNewStateEntry(uint64_t senderId, const std::string& state) { std::size_t pos = state.find("->"); std::string endState = state.substr(pos + 2); - LOG(info) << "Adding new state entry: " << senderId << ", " << state << ", end state: " << endState; + // LOG(debug) << "Adding new state entry: " << senderId << ", " << state << ", end state: " << endState; { try { std::unique_lock lock(fMtx); diff --git a/test/sdk/_topology.cxx b/test/sdk/_topology.cxx index f65f18fd..85ff9e42 100644 --- a/test/sdk/_topology.cxx +++ b/test/sdk/_topology.cxx @@ -43,7 +43,7 @@ TEST_F(Topology, ChangeStateAsync) Topology topo(mDDSTopo, mDDSSession); fair::mq::tools::Semaphore blocker; - topo.ChangeState(TopologyTransition::Run, [&blocker, &topo](Topology::ChangeStateResult result) { + topo.ChangeState(TopologyTransition::InitDevice, [&blocker, &topo](Topology::ChangeStateResult result) { LOG(info) << result; EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok); EXPECT_NO_THROW(fair::mq::sdk::AggregateState(result.state)); @@ -51,14 +51,6 @@ TEST_F(Topology, ChangeStateAsync) blocker.Signal(); }); blocker.Wait(); - topo.ChangeState(TopologyTransition::Stop, [&blocker, &topo](Topology::ChangeStateResult result) { - LOG(info) << result; - EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok); - EXPECT_NO_THROW(fair::mq::sdk::AggregateState(result.state)); - EXPECT_EQ(fair::mq::sdk::StateEqualsTo(result.state, fair::mq::sdk::DeviceState::Ready), true); - blocker.Signal(); - }); - blocker.Wait(); } TEST_F(Topology, ChangeStateSync)