FairMQ  1.4.14
C++ Message Queuing Library and Framework
TransportFactory.h
1 /********************************************************************************
2  * Copyright (C) 2016-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef FAIR_MQ_SHMEM_TRANSPORTFACTORY_H_
10 #define FAIR_MQ_SHMEM_TRANSPORTFACTORY_H_
11 
12 #include "Manager.h"
13 #include "Common.h"
14 #include "Message.h"
15 #include "Socket.h"
16 #include "Poller.h"
17 #include "UnmanagedRegion.h"
18 
19 #include <FairMQTransportFactory.h>
20 #include <fairmq/ProgOptions.h>
21 
22 #include <vector>
23 #include <string>
24 #include <thread>
25 #include <atomic>
26 
27 namespace fair
28 {
29 namespace mq
30 {
31 namespace shmem
32 {
33 
35 {
36  public:
37  TransportFactory(const std::string& id = "", const ProgOptions* config = nullptr); // Both arguments are optional: id defaults to an empty string, config to nullptr.
38  TransportFactory(const TransportFactory&) = delete; // Copy construction is disabled.
39  TransportFactory& operator=(const TransportFactory&) = delete; // Copy assignment is disabled; declared with the conventional reference return type.
40 
41  MessagePtr CreateMessage() override; // Create empty FairMQMessage.
42  MessagePtr CreateMessage(const size_t size) override; // Overload taking a size; presumably the payload size in bytes - TODO confirm.
43  MessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override; // Overload taking a caller-supplied buffer, a free function and an optional hint pointer.
44  MessagePtr CreateMessage(UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = nullptr) override; // Overload taking an UnmanagedRegionPtr plus buffer; hint defaults to nullptr like the overload above.
45 
46  SocketPtr CreateSocket(const std::string& type, const std::string& name) override; // Create a socket.
47 
48  PollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override; // Create a poller for a single channel (all subchannels).
49  PollerPtr CreatePoller(const std::vector<FairMQChannel*>& channels) const override; // Overload taking channel pointers instead of channel objects.
50  PollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override; // Overload taking a string-keyed map of channel vectors plus a list of strings; presumably the list selects which map entries are polled - TODO confirm.
51 
52  UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override; // Create new UnmanagedRegion.
53  UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override; // Overload with an additional int64_t userFlags argument after size.
54 
55  void SubscribeToRegionEvents(RegionEventCallback callback) override; // Subscribe to region events (creation, destruction, ...).
56  void UnsubscribeFromRegionEvents() override; // Unsubscribe from region events.
57  std::vector<fair::mq::RegionInfo> GetRegionInfo() override; // Returns a vector of fair::mq::RegionInfo by value.
58 
59  Transport GetType() const override; // Get transport type.
60 
61  void Interrupt() override { fManager->Interrupt(); } // Forwards to Manager::Interrupt() through fManager.
62  void Resume() override { fManager->Resume(); } // Forwards to Manager::Resume() through fManager.
63  void Reset() override; // No inline body in this header.
64 
65  void IncrementMsgCounter() { fMsgCounter.fetch_add(1); } // Atomically adds one to fMsgCounter; the previous value is discarded.
66  void DecrementMsgCounter() { fMsgCounter.fetch_sub(1); } // Atomically subtracts one from fMsgCounter; the previous value is discarded.
67 
68  ~TransportFactory() override; // Destructor overriding the base-class destructor; no inline body in this header.
69 
70  private:
71  void SendHeartbeats(); // Private helper; presumably the function run by fHeartbeatThread while fSendHeartbeats is set - TODO confirm against the implementation file.
72 
73  static Transport fTransportType; // Static, so shared by all instances; presumably the value GetType() returns - TODO confirm.
74  std::string fDeviceId; // Presumably the id passed to the constructor - TODO confirm.
75  std::string fShmId; // NOTE(review): how this id is derived is not visible in this header.
76  void* fZMQContext; // Opaque pointer; by its name a ZeroMQ context handle - ownership is not visible here, verify in the implementation.
77  std::unique_ptr<Manager> fManager; // Uniquely owned Manager; Interrupt() and Resume() forward to it.
78  std::thread fHeartbeatThread; // Presumably runs SendHeartbeats() - TODO confirm.
79  std::atomic<bool> fSendHeartbeats; // Atomic flag; presumably controls the heartbeat loop - TODO confirm.
80  std::atomic<int32_t> fMsgCounter; // Atomic counter adjusted by IncrementMsgCounter() / DecrementMsgCounter().
81 };
82 
83 } // namespace shmem
84 } // namespace mq
85 } // namespace fair
86 
87 #endif /* FAIR_MQ_SHMEM_TRANSPORTFACTORY_H_ */
PollerPtr CreatePoller(const std::vector< FairMQChannel > &channels) const override
Create a poller for a single channel (all subchannels)
SocketPtr CreateSocket(const std::string &type, const std::string &name) override
Create a socket.
Definition: FairMQTransportFactory.h:30
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback=nullptr, const std::string &path="", int flags=0) const override
Create new UnmanagedRegion.
Definition: TransportFactory.cxx:162
MessagePtr CreateMessage() override
Create empty FairMQMessage.
Definition: TransportFactory.cxx:121
Definition: ProgOptions.h:36
Definition: TransportFactory.h:34
void UnsubscribeFromRegionEvents() override
Unsubscribe from region events.
Definition: TransportFactory.cxx:177
Tools for interfacing containers to the transport via polymorphic allocators.
Definition: DeviceRunner.h:23
Transport GetType() const override
Get transport type.
Definition: TransportFactory.cxx:187
void SubscribeToRegionEvents(RegionEventCallback callback) override
Subscribe to region events (creation, destruction, ...)
Definition: TransportFactory.cxx:172

privacy