9 #ifndef FAIR_MQ_SDK_TOPOLOGY_H
10 #define FAIR_MQ_SDK_TOPOLOGY_H
12 #include <fairmq/sdk/AsioAsyncOp.h>
13 #include <fairmq/sdk/AsioBase.h>
14 #include <fairmq/sdk/commands/Commands.h>
15 #include <fairmq/sdk/DDSCollection.h>
16 #include <fairmq/sdk/DDSInfo.h>
17 #include <fairmq/sdk/DDSSession.h>
18 #include <fairmq/sdk/DDSTask.h>
19 #include <fairmq/sdk/DDSTopology.h>
20 #include <fairmq/sdk/Error.h>
21 #include <fairmq/States.h>
22 #include <fairmq/tools/Semaphore.h>
23 #include <fairmq/tools/Unique.h>
25 #include <fairlogger/Logger.h>
30 #include <asio/associated_executor.hpp>
31 #include <asio/async_result.hpp>
32 #include <asio/steady_timer.hpp>
33 #include <asio/system_executor.hpp>
37 #include <condition_variable>
47 #include <unordered_map>
51 namespace fair::mq::sdk
54 using DeviceId = std::string;
55 using DeviceState = fair::mq::State;
56 using DeviceTransition = fair::mq::Transition;
58 const std::map<DeviceTransition, DeviceState> expectedState =
60 { DeviceTransition::InitDevice, DeviceState::InitializingDevice },
61 { DeviceTransition::CompleteInit, DeviceState::Initialized },
62 { DeviceTransition::Bind, DeviceState::Bound },
63 { DeviceTransition::Connect, DeviceState::DeviceReady },
64 { DeviceTransition::InitTask, DeviceState::Ready },
65 { DeviceTransition::Run, DeviceState::Running },
66 { DeviceTransition::Stop, DeviceState::Ready },
67 { DeviceTransition::ResetTask, DeviceState::DeviceReady },
68 { DeviceTransition::ResetDevice, DeviceState::Idle },
69 { DeviceTransition::End, DeviceState::Exiting }
73 enum class AggregatedTopologyState : int
75 Undefined =
static_cast<int>(fair::mq::State::Undefined),
76 Ok =
static_cast<int>(fair::mq::State::Ok),
77 Error =
static_cast<int>(fair::mq::State::Error),
78 Idle =
static_cast<int>(fair::mq::State::Idle),
79 InitializingDevice =
static_cast<int>(fair::mq::State::InitializingDevice),
80 Initialized =
static_cast<int>(fair::mq::State::Initialized),
81 Binding =
static_cast<int>(fair::mq::State::Binding),
82 Bound =
static_cast<int>(fair::mq::State::Bound),
83 Connecting =
static_cast<int>(fair::mq::State::Connecting),
84 DeviceReady =
static_cast<int>(fair::mq::State::DeviceReady),
85 InitializingTask =
static_cast<int>(fair::mq::State::InitializingTask),
86 Ready =
static_cast<int>(fair::mq::State::Ready),
87 Running =
static_cast<int>(fair::mq::State::Running),
88 ResettingTask =
static_cast<int>(fair::mq::State::ResettingTask),
89 ResettingDevice =
static_cast<int>(fair::mq::State::ResettingDevice),
90 Exiting =
static_cast<int>(fair::mq::State::Exiting),
94 inline auto operator==(DeviceState lhs, AggregatedTopologyState rhs) ->
bool
96 return static_cast<int>(lhs) ==
static_cast<int>(rhs);
99 inline auto operator==(AggregatedTopologyState lhs, DeviceState rhs) ->
bool
101 return static_cast<int>(lhs) ==
static_cast<int>(rhs);
104 inline std::ostream& operator<<(std::ostream& os,
const AggregatedTopologyState& state)
106 if (state == AggregatedTopologyState::Mixed) {
107 return os <<
"MIXED";
109 return os << static_cast<DeviceState>(state);
113 inline std::string GetAggregatedTopologyStateName(AggregatedTopologyState s)
115 if (s == AggregatedTopologyState::Mixed) {
118 return GetStateName(
static_cast<State
>(s));
122 inline AggregatedTopologyState GetAggregatedTopologyState(
const std::string& state)
124 if (state ==
"MIXED") {
125 return AggregatedTopologyState::Mixed;
127 return static_cast<AggregatedTopologyState
>(GetState(state));
133 bool subscribed_to_state_changes;
134 DeviceState lastState;
137 DDSCollection::Id collectionId;
140 using DeviceProperty = std::pair<std::string, std::string>;
141 using DeviceProperties = std::vector<DeviceProperty>;
142 using DevicePropertyQuery = std::string;
143 using FailedDevices = std::set<DeviceId>;
149 DeviceProperties props;
151 std::unordered_map<DeviceId, Device> devices;
152 FailedDevices failed;
155 using TopologyState = std::vector<DeviceStatus>;
156 using TopologyStateIndex = std::unordered_map<DDSTask::Id, int>;
157 using TopologyStateByTask = std::unordered_map<DDSTask::Id, DeviceStatus>;
158 using TopologyStateByCollection = std::unordered_map<DDSCollection::Id, std::vector<DeviceStatus>>;
159 using TopologyTransition = fair::mq::Transition;
161 inline AggregatedTopologyState AggregateState(
const TopologyState& topologyState)
163 DeviceState first = topologyState.begin()->state;
165 if (std::all_of(topologyState.cbegin(), topologyState.cend(), [&](TopologyState::value_type i) {
166 return i.state == first;
168 return static_cast<AggregatedTopologyState
>(first);
171 return AggregatedTopologyState::Mixed;
174 inline bool StateEqualsTo(
const TopologyState& topologyState, DeviceState state)
176 return AggregateState(topologyState) ==
static_cast<AggregatedTopologyState
>(state);
179 inline TopologyStateByCollection GroupByCollectionId(
const TopologyState& topologyState)
181 TopologyStateByCollection state;
182 for (
const auto& ds : topologyState) {
183 if (ds.collectionId != 0) {
184 state[ds.collectionId].push_back(ds);
191 inline TopologyStateByTask GroupByTaskId(
const TopologyState& topologyState)
193 TopologyStateByTask state;
194 for (
const auto& ds : topologyState) {
195 state[ds.taskId] = ds;
211 template <
typename Executor,
typename Allocator>
220 :
BasicTopology<Executor, Allocator>(asio::system_executor(), std::move(topo), std::move(session), blockUntilConnected)
232 bool blockUntilConnected =
false,
233 Allocator alloc = DefaultAllocator())
234 :
AsioBase<Executor, Allocator>(ex, std::move(alloc))
235 , fDDSSession(std::move(session))
236 , fDDSTopo(std::move(topo))
239 , fMtx(std::make_unique<std::mutex>())
240 , fStateChangeSubscriptionsCV(std::make_unique<std::condition_variable>())
241 , fNumStateChangePublishers(0)
242 , fHeartbeatsTimer(asio::system_executor())
243 , fHeartbeatInterval(600000)
247 std::string activeTopo(fDDSSession.RequestCommanderInfo().activeTopologyName);
248 std::string givenTopo(fDDSTopo.
GetName());
249 if (activeTopo != givenTopo) {
250 throw RuntimeError(
"Given topology ", givenTopo,
" is not activated (active: ", activeTopo,
")");
253 SubscribeToCommands();
255 fDDSSession.StartDDSService();
256 SubscribeToStateChanges();
257 if (blockUntilConnected) {
258 WaitForPublisherCount(fStateIndex.size());
272 UnsubscribeFromStateChanges();
274 std::lock_guard<std::mutex> lk(*fMtx);
275 fDDSSession.UnsubscribeFromCommands();
277 for (
auto& op : fChangeStateOps) {
278 op.second.Complete(MakeErrorCode(ErrorCode::OperationCanceled));
283 void SubscribeToStateChanges()
286 cmd::Cmds cmds(cmd::make<cmd::SubscribeToStateChange>(fHeartbeatInterval.count()));
287 fDDSSession.SendCommand(cmds.Serialize());
289 fHeartbeatsTimer.expires_after(fHeartbeatInterval);
290 fHeartbeatsTimer.async_wait(std::bind(&BasicTopology::SendSubscriptionHeartbeats,
this, std::placeholders::_1));
293 void WaitForPublisherCount(
unsigned int number)
295 std::unique_lock<std::mutex> lk(*fMtx);
296 fStateChangeSubscriptionsCV->wait(lk, [&](){
297 return fNumStateChangePublishers == number;
301 void SendSubscriptionHeartbeats(
const std::error_code& ec)
305 fDDSSession.SendCommand(cmd::Cmds(cmd::make<cmd::SubscriptionHeartbeat>(fHeartbeatInterval.count())).Serialize());
307 fHeartbeatsTimer.expires_after(fHeartbeatInterval);
308 fHeartbeatsTimer.async_wait(std::bind(&BasicTopology::SendSubscriptionHeartbeats,
this, std::placeholders::_1));
309 }
else if (ec == asio::error::operation_aborted) {
312 FAIR_LOG(error) <<
"Timer error: " << ec;
316 void UnsubscribeFromStateChanges()
319 fHeartbeatsTimer.cancel();
322 fDDSSession.SendCommand(cmd::Cmds(cmd::make<cmd::UnsubscribeFromStateChange>()).Serialize());
325 WaitForPublisherCount(0);
328 void SubscribeToCommands()
330 fDDSSession.SubscribeToCommands([&](
const std::string& msg,
const std::string& , DDSChannel::Id senderId) {
332 inCmds.Deserialize(msg);
335 for (
const auto& cmd : inCmds) {
337 switch (cmd->GetType()) {
338 case cmd::Type::state_change_subscription:
339 HandleCmd(static_cast<cmd::StateChangeSubscription&>(*cmd));
341 case cmd::Type::state_change_unsubscription:
342 HandleCmd(static_cast<cmd::StateChangeUnsubscription&>(*cmd));
344 case cmd::Type::state_change:
345 HandleCmd(static_cast<cmd::StateChange&>(*cmd), senderId);
347 case cmd::Type::transition_status:
348 HandleCmd(static_cast<cmd::TransitionStatus&>(*cmd));
350 case cmd::Type::properties:
351 HandleCmd(static_cast<cmd::Properties&>(*cmd));
353 case cmd::Type::properties_set:
354 HandleCmd(static_cast<cmd::PropertiesSet&>(*cmd));
357 FAIR_LOG(warn) <<
"Unexpected/unknown command received: " << cmd->GetType();
358 FAIR_LOG(warn) <<
"Origin: " << senderId;
365 auto HandleCmd(cmd::StateChangeSubscription
const& cmd) ->
void
367 if (cmd.GetResult() == cmd::Result::Ok) {
368 DDSTask::Id taskId(cmd.GetTaskId());
371 std::unique_lock<std::mutex> lk(*fMtx);
372 DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
373 if (!task.subscribed_to_state_changes) {
374 task.subscribed_to_state_changes =
true;
375 ++fNumStateChangePublishers;
377 FAIR_LOG(warn) <<
"Task '" << task.taskId <<
"' sent subscription confirmation more than once";
380 fStateChangeSubscriptionsCV->notify_one();
381 }
catch (
const std::exception& e) {
382 FAIR_LOG(error) <<
"Exception in HandleCmd(cmd::StateChangeSubscription const&): " << e.what();
383 FAIR_LOG(error) <<
"Possibly no task with id '" << taskId <<
"'?";
386 FAIR_LOG(error) <<
"State change subscription failed for device: " << cmd.GetDeviceId() <<
", task id: " << cmd.GetTaskId();
390 auto HandleCmd(cmd::StateChangeUnsubscription
const& cmd) ->
void
392 if (cmd.GetResult() == cmd::Result::Ok) {
393 DDSTask::Id taskId(cmd.GetTaskId());
396 std::unique_lock<std::mutex> lk(*fMtx);
397 DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
398 if (task.subscribed_to_state_changes) {
399 task.subscribed_to_state_changes =
false;
400 --fNumStateChangePublishers;
402 FAIR_LOG(warn) <<
"Task '" << task.taskId <<
"' sent unsubscription confirmation more than once";
405 fStateChangeSubscriptionsCV->notify_one();
406 }
catch (
const std::exception& e) {
407 FAIR_LOG(error) <<
"Exception in HandleCmd(cmd::StateChangeUnsubscription const&): " << e.what();
410 FAIR_LOG(error) <<
"State change unsubscription failed for device: " << cmd.GetDeviceId() <<
", task id: " << cmd.GetTaskId();
414 auto HandleCmd(cmd::StateChange
const& cmd, DDSChannel::Id
const& senderId) ->
void
416 if (cmd.GetCurrentState() == DeviceState::Exiting) {
417 fDDSSession.SendCommand(cmd::Cmds(cmd::make<cmd::StateChangeExitingReceived>()).Serialize(), senderId);
420 DDSTask::Id taskId(cmd.GetTaskId());
423 std::lock_guard<std::mutex> lk(*fMtx);
424 DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
425 task.lastState = cmd.GetLastState();
426 task.state = cmd.GetCurrentState();
428 if (task.state == DeviceState::Exiting) {
429 task.subscribed_to_state_changes =
false;
430 --fNumStateChangePublishers;
434 for (
auto& op : fChangeStateOps) {
435 op.second.Update(taskId, cmd.GetCurrentState());
437 for (
auto& op : fWaitForStateOps) {
438 op.second.Update(taskId, cmd.GetLastState(), cmd.GetCurrentState());
440 }
catch (
const std::exception& e) {
441 FAIR_LOG(error) <<
"Exception in HandleCmd(cmd::StateChange const&): " << e.what();
445 auto HandleCmd(cmd::TransitionStatus
const& cmd) ->
void
447 if (cmd.GetResult() != cmd::Result::Ok) {
448 DDSTask::Id taskId(cmd.GetTaskId());
449 std::lock_guard<std::mutex> lk(*fMtx);
450 for (
auto& op : fChangeStateOps) {
451 if (!op.second.IsCompleted() && op.second.ContainsTask(taskId)) {
452 if (fStateData.at(fStateIndex.at(taskId)).state != op.second.GetTargetState()) {
453 FAIR_LOG(error) << cmd.GetTransition() <<
" transition failed for " << cmd.GetDeviceId() <<
", device is in " << cmd.GetCurrentState() <<
" state.";
454 op.second.Complete(MakeErrorCode(ErrorCode::DeviceChangeStateFailed));
456 FAIR_LOG(debug) << cmd.GetTransition() <<
" transition failed for " << cmd.GetDeviceId() <<
", device is already in " << cmd.GetCurrentState() <<
" state.";
463 auto HandleCmd(cmd::Properties
const& cmd) ->
void
465 std::unique_lock<std::mutex> lk(*fMtx);
467 auto& op(fGetPropertiesOps.at(cmd.GetRequestId()));
469 op.Update(cmd.GetDeviceId(), cmd.GetResult(), cmd.GetProps());
470 }
catch (std::out_of_range& e) {
471 FAIR_LOG(debug) <<
"GetProperties operation (request id: " << cmd.GetRequestId()
472 <<
") not found (probably completed or timed out), "
473 <<
"discarding reply of device " << cmd.GetDeviceId();
477 auto HandleCmd(cmd::PropertiesSet
const& cmd) ->
void
479 std::unique_lock<std::mutex> lk(*fMtx);
481 auto& op(fSetPropertiesOps.at(cmd.GetRequestId()));
483 op.Update(cmd.GetDeviceId(), cmd.GetResult());
484 }
catch (std::out_of_range& e) {
485 FAIR_LOG(debug) <<
"SetProperties operation (request id: " << cmd.GetRequestId()
486 <<
") not found (probably completed or timed out), "
487 <<
"discarding reply of device " << cmd.GetDeviceId();
491 using Duration = std::chrono::microseconds;
492 using ChangeStateCompletionSignature = void(std::error_code, TopologyState);
497 using Id = std::size_t;
498 using Count =
unsigned int;
500 template<
typename Handler>
502 const TopologyTransition transition,
503 std::vector<DDSTask> tasks,
504 TopologyState& stateData,
508 Allocator
const & alloc,
511 , fOp(ex, alloc, std::move(handler))
512 , fStateData(stateData)
515 , fTasks(std::move(tasks))
516 , fTargetState(expectedState.at(transition))
519 if (timeout > std::chrono::milliseconds(0)) {
520 fTimer.expires_after(timeout);
521 fTimer.async_wait([&](std::error_code ec) {
523 std::lock_guard<std::mutex> lk(fMtx);
524 fOp.Timeout(fStateData);
528 if (fTasks.empty()) {
529 FAIR_LOG(warn) <<
"ChangeState initiated on an empty set of tasks, check the path argument.";
532 ChangeStateOp() =
delete;
533 ChangeStateOp(
const ChangeStateOp&) =
delete;
534 ChangeStateOp& operator=(
const ChangeStateOp&) =
delete;
535 ChangeStateOp(ChangeStateOp&&) =
default;
536 ChangeStateOp& operator=(ChangeStateOp&&) =
default;
537 ~ChangeStateOp() =
default;
540 auto ResetCount(
const TopologyStateIndex& stateIndex,
const TopologyState& stateData) ->
void
542 fCount = std::count_if(stateIndex.cbegin(), stateIndex.cend(), [=](
const auto& s) {
543 if (ContainsTask(stateData.at(s.second).taskId)) {
544 return stateData.at(s.second).state == fTargetState;
552 auto Update(
const DDSTask::Id taskId,
const DeviceState currentState) ->
void
554 if (!fOp.IsCompleted() && ContainsTask(taskId)) {
555 if (currentState == fTargetState) {
563 auto TryCompletion() ->
void
565 if (!fOp.IsCompleted() && fCount == fTasks.size()) {
566 Complete(std::error_code());
571 auto Complete(std::error_code ec) ->
void
574 fOp.Complete(ec, fStateData);
578 auto ContainsTask(DDSTask::Id
id) ->
bool
580 auto it = std::find_if(fTasks.begin(), fTasks.end(), [
id](
const DDSTask& t) { return t.GetId() == id; });
581 return it != fTasks.end();
584 bool IsCompleted() {
return fOp.IsCompleted(); }
586 auto GetTargetState() const -> DeviceState {
return fTargetState; }
590 AsioAsyncOp<Executor, Allocator, ChangeStateCompletionSignature> fOp;
591 TopologyState& fStateData;
592 asio::steady_timer fTimer;
594 std::vector<DDSTask> fTasks;
595 DeviceState fTargetState;
677 template<
typename CompletionToken>
679 const std::string& path,
681 CompletionToken&& token)
683 return asio::async_initiate<CompletionToken, ChangeStateCompletionSignature>([&](
auto handler) {
684 typename ChangeStateOp::Id
const id(tools::UuidHash());
686 std::lock_guard<std::mutex> lk(*fMtx);
688 for (
auto it = begin(fChangeStateOps); it != end(fChangeStateOps);) {
689 if (it->second.IsCompleted()) {
690 it = fChangeStateOps.erase(it);
696 auto p = fChangeStateOps.emplace(
697 std::piecewise_construct,
698 std::forward_as_tuple(
id),
699 std::forward_as_tuple(
id,
701 fDDSTopo.GetTasks(path),
707 std::move(handler)));
709 cmd::Cmds cmds(cmd::make<cmd::ChangeState>(transition));
710 fDDSSession.SendCommand(cmds.Serialize(), path);
712 p.first->second.ResetCount(fStateIndex, fStateData);
714 p.first->second.TryCompletion();
725 template<
typename CompletionToken>
728 return AsyncChangeState(transition,
"", Duration(0), std::move(token));
737 template<
typename CompletionToken>
738 auto AsyncChangeState(
const TopologyTransition transition, Duration timeout, CompletionToken&& token)
740 return AsyncChangeState(transition,
"", timeout, std::move(token));
749 template<
typename CompletionToken>
750 auto AsyncChangeState(
const TopologyTransition transition,
const std::string& path, CompletionToken&& token)
752 return AsyncChangeState(transition, path, Duration(0), std::move(token));
760 auto ChangeState(
const TopologyTransition transition,
const std::string& path =
"", Duration timeout = Duration(0))
761 -> std::pair<std::error_code, TopologyState>
766 AsyncChangeState(transition, path, timeout, [&, blocker](std::error_code _ec, TopologyState _state)
mutable {
779 auto ChangeState(
const TopologyTransition transition, Duration timeout)
780 -> std::pair<std::error_code, TopologyState>
782 return ChangeState(transition,
"", timeout);
789 std::lock_guard<std::mutex> lk(*fMtx);
793 auto AggregateState() const -> DeviceState {
return sdk::AggregateState(GetCurrentState()); }
795 auto StateEqualsTo(DeviceState state)
const ->
bool {
return sdk::StateEqualsTo(GetCurrentState(), state); }
797 using WaitForStateCompletionSignature = void(std::error_code);
800 struct WaitForStateOp
802 using Id = std::size_t;
803 using Count =
unsigned int;
805 template<
typename Handler>
806 WaitForStateOp(Id
id,
807 DeviceState targetLastState,
808 DeviceState targetCurrentState,
809 std::vector<DDSTask> tasks,
813 Allocator
const & alloc,
816 , fOp(ex, alloc, std::move(handler))
819 , fTasks(std::move(tasks))
820 , fTargetLastState(targetLastState)
821 , fTargetCurrentState(targetCurrentState)
824 if (timeout > std::chrono::milliseconds(0)) {
825 fTimer.expires_after(timeout);
826 fTimer.async_wait([&](std::error_code ec) {
828 std::lock_guard<std::mutex> lk(fMtx);
833 if (fTasks.empty()) {
834 FAIR_LOG(warn) <<
"WaitForState initiated on an empty set of tasks, check the path argument.";
837 WaitForStateOp() =
delete;
838 WaitForStateOp(
const WaitForStateOp&) =
delete;
839 WaitForStateOp& operator=(
const WaitForStateOp&) =
delete;
840 WaitForStateOp(WaitForStateOp&&) =
default;
841 WaitForStateOp& operator=(WaitForStateOp&&) =
default;
842 ~WaitForStateOp() =
default;
845 auto ResetCount(
const TopologyStateIndex& stateIndex,
const TopologyState& stateData) ->
void
847 fCount = std::count_if(stateIndex.cbegin(), stateIndex.cend(), [=](
const auto& s) {
848 if (ContainsTask(stateData.at(s.second).taskId)) {
849 return stateData.at(s.second).state == fTargetCurrentState &&
850 (stateData.at(s.second).lastState == fTargetLastState || fTargetLastState == DeviceState::Undefined);
858 auto Update(
const DDSTask::Id taskId,
const DeviceState lastState,
const DeviceState currentState) ->
void
860 if (!fOp.IsCompleted() && ContainsTask(taskId)) {
861 if (currentState == fTargetCurrentState &&
862 (lastState == fTargetLastState || fTargetLastState == DeviceState::Undefined)) {
870 auto TryCompletion() ->
void
872 if (!fOp.IsCompleted() && fCount == fTasks.size()) {
878 bool IsCompleted() {
return fOp.IsCompleted(); }
882 AsioAsyncOp<Executor, Allocator, WaitForStateCompletionSignature> fOp;
883 asio::steady_timer fTimer;
885 std::vector<DDSTask> fTasks;
886 DeviceState fTargetLastState;
887 DeviceState fTargetCurrentState;
891 auto ContainsTask(DDSTask::Id
id) ->
bool
893 auto it = std::find_if(fTasks.begin(), fTasks.end(), [
id](
const DDSTask& t) { return t.GetId() == id; });
894 return it != fTasks.end();
907 template<
typename CompletionToken>
909 const DeviceState targetCurrentState,
910 const std::string& path,
912 CompletionToken&& token)
914 return asio::async_initiate<CompletionToken, WaitForStateCompletionSignature>([&](
auto handler) {
915 typename GetPropertiesOp::Id
const id(tools::UuidHash());
917 std::lock_guard<std::mutex> lk(*fMtx);
919 for (
auto it = begin(fWaitForStateOps); it != end(fWaitForStateOps);) {
920 if (it->second.IsCompleted()) {
921 it = fWaitForStateOps.erase(it);
927 auto p = fWaitForStateOps.emplace(
928 std::piecewise_construct,
929 std::forward_as_tuple(
id),
930 std::forward_as_tuple(
id,
933 fDDSTopo.GetTasks(path),
938 std::move(handler)));
939 p.first->second.ResetCount(fStateIndex, fStateData);
941 p.first->second.TryCompletion();
952 template<
typename CompletionToken>
953 auto AsyncWaitForState(
const DeviceState targetLastState,
const DeviceState targetCurrentState, CompletionToken&& token)
955 return AsyncWaitForState(targetLastState, targetCurrentState,
"", Duration(0), std::move(token));
963 template<
typename CompletionToken>
966 return AsyncWaitForState(DeviceState::Undefined, targetCurrentState,
"", Duration(0), std::move(token));
975 auto WaitForState(
const DeviceState targetLastState,
const DeviceState targetCurrentState,
const std::string& path =
"", Duration timeout = Duration(0))
980 AsyncWaitForState(targetLastState, targetCurrentState, path, timeout, [&, blocker](std::error_code _ec)
mutable {
993 auto WaitForState(
const DeviceState targetCurrentState,
const std::string& path =
"", Duration timeout = Duration(0))
996 return WaitForState(DeviceState::Undefined, targetCurrentState, path, timeout);
1002 struct GetPropertiesOp
1004 using Id = std::size_t;
1005 using GetCount =
unsigned int;
1007 template<
typename Handler>
1008 GetPropertiesOp(Id
id,
1009 GetCount expectedCount,
1012 Executor
const & ex,
1013 Allocator
const & alloc,
1016 , fOp(ex, alloc, std::move(handler))
1019 , fExpectedCount(expectedCount)
1022 if (timeout > std::chrono::milliseconds(0)) {
1023 fTimer.expires_after(timeout);
1024 fTimer.async_wait([&](std::error_code ec) {
1026 std::lock_guard<std::mutex> lk(fMtx);
1027 fOp.Timeout(fResult);
1031 if (expectedCount == 0) {
1032 FAIR_LOG(warn) <<
"GetProperties initiated on an empty set of tasks, check the path argument.";
1036 GetPropertiesOp() =
delete;
1037 GetPropertiesOp(
const GetPropertiesOp&) =
delete;
1038 GetPropertiesOp& operator=(
const GetPropertiesOp&) =
delete;
1039 GetPropertiesOp(GetPropertiesOp&&) =
default;
1040 GetPropertiesOp& operator=(GetPropertiesOp&&) =
default;
1041 ~GetPropertiesOp() =
default;
1043 auto Update(
const std::string& deviceId, cmd::Result result, DeviceProperties props) ->
void
1045 std::lock_guard<std::mutex> lk(fMtx);
1046 if (cmd::Result::Ok != result) {
1047 fResult.failed.insert(deviceId);
1049 fResult.devices.insert({deviceId, {std::move(props)}});
1055 bool IsCompleted() {
return fOp.IsCompleted(); }
1059 AsioAsyncOp<Executor, Allocator, GetPropertiesCompletionSignature> fOp;
1060 asio::steady_timer fTimer;
1062 GetCount
const fExpectedCount;
1063 GetPropertiesResult fResult;
1067 auto TryCompletion() ->
void
1069 if (!fOp.IsCompleted() && fCount == fExpectedCount) {
1071 if (fResult.failed.size() > 0) {
1072 fOp.Complete(MakeErrorCode(ErrorCode::DeviceGetPropertiesFailed), std::move(fResult));
1074 fOp.Complete(std::move(fResult));
1088 template<
typename CompletionToken>
1090 const std::string& path,
1092 CompletionToken&& token)
1094 return asio::async_initiate<CompletionToken, GetPropertiesCompletionSignature>(
1096 typename GetPropertiesOp::Id
const id(tools::UuidHash());
1098 std::lock_guard<std::mutex> lk(*fMtx);
1100 for (
auto it = begin(fGetPropertiesOps); it != end(fGetPropertiesOps);) {
1101 if (it->second.IsCompleted()) {
1102 it = fGetPropertiesOps.erase(it);
1108 fGetPropertiesOps.emplace(
1109 std::piecewise_construct,
1110 std::forward_as_tuple(
id),
1111 std::forward_as_tuple(
id,
1112 fDDSTopo.GetTasks(path).size(),
1117 std::move(handler)));
1119 cmd::Cmds const cmds(cmd::make<cmd::GetProperties>(
id, query));
1120 fDDSSession.SendCommand(cmds.Serialize(), path);
1130 template<
typename CompletionToken>
1133 return AsyncGetProperties(query,
"", Duration(0), std::move(token));
1141 auto GetProperties(DevicePropertyQuery
const& query,
const std::string& path =
"", Duration timeout = Duration(0))
1142 -> std::pair<std::error_code, GetPropertiesResult>
1147 AsyncGetProperties(query, path, timeout, [&, blocker](std::error_code _ec,
GetPropertiesResult _result)
mutable {
1153 return {ec, result};
1156 using SetPropertiesCompletionSignature = void(std::error_code, FailedDevices);
1159 struct SetPropertiesOp
1161 using Id = std::size_t;
1162 using SetCount =
unsigned int;
1164 template<
typename Handler>
1165 SetPropertiesOp(Id
id,
1166 SetCount expectedCount,
1169 Executor
const & ex,
1170 Allocator
const & alloc,
1173 , fOp(ex, alloc, std::move(handler))
1176 , fExpectedCount(expectedCount)
1180 if (timeout > std::chrono::milliseconds(0)) {
1181 fTimer.expires_after(timeout);
1182 fTimer.async_wait([&](std::error_code ec) {
1184 std::lock_guard<std::mutex> lk(fMtx);
1185 fOp.Timeout(fFailedDevices);
1189 if (expectedCount == 0) {
1190 FAIR_LOG(warn) <<
"SetProperties initiated on an empty set of tasks, check the path argument.";
1194 SetPropertiesOp() =
delete;
1195 SetPropertiesOp(
const SetPropertiesOp&) =
delete;
1196 SetPropertiesOp& operator=(
const SetPropertiesOp&) =
delete;
1197 SetPropertiesOp(SetPropertiesOp&&) =
default;
1198 SetPropertiesOp& operator=(SetPropertiesOp&&) =
default;
1199 ~SetPropertiesOp() =
default;
1201 auto Update(
const std::string& deviceId, cmd::Result result) ->
void
1203 std::lock_guard<std::mutex> lk(fMtx);
1204 if (cmd::Result::Ok != result) {
1205 fFailedDevices.insert(deviceId);
1211 bool IsCompleted() {
return fOp.IsCompleted(); }
1215 AsioAsyncOp<Executor, Allocator, SetPropertiesCompletionSignature> fOp;
1216 asio::steady_timer fTimer;
1218 SetCount
const fExpectedCount;
1219 FailedDevices fFailedDevices;
1223 auto TryCompletion() ->
void
1225 if (!fOp.IsCompleted() && fCount == fExpectedCount) {
1227 if (fFailedDevices.size() > 0) {
1228 fOp.Complete(MakeErrorCode(ErrorCode::DeviceSetPropertiesFailed), fFailedDevices);
1230 fOp.Complete(fFailedDevices);
1244 template<
typename CompletionToken>
1246 const std::string& path,
1248 CompletionToken&& token)
1250 return asio::async_initiate<CompletionToken, SetPropertiesCompletionSignature>(
1252 typename SetPropertiesOp::Id
const id(tools::UuidHash());
1254 std::lock_guard<std::mutex> lk(*fMtx);
1256 for (
auto it = begin(fGetPropertiesOps); it != end(fGetPropertiesOps);) {
1257 if (it->second.IsCompleted()) {
1258 it = fGetPropertiesOps.erase(it);
1264 fSetPropertiesOps.emplace(
1265 std::piecewise_construct,
1266 std::forward_as_tuple(
id),
1267 std::forward_as_tuple(
id,
1268 fDDSTopo.GetTasks(path).size(),
1273 std::move(handler)));
1275 cmd::Cmds const cmds(cmd::make<cmd::SetProperties>(
id, props));
1276 fDDSSession.SendCommand(cmds.Serialize(), path);
1286 template<
typename CompletionToken>
1289 return AsyncSetProperties(props,
"", Duration(0), std::move(token));
1297 auto SetProperties(DeviceProperties
const& properties,
const std::string& path =
"", Duration timeout = Duration(0))
1298 -> std::pair<std::error_code, FailedDevices>
1302 FailedDevices failed;
1303 AsyncSetProperties(properties, path, timeout, [&, blocker](std::error_code _ec, FailedDevices _failed)
mutable {
1309 return {ec, failed};
1312 Duration GetHeartbeatInterval()
const {
return fHeartbeatInterval; }
1313 void SetHeartbeatInterval(Duration duration) { fHeartbeatInterval = duration; }
1316 using TransitionedCount =
unsigned int;
1318 DDSSession fDDSSession;
1319 DDSTopology fDDSTopo;
1320 TopologyState fStateData;
1321 TopologyStateIndex fStateIndex;
1323 mutable std::unique_ptr<std::mutex> fMtx;
1325 std::unique_ptr<std::condition_variable> fStateChangeSubscriptionsCV;
1326 unsigned int fNumStateChangePublishers;
1327 asio::steady_timer fHeartbeatsTimer;
1328 Duration fHeartbeatInterval;
1330 std::unordered_map<typename ChangeStateOp::Id, ChangeStateOp> fChangeStateOps;
1331 std::unordered_map<typename WaitForStateOp::Id, WaitForStateOp> fWaitForStateOps;
1332 std::unordered_map<typename SetPropertiesOp::Id, SetPropertiesOp> fSetPropertiesOps;
1333 std::unordered_map<typename GetPropertiesOp::Id, GetPropertiesOp> fGetPropertiesOps;
1335 auto makeTopologyState() ->
void
1337 fStateData.reserve(fDDSTopo.GetTasks().size());
1341 for (
const auto& task : fDDSTopo.GetTasks()) {
1342 fStateData.push_back(DeviceStatus{
false, DeviceState::Undefined, DeviceState::Undefined, task.GetId(), task.GetCollectionId()});
1343 fStateIndex.emplace(task.GetId(), index);
1349 auto GetCurrentStateUnsafe() const -> TopologyState
1355 using Topology = BasicTopology<DefaultExecutor, DefaultAllocator>;
1356 using Topo = Topology;
1363 auto MakeTopology(dds::topology_api::CTopology nativeTopo,
1364 std::shared_ptr<dds::tools_api::CSession> nativeSession,
1366 bool blockUntilConnected =
false) -> Topology;