FairMQ  1.4.14
C++ Message Queuing Library and Framework
Socket.h
1 /********************************************************************************
2  * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef FAIR_MQ_OFI_SOCKET_H
10 #define FAIR_MQ_OFI_SOCKET_H
11 
12 #include <FairMQSocket.h>
13 #include <FairMQMessage.h>
14 #include <fairmq/ofi/Context.h>
15 #include <fairmq/ofi/ControlMessages.h>
16 
17 #include <asiofi/connected_endpoint.hpp>
18 #include <asiofi/memory_resources.hpp>
19 #include <asiofi/passive_endpoint.hpp>
20 #include <asiofi/semaphore.hpp>
21 #include <boost/asio.hpp>
22 #include <memory> // unique_ptr
23 #include <mutex>
24 
25 
26 namespace fair
27 {
28 namespace mq
29 {
30 namespace ofi
31 {
32 
39 class Socket final : public fair::mq::Socket
40 {
41  public:
42  Socket(Context& context, const std::string& type, const std::string& name, const std::string& id = "");
43  Socket(const Socket&) = delete;
44  Socket operator=(const Socket&) = delete;
45 
46  auto GetId() const -> std::string override { return fId; }
47 
48  auto Bind(const std::string& address) -> bool override;
49  auto Connect(const std::string& address) -> bool override;
50 
51  auto Send(MessagePtr& msg, int timeout = 0) -> int override;
52  auto Receive(MessagePtr& msg, int timeout = 0) -> int override;
53  auto Send(std::vector<MessagePtr>& msgVec, int timeout = 0) -> int64_t override;
54  auto Receive(std::vector<MessagePtr>& msgVec, int timeout = 0) -> int64_t override;
55 
56  auto GetSocket() const -> void* { return nullptr; }
57 
58  void SetLinger(const int value) override;
59  int GetLinger() const override;
60  void SetSndBufSize(const int value) override;
61  int GetSndBufSize() const override;
62  void SetRcvBufSize(const int value) override;
63  int GetRcvBufSize() const override;
64  void SetSndKernelSize(const int value) override;
65  int GetSndKernelSize() const override;
66  void SetRcvKernelSize(const int value) override;
67  int GetRcvKernelSize() const override;
68 
69  auto Close() -> void override;
70 
71  auto SetOption(const std::string& option, const void* value, size_t valueSize) -> void override;
72  auto GetOption(const std::string& option, void* value, size_t* valueSize) -> void override;
73 
74  auto GetBytesTx() const -> unsigned long override { return fBytesTx; }
75  auto GetBytesRx() const -> unsigned long override { return fBytesRx; }
76  auto GetMessagesTx() const -> unsigned long override { return fMessagesTx; }
77  auto GetMessagesRx() const -> unsigned long override { return fMessagesRx; }
78 
79  static auto GetConstant(const std::string& constant) -> int;
80 
81  ~Socket() override;
82 
83  private:
84  Context& fContext;
85  asiofi::allocated_pool_resource fControlMemPool;
86  std::unique_ptr<asiofi::info> fOfiInfo;
87  std::unique_ptr<asiofi::fabric> fOfiFabric;
88  std::unique_ptr<asiofi::domain> fOfiDomain;
89  std::unique_ptr<asiofi::passive_endpoint> fPassiveEndpoint;
90  std::unique_ptr<asiofi::connected_endpoint> fDataEndpoint, fControlEndpoint;
91  std::string fId;
92  std::atomic<unsigned long> fBytesTx;
93  std::atomic<unsigned long> fBytesRx;
94  std::atomic<unsigned long> fMessagesTx;
95  std::atomic<unsigned long> fMessagesRx;
96  Address fRemoteAddr;
97  Address fLocalAddr;
98  int fSndTimeout;
99  int fRcvTimeout;
100  std::mutex fSendQueueMutex, fRecvQueueMutex;
101  std::queue<std::vector<MessagePtr>> fSendQueue, fRecvQueue;
102  std::vector<MessagePtr> fInflightMultiPartMessage;
103  int64_t fMultiPartRecvCounter;
104  asiofi::synchronized_semaphore fSendPushSem, fSendPopSem, fRecvPushSem, fRecvPopSem;
105  std::atomic<bool> fNeedOfiMemoryRegistration;
106 
107  auto InitOfi(Address addr) -> void;
108  auto BindControlEndpoint() -> void;
109  auto BindDataEndpoint() -> void;
110  enum class Band { Control, Data };
111  auto ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoint, Band type) -> void;
112  auto SendQueueReader() -> void;
113  auto SendQueueReaderStatic() -> void;
114  auto RecvControlQueueReader() -> void;
115  auto RecvQueueReaderStatic() -> void;
116  auto OnRecvControl(ofi::unique_ptr<ControlMessage> ctrl) -> void;
117  auto DataMessageReceived(MessagePtr msg) -> void;
118 }; /* class Socket */
119 
120 struct SilentSocketError : SocketError { using SocketError::SocketError; };
121 
122 } /* namespace ofi */
123 } /* namespace mq */
124 } /* namespace fair */
125 
126 #endif /* FAIR_MQ_OFI_SOCKET_H */
Transport-wide context.
Definition: Context.h:56
Definition: Context.h:36
Definition: FairMQSocket.h:74
Definition: Socket.h:39
Definition: FairMQSocket.h:19
Tools for interfacing containers to the transport via polymorphic allocators.
Definition: DeviceRunner.h:23
Definition: Socket.h:120

privacy