FairMQ  1.4.14
C++ Message Queuing Library and Framework
Manager.h
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
15 #ifndef FAIR_MQ_SHMEM_MANAGER_H_
16 #define FAIR_MQ_SHMEM_MANAGER_H_
17 
18 #include "Common.h"
19 #include "Region.h"
20 
21 #include <FairMQLogger.h>
22 #include <FairMQUnmanagedRegion.h>
23 
24 #include <boost/interprocess/ipc/message_queue.hpp>
25 #include <boost/interprocess/managed_shared_memory.hpp>
26 #include <boost/interprocess/sync/named_condition.hpp>
27 #include <boost/interprocess/sync/named_mutex.hpp>
28 
29 #include <set>
30 #include <stdexcept>
31 #include <string>
32 #include <thread>
33 #include <unordered_map>
34 #include <utility>
35 #include <vector>
36 
37 namespace fair
38 {
39 namespace mq
40 {
41 namespace shmem
42 {
43 
44 struct SharedMemoryError : std::runtime_error { using std::runtime_error::runtime_error; };
45 
46 class Manager
47 {
48  friend struct Region;
49 
50  public:
51  Manager(const std::string& id, size_t size);
52 
53  Manager() = delete;
54 
55  Manager(const Manager&) = delete;
56  Manager operator=(const Manager&) = delete;
57 
58  ~Manager();
59 
60  boost::interprocess::managed_shared_memory& Segment() { return fSegment; }
61  boost::interprocess::managed_shared_memory& ManagementSegment() { return fManagementSegment; }
62 
63  static void StartMonitor(const std::string&);
64 
65  void Interrupt() { fInterrupted.store(true); }
66  void Resume() { fInterrupted.store(false); }
67  bool Interrupted() { return fInterrupted.load(); }
68 
69  int GetDeviceCounter();
70  int IncrementDeviceCounter();
71  int DecrementDeviceCounter();
72 
73  std::pair<boost::interprocess::mapped_region*, uint64_t> CreateRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0);
74  Region* GetRegion(const uint64_t id);
75  Region* GetRegionUnsafe(const uint64_t id);
76  void RemoveRegion(const uint64_t id);
77 
78  std::vector<fair::mq::RegionInfo> GetRegionInfo();
79  std::vector<fair::mq::RegionInfo> GetRegionInfoUnsafe();
80  void SubscribeToRegionEvents(RegionEventCallback callback);
81  void UnsubscribeFromRegionEvents();
82  void RegionEventsSubscription();
83 
84  void RemoveSegments();
85 
86  private:
87  std::string fShmId;
88  std::string fSegmentName;
89  std::string fManagementSegmentName;
90  boost::interprocess::managed_shared_memory fSegment;
91  boost::interprocess::managed_shared_memory fManagementSegment;
92  VoidAlloc fShmVoidAlloc;
93  boost::interprocess::named_mutex fShmMtx;
94 
95  boost::interprocess::named_condition fRegionEventsCV;
96  std::thread fRegionEventThread;
97  std::atomic<bool> fRegionEventsSubscriptionActive;
98  std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
99  std::unordered_map<uint64_t, RegionEvent> fObservedRegionEvents;
100 
101  DeviceCounter* fDeviceCounter;
102  Uint64RegionInfoMap* fRegionInfos;
103  std::unordered_map<uint64_t, std::unique_ptr<Region>> fRegions;
104 
105  std::atomic<bool> fInterrupted;
106 };
107 
108 } // namespace shmem
109 } // namespace mq
110 } // namespace fair
111 
112 #endif /* FAIR_MQ_SHMEM_MANAGER_H_ */
Definition: Manager.h:46
Definition: Region.h:41
Definition: Manager.h:44
Definition: Common.h:64
Tools for interfacing containers to the transport via polymorphic allocators.
Definition: DeviceRunner.h:23

privacy