FairMQ  1.4.14
C++ Message Queuing Library and Framework
Topology.h
1 /********************************************************************************
2  * Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef FAIR_MQ_SDK_TOPOLOGY_H
10 #define FAIR_MQ_SDK_TOPOLOGY_H
11 
12 #include <fairmq/sdk/AsioAsyncOp.h>
13 #include <fairmq/sdk/AsioBase.h>
14 #include <fairmq/sdk/commands/Commands.h>
15 #include <fairmq/sdk/DDSCollection.h>
16 #include <fairmq/sdk/DDSInfo.h>
17 #include <fairmq/sdk/DDSSession.h>
18 #include <fairmq/sdk/DDSTask.h>
19 #include <fairmq/sdk/DDSTopology.h>
20 #include <fairmq/sdk/Error.h>
21 #include <fairmq/States.h>
22 #include <fairmq/tools/Semaphore.h>
23 #include <fairmq/tools/Unique.h>
24 
25 #include <fairlogger/Logger.h>
26 #ifndef FAIR_LOG
27 #define FAIR_LOG LOG
28 #endif /* ifndef FAIR_LOG */
29 
30 #include <asio/associated_executor.hpp>
31 #include <asio/async_result.hpp>
32 #include <asio/steady_timer.hpp>
33 #include <asio/system_executor.hpp>
34 
35 #include <algorithm>
36 #include <chrono>
37 #include <condition_variable>
38 #include <functional>
39 #include <map>
40 #include <memory>
41 #include <mutex>
42 #include <ostream>
43 #include <set>
44 #include <stdexcept>
45 #include <string>
46 #include <thread>
47 #include <unordered_map>
48 #include <utility>
49 #include <vector>
50 
51 namespace fair {
52 namespace mq {
53 namespace sdk {
54 
55 using DeviceId = std::string;
56 using DeviceState = fair::mq::State;
57 using DeviceTransition = fair::mq::Transition;
58 
59 const std::map<DeviceTransition, DeviceState> expectedState =
60 {
61  { DeviceTransition::InitDevice, DeviceState::InitializingDevice },
62  { DeviceTransition::CompleteInit, DeviceState::Initialized },
63  { DeviceTransition::Bind, DeviceState::Bound },
64  { DeviceTransition::Connect, DeviceState::DeviceReady },
65  { DeviceTransition::InitTask, DeviceState::Ready },
66  { DeviceTransition::Run, DeviceState::Running },
67  { DeviceTransition::Stop, DeviceState::Ready },
68  { DeviceTransition::ResetTask, DeviceState::DeviceReady },
69  { DeviceTransition::ResetDevice, DeviceState::Idle },
70  { DeviceTransition::End, DeviceState::Exiting }
71 };
72 
74 {
75  bool subscribed_to_state_changes;
76  DeviceState lastState;
77  DeviceState state;
78  DDSTask::Id taskId;
79  DDSCollection::Id collectionId;
80 };
81 
82 using DeviceProperty = std::pair<std::string, std::string>;
83 using DeviceProperties = std::vector<DeviceProperty>;
84 using DevicePropertyQuery = std::string;
85 using FailedDevices = std::set<DeviceId>;
86 
88 {
89  struct Device
90  {
91  DeviceProperties props;
92  };
93  std::unordered_map<DeviceId, Device> devices;
94  FailedDevices failed;
95 };
96 
97 using TopologyState = std::vector<DeviceStatus>;
98 using TopologyStateIndex = std::unordered_map<DDSTask::Id, int>; // task id -> index in the data vector
99 using TopologyStateByTask = std::unordered_map<DDSTask::Id, DeviceStatus>;
100 using TopologyStateByCollection = std::unordered_map<DDSCollection::Id, std::vector<DeviceStatus>>;
101 using TopologyTransition = fair::mq::Transition;
102 
103 inline DeviceState AggregateState(const TopologyState& topologyState)
104 {
105  DeviceState first = topologyState.begin()->state;
106 
107  if (std::all_of(topologyState.cbegin(), topologyState.cend(), [&](TopologyState::value_type i) {
108  return i.state == first;
109  })) {
110  return first;
111  }
112 
113  throw MixedStateError("State is not uniform");
114 }
115 
116 inline bool StateEqualsTo(const TopologyState& topologyState, DeviceState state)
117 {
118  return AggregateState(topologyState) == state;
119 }
120 
121 inline TopologyStateByCollection GroupByCollectionId(const TopologyState& topologyState)
122 {
123  TopologyStateByCollection state;
124  for (const auto& ds : topologyState) {
125  if (ds.collectionId != 0) {
126  state[ds.collectionId].push_back(ds);
127  }
128  }
129 
130  return state;
131 }
132 
133 inline TopologyStateByTask GroupByTaskId(const TopologyState& topologyState)
134 {
135  TopologyStateByTask state;
136  for (const auto& ds : topologyState) {
137  state[ds.taskId] = ds;
138  }
139 
140  return state;
141 }
142 
153 template <typename Executor, typename Allocator>
154 class BasicTopology : public AsioBase<Executor, Allocator>
155 {
156  public:
161  : BasicTopology<Executor, Allocator>(asio::system_executor(), std::move(topo), std::move(session))
162  {}
163 
169  BasicTopology(const Executor& ex,
170  DDSTopology topo,
171  DDSSession session,
172  Allocator alloc = DefaultAllocator())
173  : AsioBase<Executor, Allocator>(ex, std::move(alloc))
174  , fDDSSession(std::move(session))
175  , fDDSTopo(std::move(topo))
176  , fStateData()
177  , fStateIndex()
178  , fHeartbeatsTimer(asio::system_executor())
179  , fHeartbeatInterval(600000)
180  {
181  makeTopologyState();
182 
183  std::string activeTopo(fDDSSession.RequestCommanderInfo().activeTopologyName);
184  std::string givenTopo(fDDSTopo.GetName());
185  if (activeTopo != givenTopo) {
186  throw RuntimeError("Given topology ", givenTopo, " is not activated (active: ", activeTopo, ")");
187  }
188 
189  SubscribeToCommands();
190 
191  fDDSSession.StartDDSService();
192  SubscribeToStateChanges();
193  }
194 
196  BasicTopology(const BasicTopology&) = delete;
197  BasicTopology& operator=(const BasicTopology&) = delete;
198 
200  BasicTopology(BasicTopology&&) = default;
201  BasicTopology& operator=(BasicTopology&&) = default;
202 
203  ~BasicTopology()
204  {
205  UnsubscribeFromStateChanges();
206 
207  std::lock_guard<std::mutex> lk(fMtx);
208  fDDSSession.UnsubscribeFromCommands();
209  try {
210  for (auto& op : fChangeStateOps) {
211  op.second.Complete(MakeErrorCode(ErrorCode::OperationCanceled));
212  }
213  } catch (...) {}
214  }
215 
216  void SubscribeToStateChanges()
217  {
218  // FAIR_LOG(debug) << "Subscribing to state change";
219  cmd::Cmds cmds(cmd::make<cmd::SubscribeToStateChange>(fHeartbeatInterval.count()));
220  fDDSSession.SendCommand(cmds.Serialize());
221 
222  fHeartbeatsTimer.expires_after(fHeartbeatInterval);
223  fHeartbeatsTimer.async_wait(std::bind(&BasicTopology::SendSubscriptionHeartbeats, this, std::placeholders::_1));
224  }
225 
226  void SendSubscriptionHeartbeats(const std::error_code& ec)
227  {
228  if (!ec) {
229  // Timer expired.
230  fDDSSession.SendCommand(cmd::Cmds(cmd::make<cmd::SubscriptionHeartbeat>(fHeartbeatInterval.count())).Serialize());
231  // schedule again
232  fHeartbeatsTimer.expires_after(fHeartbeatInterval);
233  fHeartbeatsTimer.async_wait(std::bind(&BasicTopology::SendSubscriptionHeartbeats, this, std::placeholders::_1));
234  } else if (ec == asio::error::operation_aborted) {
235  // FAIR_LOG(debug) << "Heartbeats timer canceled";
236  } else {
237  FAIR_LOG(error) << "Timer error: " << ec;
238  }
239  }
240 
241  void UnsubscribeFromStateChanges()
242  {
243  // stop sending heartbeats
244  fHeartbeatsTimer.cancel();
245 
246  // unsubscribe from state changes
247  fDDSSession.SendCommand(cmd::Cmds(cmd::make<cmd::UnsubscribeFromStateChange>()).Serialize());
248 
249  // wait for all tasks to confirm unsubscription
250  std::unique_lock<std::mutex> lk(fMtx);
251  fStateChangeUnsubscriptionCV.wait(lk, [&](){
252  unsigned int count = std::count_if(fStateIndex.cbegin(), fStateIndex.cend(), [=](const auto& s) {
253  return fStateData.at(s.second).subscribed_to_state_changes == false;
254  });
255  return count == fStateIndex.size();
256  });
257  }
258 
259  void SubscribeToCommands()
260  {
261  fDDSSession.SubscribeToCommands([&](const std::string& msg, const std::string& /* condition */, DDSChannel::Id senderId) {
262  cmd::Cmds inCmds;
263  inCmds.Deserialize(msg);
264  // FAIR_LOG(debug) << "Received " << inCmds.Size() << " command(s) with total size of " << msg.length() << " bytes: ";
265 
266  for (const auto& cmd : inCmds) {
267  // FAIR_LOG(debug) << " > " << cmd->GetType();
268  switch (cmd->GetType()) {
269  case cmd::Type::state_change_subscription:
270  HandleCmd(static_cast<cmd::StateChangeSubscription&>(*cmd));
271  break;
272  case cmd::Type::state_change_unsubscription:
273  HandleCmd(static_cast<cmd::StateChangeUnsubscription&>(*cmd));
274  break;
275  case cmd::Type::state_change:
276  HandleCmd(static_cast<cmd::StateChange&>(*cmd), senderId);
277  break;
278  case cmd::Type::transition_status:
279  HandleCmd(static_cast<cmd::TransitionStatus&>(*cmd));
280  break;
281  case cmd::Type::properties:
282  HandleCmd(static_cast<cmd::Properties&>(*cmd));
283  break;
284  case cmd::Type::properties_set:
285  HandleCmd(static_cast<cmd::PropertiesSet&>(*cmd));
286  break;
287  default:
288  FAIR_LOG(warn) << "Unexpected/unknown command received: " << cmd->GetType();
289  FAIR_LOG(warn) << "Origin: " << senderId;
290  break;
291  }
292  }
293  });
294  }
295 
296  auto HandleCmd(cmd::StateChangeSubscription const& cmd) -> void
297  {
298  if (cmd.GetResult() == cmd::Result::Ok) {
299  DDSTask::Id taskId(cmd.GetTaskId());
300 
301  try {
302  std::lock_guard<std::mutex> lk(fMtx);
303  DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
304  task.subscribed_to_state_changes = true;
305  } catch (const std::exception& e) {
306  FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeSubscription const&): " << e.what();
307  }
308  } else {
309  FAIR_LOG(error) << "State change subscription failed for device: " << cmd.GetDeviceId() << ", task id: " << cmd.GetTaskId();
310  }
311  }
312 
313  auto HandleCmd(cmd::StateChangeUnsubscription const& cmd) -> void
314  {
315  if (cmd.GetResult() == cmd::Result::Ok) {
316  DDSTask::Id taskId(cmd.GetTaskId());
317 
318  try {
319  std::unique_lock<std::mutex> lk(fMtx);
320  DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
321  task.subscribed_to_state_changes = false;
322  lk.unlock();
323  fStateChangeUnsubscriptionCV.notify_one();
324  } catch (const std::exception& e) {
325  FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeUnsubscription const&): " << e.what();
326  }
327  } else {
328  FAIR_LOG(error) << "State change unsubscription failed for device: " << cmd.GetDeviceId() << ", task id: " << cmd.GetTaskId();
329  }
330  }
331 
332  auto HandleCmd(cmd::StateChange const& cmd, DDSChannel::Id const& senderId) -> void
333  {
334  if (cmd.GetCurrentState() == DeviceState::Exiting) {
335  fDDSSession.SendCommand(cmd::Cmds(cmd::make<cmd::StateChangeExitingReceived>()).Serialize(), senderId);
336  }
337 
338  DDSTask::Id taskId(cmd.GetTaskId());
339 
340  try {
341  std::lock_guard<std::mutex> lk(fMtx);
342  DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
343  task.lastState = cmd.GetLastState();
344  task.state = cmd.GetCurrentState();
345  // if the task is exiting, it will not respond to unsubscription request anymore, set it to false now.
346  if (task.state == DeviceState::Exiting) {
347  task.subscribed_to_state_changes = false;
348  }
349  // FAIR_LOG(debug) << "Updated state entry: taskId=" << taskId << ", state=" << state;
350 
351  for (auto& op : fChangeStateOps) {
352  op.second.Update(taskId, cmd.GetCurrentState());
353  }
354  for (auto& op : fWaitForStateOps) {
355  op.second.Update(taskId, cmd.GetLastState(), cmd.GetCurrentState());
356  }
357  } catch (const std::exception& e) {
358  FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChange const&): " << e.what();
359  }
360  }
361 
362  auto HandleCmd(cmd::TransitionStatus const& cmd) -> void
363  {
364  if (cmd.GetResult() != cmd::Result::Ok) {
365  FAIR_LOG(error) << cmd.GetTransition() << " transition failed for " << cmd.GetDeviceId();
366  DDSTask::Id taskId(cmd.GetTaskId());
367  std::lock_guard<std::mutex> lk(fMtx);
368  for (auto& op : fChangeStateOps) {
369  if (!op.second.IsCompleted() && op.second.ContainsTask(taskId) &&
370  fStateData.at(fStateIndex.at(taskId)).state != op.second.GetTargetState()) {
371  op.second.Complete(MakeErrorCode(ErrorCode::DeviceChangeStateFailed));
372  }
373  }
374  }
375  }
376 
377  auto HandleCmd(cmd::Properties const& cmd) -> void
378  {
379  std::unique_lock<std::mutex> lk(fMtx);
380  try {
381  auto& op(fGetPropertiesOps.at(cmd.GetRequestId()));
382  lk.unlock();
383  op.Update(cmd.GetDeviceId(), cmd.GetResult(), cmd.GetProps());
384  } catch (std::out_of_range& e) {
385  FAIR_LOG(debug) << "GetProperties operation (request id: " << cmd.GetRequestId()
386  << ") not found (probably completed or timed out), "
387  << "discarding reply of device " << cmd.GetDeviceId();
388  }
389  }
390 
391  auto HandleCmd(cmd::PropertiesSet const& cmd) -> void
392  {
393  std::unique_lock<std::mutex> lk(fMtx);
394  try {
395  auto& op(fSetPropertiesOps.at(cmd.GetRequestId()));
396  lk.unlock();
397  op.Update(cmd.GetDeviceId(), cmd.GetResult());
398  } catch (std::out_of_range& e) {
399  FAIR_LOG(debug) << "SetProperties operation (request id: " << cmd.GetRequestId()
400  << ") not found (probably completed or timed out), "
401  << "discarding reply of device " << cmd.GetDeviceId();
402  }
403  }
404 
405  using Duration = std::chrono::milliseconds;
406  using ChangeStateCompletionSignature = void(std::error_code, TopologyState);
407 
408  private:
409  struct ChangeStateOp
410  {
411  using Id = std::size_t;
412  using Count = unsigned int;
413 
414  template<typename Handler>
415  ChangeStateOp(Id id,
416  const TopologyTransition transition,
417  std::vector<DDSTask> tasks,
418  TopologyState& stateData,
419  Duration timeout,
420  std::mutex& mutex,
421  Executor const & ex,
422  Allocator const & alloc,
423  Handler&& handler)
424  : fId(id)
425  , fOp(ex, alloc, std::move(handler))
426  , fStateData(stateData)
427  , fTimer(ex)
428  , fCount(0)
429  , fTasks(std::move(tasks))
430  , fTargetState(expectedState.at(transition))
431  , fMtx(mutex)
432  {
433  if (timeout > std::chrono::milliseconds(0)) {
434  fTimer.expires_after(timeout);
435  fTimer.async_wait([&](std::error_code ec) {
436  if (!ec) {
437  std::lock_guard<std::mutex> lk(fMtx);
438  fOp.Timeout(fStateData);
439  }
440  });
441  }
442  }
443  ChangeStateOp() = delete;
444  ChangeStateOp(const ChangeStateOp&) = delete;
445  ChangeStateOp& operator=(const ChangeStateOp&) = delete;
446  ChangeStateOp(ChangeStateOp&&) = default;
447  ChangeStateOp& operator=(ChangeStateOp&&) = default;
448  ~ChangeStateOp() = default;
449 
451  auto ResetCount(const TopologyStateIndex& stateIndex, const TopologyState& stateData) -> void
452  {
453  fCount = std::count_if(stateIndex.cbegin(), stateIndex.cend(), [=](const auto& s) {
454  if (ContainsTask(stateData.at(s.second).taskId)) {
455  return stateData.at(s.second).state == fTargetState;
456  } else {
457  return false;
458  }
459  });
460  }
461 
463  auto Update(const DDSTask::Id taskId, const DeviceState currentState) -> void
464  {
465  if (!fOp.IsCompleted() && ContainsTask(taskId)) {
466  if (currentState == fTargetState) {
467  ++fCount;
468  }
469  TryCompletion();
470  }
471  }
472 
474  auto TryCompletion() -> void
475  {
476  if (!fOp.IsCompleted() && fCount == fTasks.size()) {
477  Complete(std::error_code());
478  }
479  }
480 
482  auto Complete(std::error_code ec) -> void
483  {
484  fTimer.cancel();
485  fOp.Complete(ec, fStateData);
486  }
487 
489  auto ContainsTask(DDSTask::Id id) -> bool
490  {
491  auto it = std::find_if(fTasks.begin(), fTasks.end(), [id](const DDSTask& t) { return t.GetId() == id; });
492  return it != fTasks.end();
493  }
494 
495  bool IsCompleted() { return fOp.IsCompleted(); }
496 
497  auto GetTargetState() const -> DeviceState { return fTargetState; }
498 
499  private:
500  Id const fId;
502  TopologyState& fStateData;
503  asio::steady_timer fTimer;
504  Count fCount;
505  std::vector<DDSTask> fTasks;
506  DeviceState fTargetState;
507  std::mutex& fMtx;
508  };
509 
510  public:
588  template<typename CompletionToken>
589  auto AsyncChangeState(const TopologyTransition transition,
590  const std::string& path,
591  Duration timeout,
592  CompletionToken&& token)
593  {
594  return asio::async_initiate<CompletionToken, ChangeStateCompletionSignature>([&](auto handler) {
595  typename ChangeStateOp::Id const id(tools::UuidHash());
596 
597  std::lock_guard<std::mutex> lk(fMtx);
598 
599  for (auto it = begin(fChangeStateOps); it != end(fChangeStateOps);) {
600  if (it->second.IsCompleted()) {
601  it = fChangeStateOps.erase(it);
602  } else {
603  ++it;
604  }
605  }
606 
607  auto p = fChangeStateOps.emplace(
608  std::piecewise_construct,
609  std::forward_as_tuple(id),
610  std::forward_as_tuple(id,
611  transition,
612  fDDSTopo.GetTasks(path),
613  fStateData,
614  timeout,
615  fMtx,
618  std::move(handler)));
619 
620  cmd::Cmds cmds(cmd::make<cmd::ChangeState>(transition));
621  fDDSSession.SendCommand(cmds.Serialize(), path);
622 
623  p.first->second.ResetCount(fStateIndex, fStateData);
624  // TODO: make sure following operation properly queues the completion and not doing it directly out of initiation call.
625  p.first->second.TryCompletion();
626 
627  },
628  token);
629  }
630 
636  template<typename CompletionToken>
637  auto AsyncChangeState(const TopologyTransition transition, CompletionToken&& token)
638  {
639  return AsyncChangeState(transition, "", Duration(0), std::move(token));
640  }
641 
648  template<typename CompletionToken>
649  auto AsyncChangeState(const TopologyTransition transition, Duration timeout, CompletionToken&& token)
650  {
651  return AsyncChangeState(transition, "", timeout, std::move(token));
652  }
653 
660  template<typename CompletionToken>
661  auto AsyncChangeState(const TopologyTransition transition, const std::string& path, CompletionToken&& token)
662  {
663  return AsyncChangeState(transition, path, Duration(0), std::move(token));
664  }
665 
671  auto ChangeState(const TopologyTransition transition, const std::string& path = "", Duration timeout = Duration(0))
672  -> std::pair<std::error_code, TopologyState>
673  {
674  tools::SharedSemaphore blocker;
675  std::error_code ec;
676  TopologyState state;
677  AsyncChangeState(transition, path, timeout, [&, blocker](std::error_code _ec, TopologyState _state) mutable {
678  ec = _ec;
679  state = _state;
680  blocker.Signal();
681  });
682  blocker.Wait();
683  return {ec, state};
684  }
685 
690  auto ChangeState(const TopologyTransition transition, Duration timeout)
691  -> std::pair<std::error_code, TopologyState>
692  {
693  return ChangeState(transition, "", timeout);
694  }
695 
698  auto GetCurrentState() const -> TopologyState
699  {
700  std::lock_guard<std::mutex> lk(fMtx);
701  return fStateData;
702  }
703 
704  auto AggregateState() const -> DeviceState { return sdk::AggregateState(GetCurrentState()); }
705 
706  auto StateEqualsTo(DeviceState state) const -> bool { return sdk::StateEqualsTo(GetCurrentState(), state); }
707 
708  using WaitForStateCompletionSignature = void(std::error_code);
709 
710  private:
711  struct WaitForStateOp
712  {
713  using Id = std::size_t;
714  using Count = unsigned int;
715 
716  template<typename Handler>
717  WaitForStateOp(Id id,
718  DeviceState targetLastState,
719  DeviceState targetCurrentState,
720  std::vector<DDSTask> tasks,
721  Duration timeout,
722  std::mutex& mutex,
723  Executor const & ex,
724  Allocator const & alloc,
725  Handler&& handler)
726  : fId(id)
727  , fOp(ex, alloc, std::move(handler))
728  , fTimer(ex)
729  , fCount(0)
730  , fTasks(std::move(tasks))
731  , fTargetLastState(targetLastState)
732  , fTargetCurrentState(targetCurrentState)
733  , fMtx(mutex)
734  {
735  if (timeout > std::chrono::milliseconds(0)) {
736  fTimer.expires_after(timeout);
737  fTimer.async_wait([&](std::error_code ec) {
738  if (!ec) {
739  std::lock_guard<std::mutex> lk(fMtx);
740  fOp.Timeout();
741  }
742  });
743  }
744  }
745  WaitForStateOp() = delete;
746  WaitForStateOp(const WaitForStateOp&) = delete;
747  WaitForStateOp& operator=(const WaitForStateOp&) = delete;
748  WaitForStateOp(WaitForStateOp&&) = default;
749  WaitForStateOp& operator=(WaitForStateOp&&) = default;
750  ~WaitForStateOp() = default;
751 
753  auto ResetCount(const TopologyStateIndex& stateIndex, const TopologyState& stateData) -> void
754  {
755  fCount = std::count_if(stateIndex.cbegin(), stateIndex.cend(), [=](const auto& s) {
756  if (ContainsTask(stateData.at(s.second).taskId)) {
757  return stateData.at(s.second).state == fTargetCurrentState
758  &&
759  (stateData.at(s.second).lastState == fTargetLastState || fTargetLastState == DeviceState::Ok);
760  } else {
761  return false;
762  }
763  });
764  }
765 
767  auto Update(const DDSTask::Id taskId, const DeviceState lastState, const DeviceState currentState) -> void
768  {
769  if (!fOp.IsCompleted() && ContainsTask(taskId)) {
770  if (currentState == fTargetCurrentState &&
771  (lastState == fTargetLastState ||
772  fTargetLastState == DeviceState::Ok)) {
773  ++fCount;
774  }
775  TryCompletion();
776  }
777  }
778 
780  auto TryCompletion() -> void
781  {
782  if (!fOp.IsCompleted() && fCount == fTasks.size()) {
783  fTimer.cancel();
784  fOp.Complete();
785  }
786  }
787 
788  bool IsCompleted() { return fOp.IsCompleted(); }
789 
790  private:
791  Id const fId;
793  asio::steady_timer fTimer;
794  Count fCount;
795  std::vector<DDSTask> fTasks;
796  DeviceState fTargetLastState;
797  DeviceState fTargetCurrentState;
798  std::mutex& fMtx;
799 
801  auto ContainsTask(DDSTask::Id id) -> bool
802  {
803  auto it = std::find_if(fTasks.begin(), fTasks.end(), [id](const DDSTask& t) { return t.GetId() == id; });
804  return it != fTasks.end();
805  }
806  };
807 
808  public:
817  template<typename CompletionToken>
818  auto AsyncWaitForState(const DeviceState targetLastState,
819  const DeviceState targetCurrentState,
820  const std::string& path,
821  Duration timeout,
822  CompletionToken&& token)
823  {
824  return asio::async_initiate<CompletionToken, WaitForStateCompletionSignature>([&](auto handler) {
825  typename GetPropertiesOp::Id const id(tools::UuidHash());
826 
827  std::lock_guard<std::mutex> lk(fMtx);
828 
829  for (auto it = begin(fWaitForStateOps); it != end(fWaitForStateOps);) {
830  if (it->second.IsCompleted()) {
831  it = fWaitForStateOps.erase(it);
832  } else {
833  ++it;
834  }
835  }
836 
837  auto p = fWaitForStateOps.emplace(
838  std::piecewise_construct,
839  std::forward_as_tuple(id),
840  std::forward_as_tuple(id,
841  targetLastState,
842  targetCurrentState,
843  fDDSTopo.GetTasks(path),
844  timeout,
845  fMtx,
848  std::move(handler)));
849  p.first->second.ResetCount(fStateIndex, fStateData);
850  // TODO: make sure following operation properly queues the completion and not doing it directly out of initiation call.
851  p.first->second.TryCompletion();
852  },
853  token);
854  }
855 
862  template<typename CompletionToken>
863  auto AsyncWaitForState(const DeviceState targetLastState, const DeviceState targetCurrentState, CompletionToken&& token)
864  {
865  return AsyncWaitForState(targetLastState, targetCurrentState, "", Duration(0), std::move(token));
866  }
867 
873  template<typename CompletionToken>
874  auto AsyncWaitForState(const DeviceState targetCurrentState, CompletionToken&& token)
875  {
876  return AsyncWaitForState(DeviceState::Ok, targetCurrentState, "", Duration(0), std::move(token));
877  }
878 
885  auto WaitForState(const DeviceState targetLastState, const DeviceState targetCurrentState, const std::string& path = "", Duration timeout = Duration(0))
886  -> std::error_code
887  {
888  tools::SharedSemaphore blocker;
889  std::error_code ec;
890  AsyncWaitForState(targetLastState, targetCurrentState, path, timeout, [&, blocker](std::error_code _ec) mutable {
891  ec = _ec;
892  blocker.Signal();
893  });
894  blocker.Wait();
895  return ec;
896  }
897 
903  auto WaitForState(const DeviceState targetCurrentState, const std::string& path = "", Duration timeout = Duration(0))
904  -> std::error_code
905  {
906  return WaitForState(DeviceState::Ok, targetCurrentState, path, timeout);
907  }
908 
909  using GetPropertiesCompletionSignature = void(std::error_code, GetPropertiesResult);
910 
911  private:
912  struct GetPropertiesOp
913  {
914  using Id = std::size_t;
915  using GetCount = unsigned int;
916 
917  template<typename Handler>
918  GetPropertiesOp(Id id,
919  GetCount expectedCount,
920  Duration timeout,
921  std::mutex& mutex,
922  Executor const & ex,
923  Allocator const & alloc,
924  Handler&& handler)
925  : fId(id)
926  , fOp(ex, alloc, std::move(handler))
927  , fTimer(ex)
928  , fCount(0)
929  , fExpectedCount(expectedCount)
930  , fMtx(mutex)
931  {
932  if (timeout > std::chrono::milliseconds(0)) {
933  fTimer.expires_after(timeout);
934  fTimer.async_wait([&](std::error_code ec) {
935  if (!ec) {
936  std::lock_guard<std::mutex> lk(fMtx);
937  fOp.Timeout(fResult);
938  }
939  });
940  }
941  // FAIR_LOG(debug) << "GetProperties " << fId << " with expected count of " << fExpectedCount << " started.";
942  }
943  GetPropertiesOp() = delete;
944  GetPropertiesOp(const GetPropertiesOp&) = delete;
945  GetPropertiesOp& operator=(const GetPropertiesOp&) = delete;
946  GetPropertiesOp(GetPropertiesOp&&) = default;
947  GetPropertiesOp& operator=(GetPropertiesOp&&) = default;
948  ~GetPropertiesOp() = default;
949 
950  auto Update(const std::string& deviceId, cmd::Result result, DeviceProperties props) -> void
951  {
952  std::lock_guard<std::mutex> lk(fMtx);
953  if (cmd::Result::Ok != result) {
954  fResult.failed.insert(deviceId);
955  } else {
956  fResult.devices.insert({deviceId, {std::move(props)}});
957  }
958  ++fCount;
959  TryCompletion();
960  }
961 
962  bool IsCompleted() { return fOp.IsCompleted(); }
963 
964  private:
965  Id const fId;
967  asio::steady_timer fTimer;
968  GetCount fCount;
969  GetCount const fExpectedCount;
970  GetPropertiesResult fResult;
971  std::mutex& fMtx;
972 
974  auto TryCompletion() -> void
975  {
976  if (!fOp.IsCompleted() && fCount == fExpectedCount) {
977  fTimer.cancel();
978  if (fResult.failed.size() > 0) {
979  fOp.Complete(MakeErrorCode(ErrorCode::DeviceGetPropertiesFailed), std::move(fResult));
980  } else {
981  fOp.Complete(std::move(fResult));
982  }
983  }
984  }
985  };
986 
987  public:
995  template<typename CompletionToken>
996  auto AsyncGetProperties(DevicePropertyQuery const& query,
997  const std::string& path,
998  Duration timeout,
999  CompletionToken&& token)
1000  {
1001  return asio::async_initiate<CompletionToken, GetPropertiesCompletionSignature>(
1002  [&](auto handler) {
1003  typename GetPropertiesOp::Id const id(tools::UuidHash());
1004 
1005  std::lock_guard<std::mutex> lk(fMtx);
1006 
1007  for (auto it = begin(fGetPropertiesOps); it != end(fGetPropertiesOps);) {
1008  if (it->second.IsCompleted()) {
1009  it = fGetPropertiesOps.erase(it);
1010  } else {
1011  ++it;
1012  }
1013  }
1014 
1015  fGetPropertiesOps.emplace(
1016  std::piecewise_construct,
1017  std::forward_as_tuple(id),
1018  std::forward_as_tuple(id,
1019  fDDSTopo.GetTasks(path).size(),
1020  timeout,
1021  fMtx,
1024  std::move(handler)));
1025 
1026  cmd::Cmds const cmds(cmd::make<cmd::GetProperties>(id, query));
1027  fDDSSession.SendCommand(cmds.Serialize(), path);
1028  },
1029  token);
1030  }
1031 
1037  template<typename CompletionToken>
1038  auto AsyncGetProperties(DevicePropertyQuery const& query, CompletionToken&& token)
1039  {
1040  return AsyncGetProperties(query, "", Duration(0), std::move(token));
1041  }
1042 
1048  auto GetProperties(DevicePropertyQuery const& query, const std::string& path = "", Duration timeout = Duration(0))
1049  -> std::pair<std::error_code, GetPropertiesResult>
1050  {
1051  tools::SharedSemaphore blocker;
1052  std::error_code ec;
1053  GetPropertiesResult result;
1054  AsyncGetProperties(query, path, timeout, [&, blocker](std::error_code _ec, GetPropertiesResult _result) mutable {
1055  ec = _ec;
1056  result = _result;
1057  blocker.Signal();
1058  });
1059  blocker.Wait();
1060  return {ec, result};
1061  }
1062 
1063  using SetPropertiesCompletionSignature = void(std::error_code, FailedDevices);
1064 
1065  private:
1066  struct SetPropertiesOp
1067  {
1068  using Id = std::size_t;
1069  using SetCount = unsigned int;
1070 
1071  template<typename Handler>
1072  SetPropertiesOp(Id id,
1073  SetCount expectedCount,
1074  Duration timeout,
1075  std::mutex& mutex,
1076  Executor const & ex,
1077  Allocator const & alloc,
1078  Handler&& handler)
1079  : fId(id)
1080  , fOp(ex, alloc, std::move(handler))
1081  , fTimer(ex)
1082  , fCount(0)
1083  , fExpectedCount(expectedCount)
1084  , fFailedDevices()
1085  , fMtx(mutex)
1086  {
1087  if (timeout > std::chrono::milliseconds(0)) {
1088  fTimer.expires_after(timeout);
1089  fTimer.async_wait([&](std::error_code ec) {
1090  if (!ec) {
1091  std::lock_guard<std::mutex> lk(fMtx);
1092  fOp.Timeout(fFailedDevices);
1093  }
1094  });
1095  }
1096  // FAIR_LOG(debug) << "SetProperties " << fId << " with expected count of " << fExpectedCount << " started.";
1097  }
1098  SetPropertiesOp() = delete;
1099  SetPropertiesOp(const SetPropertiesOp&) = delete;
1100  SetPropertiesOp& operator=(const SetPropertiesOp&) = delete;
1101  SetPropertiesOp(SetPropertiesOp&&) = default;
1102  SetPropertiesOp& operator=(SetPropertiesOp&&) = default;
1103  ~SetPropertiesOp() = default;
1104 
1105  auto Update(const std::string& deviceId, cmd::Result result) -> void
1106  {
1107  std::lock_guard<std::mutex> lk(fMtx);
1108  if (cmd::Result::Ok != result) {
1109  fFailedDevices.insert(deviceId);
1110  }
1111  ++fCount;
1112  TryCompletion();
1113  }
1114 
1115  bool IsCompleted() { return fOp.IsCompleted(); }
1116 
1117  private:
1118  Id const fId;
1120  asio::steady_timer fTimer;
1121  SetCount fCount;
1122  SetCount const fExpectedCount;
1123  FailedDevices fFailedDevices;
1124  std::mutex& fMtx;
1125 
1127  auto TryCompletion() -> void
1128  {
1129  if (!fOp.IsCompleted() && fCount == fExpectedCount) {
1130  fTimer.cancel();
1131  if (fFailedDevices.size() > 0) {
1132  fOp.Complete(MakeErrorCode(ErrorCode::DeviceSetPropertiesFailed), fFailedDevices);
1133  } else {
1134  fOp.Complete(fFailedDevices);
1135  }
1136  }
1137  }
1138  };
1139 
1140  public:
1148  template<typename CompletionToken>
1149  auto AsyncSetProperties(const DeviceProperties& props,
1150  const std::string& path,
1151  Duration timeout,
1152  CompletionToken&& token)
1153  {
1154  return asio::async_initiate<CompletionToken, SetPropertiesCompletionSignature>(
1155  [&](auto handler) {
1156  typename SetPropertiesOp::Id const id(tools::UuidHash());
1157 
1158  std::lock_guard<std::mutex> lk(fMtx);
1159 
1160  for (auto it = begin(fGetPropertiesOps); it != end(fGetPropertiesOps);) {
1161  if (it->second.IsCompleted()) {
1162  it = fGetPropertiesOps.erase(it);
1163  } else {
1164  ++it;
1165  }
1166  }
1167 
1168  fSetPropertiesOps.emplace(
1169  std::piecewise_construct,
1170  std::forward_as_tuple(id),
1171  std::forward_as_tuple(id,
1172  fDDSTopo.GetTasks(path).size(),
1173  timeout,
1174  fMtx,
1177  std::move(handler)));
1178 
1179  cmd::Cmds const cmds(cmd::make<cmd::SetProperties>(id, props));
1180  fDDSSession.SendCommand(cmds.Serialize(), path);
1181  },
1182  token);
1183  }
1184 
1190  template<typename CompletionToken>
1191  auto AsyncSetProperties(DeviceProperties const & props, CompletionToken&& token)
1192  {
1193  return AsyncSetProperties(props, "", Duration(0), std::move(token));
1194  }
1195 
1201  auto SetProperties(DeviceProperties const& properties, const std::string& path = "", Duration timeout = Duration(0))
1202  -> std::pair<std::error_code, FailedDevices>
1203  {
1204  tools::SharedSemaphore blocker;
1205  std::error_code ec;
1206  FailedDevices failed;
1207  AsyncSetProperties(properties, path, timeout, [&, blocker](std::error_code _ec, FailedDevices _failed) mutable {
1208  ec = _ec;
1209  failed = _failed;
1210  blocker.Signal();
1211  });
1212  blocker.Wait();
1213  return {ec, failed};
1214  }
1215 
1216  Duration GetHeartbeatInterval() const { return fHeartbeatInterval; }
1217  void SetHeartbeatInterval(Duration duration) { fHeartbeatInterval = duration; }
1218 
1219  private:
1220  using TransitionedCount = unsigned int;
1221 
1222  DDSSession fDDSSession;
1223  DDSTopology fDDSTopo;
1224  TopologyState fStateData;
1225  TopologyStateIndex fStateIndex;
1226 
1227  mutable std::mutex fMtx;
1228 
1229  std::condition_variable fStateChangeUnsubscriptionCV;
1230  asio::steady_timer fHeartbeatsTimer;
1231  Duration fHeartbeatInterval;
1232 
1233  std::unordered_map<typename ChangeStateOp::Id, ChangeStateOp> fChangeStateOps;
1234  std::unordered_map<typename WaitForStateOp::Id, WaitForStateOp> fWaitForStateOps;
1235  std::unordered_map<typename SetPropertiesOp::Id, SetPropertiesOp> fSetPropertiesOps;
1236  std::unordered_map<typename GetPropertiesOp::Id, GetPropertiesOp> fGetPropertiesOps;
1237 
1238  auto makeTopologyState() -> void
1239  {
1240  fStateData.reserve(fDDSTopo.GetTasks().size());
1241 
1242  int index = 0;
1243 
1244  for (const auto& task : fDDSTopo.GetTasks()) {
1245  fStateData.push_back(DeviceStatus{false, DeviceState::Ok, DeviceState::Ok, task.GetId(), task.GetCollectionId()});
1246  fStateIndex.emplace(task.GetId(), index);
1247  index++;
1248  }
1249  }
1250 
1252  auto GetCurrentStateUnsafe() const -> TopologyState
1253  {
1254  return fStateData;
1255  }
1256 };
1257 
1259 using Topo = Topology;
1260 
1265 auto MakeTopology(dds::topology_api::CTopology nativeTopo,
1266  std::shared_ptr<dds::tools_api::CSession> nativeSession,
1267  DDSEnv env = {}) -> Topology;
1268 
1269 } // namespace sdk
1270 } // namespace mq
1271 } // namespace fair
1272 
1273 #endif /* FAIR_MQ_SDK_TOPOLOGY_H */
Represents a FairMQ topology.
Definition: Topology.h:154
Represents a DDS session.
Definition: DDSSession.h:56
auto AsyncChangeState(const TopologyTransition transition, CompletionToken &&token)
Initiate state transition on all FairMQ devices in this topology.
Definition: Topology.h:637
Definition: Topology.h:87
auto GetExecutor() const noexcept -> ExecutorType
Get associated I/O executor.
Definition: AsioBase.h:41
auto AsyncSetProperties(DeviceProperties const &props, CompletionToken &&token)
Initiate property update on selected FairMQ devices in this topology.
Definition: Topology.h:1191
Definition: Commands.h:189
auto GetAllocator() const noexcept -> AllocatorType
Get associated default allocator.
Definition: AsioBase.h:46
Base for creating Asio-enabled I/O objects.
Definition: AsioBase.h:35
Definition: Error.h:56
auto AsyncWaitForState(const DeviceState targetCurrentState, CompletionToken &&token)
Initiate waiting for selected FairMQ devices to reach given current state in this topology...
Definition: Topology.h:874
auto AsyncWaitForState(const DeviceState targetLastState, const DeviceState targetCurrentState, CompletionToken &&token)
Initiate waiting for selected FairMQ devices to reach given last & current state in this topology...
Definition: Topology.h:863
auto WaitForState(const DeviceState targetLastState, const DeviceState targetCurrentState, const std::string &path="", Duration timeout=Duration(0)) -> std::error_code
Wait for selected FairMQ devices to reach given last & current state in this topology.
Definition: Topology.h:885
auto AsyncWaitForState(const DeviceState targetLastState, const DeviceState targetCurrentState, const std::string &path, Duration timeout, CompletionToken &&token)
Initiate waiting for selected FairMQ devices to reach given last & current state in this topology...
Definition: Topology.h:818
Represents a DDS task.
Definition: DDSTask.h:25
auto AsyncChangeState(const TopologyTransition transition, Duration timeout, CompletionToken &&token)
Initiate state transition on all FairMQ devices in this topology with a timeout.
Definition: Topology.h:649
Definition: Commands.h:303
auto AsyncGetProperties(DevicePropertyQuery const &query, CompletionToken &&token)
Initiate property query on selected FairMQ devices in this topology.
Definition: Topology.h:1038
auto AsyncChangeState(const TopologyTransition transition, const std::string &path, Duration timeout, CompletionToken &&token)
Initiate state transition on FairMQ devices in this topology for a specified topology path, with a timeout.
Definition: Topology.h:589
auto ChangeState(const TopologyTransition transition, const std::string &path="", Duration timeout=Duration(0)) -> std::pair< std::error_code, TopologyState >
Perform state transition on FairMQ devices in this topology for a specified topology path...
Definition: Topology.h:671
auto ChangeState(const TopologyTransition transition, Duration timeout) -> std::pair< std::error_code, TopologyState >
Perform state transition on all FairMQ devices in this topology with a timeout.
Definition: Topology.h:690
Definition: Topology.h:73
Definition: Error.h:28
auto AsyncSetProperties(const DeviceProperties &props, const std::string &path, Duration timeout, CompletionToken &&token)
Initiate property update on selected FairMQ devices in this topology.
Definition: Topology.h:1149
auto AsyncGetProperties(DevicePropertyQuery const &query, const std::string &path, Duration timeout, CompletionToken &&token)
Initiate property query on selected FairMQ devices in this topology.
Definition: Topology.h:996
auto GetTasks(const std::string &="") const -> std::vector< DDSTask >
Get list of tasks in this topology, optionally matching provided path.
Definition: DDSTopology.cxx:67
Definition: Commands.h:329
Definition: Commands.h:356
A simple copyable blocking semaphore.
Definition: Semaphore.h:45
auto GetCurrentState() const -> TopologyState
Returns the current state of the topology.
Definition: Topology.h:698
BasicTopology(const Executor &ex, DDSTopology topo, DDSSession session, Allocator alloc=DefaultAllocator())
(Re)Construct a FairMQ topology from an existing DDS topology
Definition: Topology.h:169
auto GetProperties(DevicePropertyQuery const &query, const std::string &path="", Duration timeout=Duration(0)) -> std::pair< std::error_code, GetPropertiesResult >
Query properties on selected FairMQ devices in this topology.
Definition: Topology.h:1048
Definition: Error.h:20
Definition: Commands.h:277
Tools for interfacing containers to the transport via polymorphic allocators.
Definition: DeviceRunner.h:23
auto SetProperties(DeviceProperties const &properties, const std::string &path="", Duration timeout=Duration(0)) -> std::pair< std::error_code, FailedDevices >
Set properties on selected FairMQ devices in this topology.
Definition: Topology.h:1201
BasicTopology(DDSTopology topo, DDSSession session)
(Re)Construct a FairMQ topology from an existing DDS topology
Definition: Topology.h:160
auto WaitForState(const DeviceState targetCurrentState, const std::string &path="", Duration timeout=Duration(0)) -> std::error_code
Wait for selected FairMQ devices to reach given current state in this topology.
Definition: Topology.h:903
Sets up the DDS environment (object helper)
Definition: DDSEnvironment.h:24
Definition: Traits.h:16
Represents a DDS topology.
Definition: DDSTopology.h:29
auto AsyncChangeState(const TopologyTransition transition, const std::string &path, CompletionToken &&token)
Initiate state transition on FairMQ devices in this topology for a specified topology path.
Definition: Topology.h:661

privacy