9 #ifndef FAIR_MQ_OFI_SOCKET_H 10 #define FAIR_MQ_OFI_SOCKET_H 12 #include <FairMQSocket.h> 13 #include <FairMQMessage.h> 14 #include <fairmq/ofi/Context.h> 15 #include <fairmq/ofi/ControlMessages.h> 17 #include <asiofi/connected_endpoint.hpp> 18 #include <asiofi/memory_resources.hpp> 19 #include <asiofi/passive_endpoint.hpp> 20 #include <asiofi/semaphore.hpp> 21 #include <boost/asio.hpp> 42 Socket(
// Constructor declaration: takes a context by non-const reference plus three
// strings; only `id` has a default, so it may be omitted by callers.
Context& context, // context object declared in fairmq/ofi/Context.h
const std::string& type, // socket type string; accepted values are not visible here
const std::string& name, // socket name string
const std::string&
id = // optional identifier
""); // defaults to the empty string when not supplied
46 auto GetId()
const -> std::string
override {
return fId; }
48 auto Bind(
const std::string& address) ->
bool override;
49 auto Connect(
const std::string& address) ->
bool override;
51 auto Send(MessagePtr& msg,
int timeout = 0) ->
int override;
52 auto Receive(MessagePtr& msg,
int timeout = 0) ->
int override;
53 auto Send(std::vector<MessagePtr>& msgVec,
int timeout = 0) -> int64_t
override;
54 auto Receive(std::vector<MessagePtr>& msgVec,
int timeout = 0) -> int64_t
override;
56 auto GetSocket()
const ->
void* {
return nullptr; }
58 void SetLinger(
const int value)
override;
59 int GetLinger()
const override;
60 void SetSndBufSize(
const int value)
override;
61 int GetSndBufSize()
const override;
62 void SetRcvBufSize(
const int value)
override;
63 int GetRcvBufSize()
const override;
64 void SetSndKernelSize(
const int value)
override;
65 int GetSndKernelSize()
const override;
66 void SetRcvKernelSize(
const int value)
override;
67 int GetRcvKernelSize()
const override;
69 auto Close() ->
void override;
71 auto SetOption(
const std::string& option,
const void* value,
size_t valueSize) ->
void override;
72 auto GetOption(
const std::string& option,
void* value,
size_t* valueSize) ->
void override;
74 auto GetBytesTx()
const ->
unsigned long override {
return fBytesTx; }
75 auto GetBytesRx()
const ->
unsigned long override {
return fBytesRx; }
76 auto GetMessagesTx()
const ->
unsigned long override {
return fMessagesTx; }
77 auto GetMessagesRx()
const ->
unsigned long override {
return fMessagesRx; }
/// @brief Static lookup from a constant's name to an integer.
/// @param constant Name of the constant; the recognised names live in the
///        implementation, which is not visible in this header.
/// @return `int` associated with `constant`.
static auto GetConstant(const std::string& constant) -> int;
85 asiofi::allocated_pool_resource fControlMemPool;
86 std::unique_ptr<asiofi::info> fOfiInfo;
87 std::unique_ptr<asiofi::fabric> fOfiFabric;
88 std::unique_ptr<asiofi::domain> fOfiDomain;
89 std::unique_ptr<asiofi::passive_endpoint> fPassiveEndpoint;
90 std::unique_ptr<asiofi::connected_endpoint> fDataEndpoint, fControlEndpoint;
92 std::atomic<unsigned long> fBytesTx;
93 std::atomic<unsigned long> fBytesRx;
94 std::atomic<unsigned long> fMessagesTx;
95 std::atomic<unsigned long> fMessagesRx;
100 std::mutex fSendQueueMutex, fRecvQueueMutex;
101 std::queue<std::vector<MessagePtr>> fSendQueue, fRecvQueue;
102 std::vector<MessagePtr> fInflightMultiPartMessage;
103 int64_t fMultiPartRecvCounter;
104 asiofi::synchronized_semaphore fSendPushSem, fSendPopSem, fRecvPushSem, fRecvPopSem;
105 std::atomic<bool> fNeedOfiMemoryRegistration;
107 auto InitOfi(
Address addr) -> void;
108 auto BindControlEndpoint() -> void;
109 auto BindDataEndpoint() -> void;
110 enum class Band { Control, Data };
111 auto ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoint, Band type) -> void;
112 auto SendQueueReader() -> void;
113 auto SendQueueReaderStatic() -> void;
114 auto RecvControlQueueReader() -> void;
115 auto RecvQueueReaderStatic() -> void;
116 auto OnRecvControl(ofi::unique_ptr<ControlMessage> ctrl) -> void;
117 auto DataMessageReceived(MessagePtr msg) -> void;
Transport-wide context.
Definition: Context.h:56
Definition: FairMQSocket.h:74
Definition: FairMQSocket.h:19
Tools for interfacing containers to the transport via polymorphic allocators.
Definition: DeviceRunner.h:23