FairMQ  1.4.33
C++ Message Queuing Library and Framework
AsioAsyncOp.h
1 /********************************************************************************
2  * Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef FAIR_MQ_SDK_ASIOASYNCOP_H
10 #define FAIR_MQ_SDK_ASIOASYNCOP_H
11 
12 #include <asio/associated_allocator.hpp>
13 #include <asio/associated_executor.hpp>
14 #include <asio/executor_work_guard.hpp>
15 #include <asio/dispatch.hpp>
16 #include <asio/system_executor.hpp>
17 #include <chrono>
18 #include <exception>
19 #include <fairmq/sdk/Error.h>
20 #include <fairmq/sdk/Traits.h>
21 #include <functional>
22 #include <memory>
23 #include <system_error>
24 #include <type_traits>
25 #include <utility>
26 
27 #include <fairlogger/Logger.h>
28 #ifndef FAIR_LOG
29 #define FAIR_LOG LOG
30 #endif /* ifndef FAIR_LOG */
31 
32 namespace fair::mq::sdk
33 {
34 
/// Type-erased interface of a pending asynchronous operation.
///
/// It depends only on the argument types of the completion signature (without
/// the leading std::error_code), so an owner can hold and complete an
/// operation without knowing the concrete executor, allocator or handler type.
template<typename... SignatureArgTypes>
struct AsioAsyncOpImplBase
{
    /// Virtual, so that destroying a derived operation through a pointer to
    /// this base is well-defined.
    virtual ~AsioAsyncOpImplBase() = default;

    /// Complete the operation with the given error code and result values.
    virtual auto Complete(std::error_code, SignatureArgTypes...) -> void = 0;

    /// @return true once the operation has been completed
    virtual auto IsCompleted() const -> bool = 0;
};
41 
46 template<typename Executor1, typename Allocator1, typename Handler, typename... SignatureArgTypes>
47 struct AsioAsyncOpImpl : AsioAsyncOpImplBase<SignatureArgTypes...>
48 {
50  using Allocator2 = typename asio::associated_allocator<Handler, Allocator1>::type;
51 
53  using Executor2 = typename asio::associated_executor<Handler, Executor1>::type;
54 
56  AsioAsyncOpImpl(const Executor1& ex1, Allocator1 alloc1, Handler&& handler)
57  : fWork1(ex1)
58  , fWork2(asio::get_associated_executor(handler, ex1))
59  , fHandler(std::move(handler))
60  , fAlloc1(std::move(alloc1))
61  {}
62 
63  auto GetAlloc2() const -> Allocator2 { return asio::get_associated_allocator(fHandler, fAlloc1); }
64  auto GetEx2() const -> Executor2 { return asio::get_associated_executor(fWork2); }
65 
66  auto Complete(std::error_code ec, SignatureArgTypes... args) -> void override
67  {
68  if (IsCompleted()) {
69  throw RuntimeError("Async operation already completed");
70  }
71 
72  asio::dispatch(GetEx2(),
73  [=, handler = std::move(fHandler)]() mutable {
74  try {
75  handler(ec, args...);
76  } catch (const std::exception& e) {
77  FAIR_LOG(error) << "Uncaught exception in AsioAsyncOp completion handler: " << e.what();
78  } catch (...) {
79  FAIR_LOG(error) << "Unknown uncaught exception in AsioAsyncOp completion handler.";
80  }
81  });
82 
83  fWork1.reset();
84  fWork2.reset();
85  }
86 
87  auto IsCompleted() const -> bool override
88  {
89  return !fWork1.owns_work() && !fWork2.owns_work();
90  }
91 
92  private:
94  asio::executor_work_guard<Executor1> fWork1;
95  asio::executor_work_guard<Executor2> fWork2;
96  Handler fHandler;
97  Allocator1 fAlloc1;
98 };
99 
/**
 * @brief Interface for Asio-compliant asynchronous operation, see
 * https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html
 *
 * The primary template is intentionally empty; the functionality lives in the
 * partial specialization for function-type completion signatures.
 *
 * @tparam Executor executor type
 * @tparam Allocator allocator type
 * @tparam CompletionSignature completion signature (a function type)
 */
template<typename Executor, typename Allocator, typename CompletionSignature>
struct AsioAsyncOp
{
};
117 
127 template<typename Executor,
128  typename Allocator,
129  typename SignatureReturnType,
130  typename SignatureFirstArgType,
131  typename... SignatureArgTypes>
132 struct AsioAsyncOp<Executor,
133  Allocator,
134  SignatureReturnType(SignatureFirstArgType, SignatureArgTypes...)>
135 {
136  static_assert(std::is_void<SignatureReturnType>::value,
137  "return value of CompletionSignature must be void");
138  static_assert(std::is_same<SignatureFirstArgType, std::error_code>::value,
139  "first argument of CompletionSignature must be std::error_code");
140  using Duration = std::chrono::milliseconds;
141 
142  private:
143  using Impl = AsioAsyncOpImplBase<SignatureArgTypes...>;
144  using ImplPtr = std::unique_ptr<Impl, std::function<void(Impl*)>>;
145  ImplPtr fImpl;
146 
147  public:
150  : fImpl(nullptr)
151  {}
152 
154  template<typename Handler>
155  AsioAsyncOp(Executor ex1, Allocator alloc1, Handler&& handler)
156  : AsioAsyncOp()
157  {
158  // Async operation type to be allocated and constructed
159  using Op = AsioAsyncOpImpl<Executor, Allocator, Handler, SignatureArgTypes...>;
160 
161  // Create allocator for concrete op type
162  // Allocator2, see https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html#boost_asio.reference.asynchronous_operations.allocation_of_intermediate_storage
163  using OpAllocator =
164  typename std::allocator_traits<typename Op::Allocator2>::template rebind_alloc<Op>;
165  OpAllocator opAlloc;
166 
167  // Allocate memory
168  auto mem(std::allocator_traits<OpAllocator>::allocate(opAlloc, 1));
169 
170  // Construct object
171  auto ptr(new (mem) Op(std::move(ex1),
172  std::move(alloc1),
173  std::forward<Handler>(handler)));
174 
175  // Assign ownership to this object
176  fImpl = ImplPtr(ptr, [opAlloc](Impl* p) mutable {
177  std::allocator_traits<OpAllocator>::deallocate(opAlloc, static_cast<Op*>(p), 1);
178  });
179  }
180 
182  template<typename Handler>
183  AsioAsyncOp(Executor ex1, Handler&& handler)
184  : AsioAsyncOp(std::move(ex1), Allocator(), std::forward<Handler>(handler))
185  {}
186 
188  template<typename Handler>
189  explicit AsioAsyncOp(Handler&& handler)
190  : AsioAsyncOp(asio::system_executor(), std::forward<Handler>(handler))
191  {}
192 
193  auto IsCompleted() -> bool { return (fImpl == nullptr) || fImpl->IsCompleted(); }
194 
195  auto Complete(std::error_code ec, SignatureArgTypes... args) -> void
196  {
197  if(IsCompleted()) {
198  throw RuntimeError("Async operation already completed");
199  }
200 
201  fImpl->Complete(ec, args...);
202  fImpl.reset(nullptr);
203  }
204 
205  auto Complete(SignatureArgTypes... args) -> void
206  {
207  Complete(std::error_code(), args...);
208  }
209 
210  auto Cancel(SignatureArgTypes... args) -> void
211  {
212  Complete(MakeErrorCode(ErrorCode::OperationCanceled), args...);
213  }
214 
215  auto Timeout(SignatureArgTypes... args) -> void
216  {
217  Complete(MakeErrorCode(ErrorCode::OperationTimeout), args...);
218  }
219 };
220 
221 } // namespace fair::mq::sdk
222 
223 #endif /* FAIR_MQ_SDK_ASIOASYNCOP_H */
fair::mq::sdk::AsioAsyncOp< Executor, Allocator, SignatureReturnType(SignatureFirstArgType, SignatureArgTypes...)>::AsioAsyncOp
AsioAsyncOp()
Default Ctor.
Definition: AsioAsyncOp.h:149
fair::mq::sdk::AsioAsyncOp
Interface for Asio-compliant asynchronous operation, see https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html
Definition: AsioAsyncOp.h:115
fair::mq::sdk::AsioAsyncOpImpl
Definition: AsioAsyncOp.h:48
fair::mq::sdk::AsioAsyncOpImpl::Allocator2
typename asio::associated_allocator< Handler, Allocator1 >::type Allocator2
See https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html#boost_asio.reference.asynchronous_operations.allocation_of_intermediate_storage
Definition: AsioAsyncOp.h:50
fair::mq::sdk::AsioAsyncOp< Executor, Allocator, SignatureReturnType(SignatureFirstArgType, SignatureArgTypes...)>::AsioAsyncOp
AsioAsyncOp(Executor ex1, Handler &&handler)
Ctor with handler #2.
Definition: AsioAsyncOp.h:183
fair::mq::sdk::AsioAsyncOpImpl::Executor2
typename asio::associated_executor< Handler, Executor1 >::type Executor2
See https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html
Definition: AsioAsyncOp.h:53
fair::mq::sdk::AsioAsyncOp< Executor, Allocator, SignatureReturnType(SignatureFirstArgType, SignatureArgTypes...)>::AsioAsyncOp
AsioAsyncOp(Handler &&handler)
Ctor with handler #3.
Definition: AsioAsyncOp.h:189
fair::mq::sdk::AsioAsyncOp< Executor, Allocator, SignatureReturnType(SignatureFirstArgType, SignatureArgTypes...)>::AsioAsyncOp
AsioAsyncOp(Executor ex1, Allocator alloc1, Handler &&handler)
Ctor with handler.
Definition: AsioAsyncOp.h:155
fair::mq::sdk::AsioAsyncOpImpl::AsioAsyncOpImpl
AsioAsyncOpImpl(const Executor1 &ex1, Allocator1 alloc1, Handler &&handler)
Ctor.
Definition: AsioAsyncOp.h:56
fair::mq::sdk::AsioAsyncOpImplBase
Definition: AsioAsyncOp.h:37

privacy