mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
shm: integrate mtx and cv into management segment
This commit is contained in:
parent
a911242fbe
commit
0f89b99f50
|
@ -23,7 +23,8 @@
|
|||
#include <boost/filesystem.hpp>
|
||||
#include <boost/interprocess/ipc/message_queue.hpp>
|
||||
#include <boost/interprocess/managed_shared_memory.hpp>
|
||||
#include <boost/interprocess/sync/named_condition.hpp>
|
||||
#include <boost/interprocess/sync/interprocess_condition.hpp>
|
||||
#include <boost/interprocess/sync/interprocess_mutex.hpp>
|
||||
#include <boost/interprocess/sync/named_mutex.hpp>
|
||||
#include <boost/variant.hpp>
|
||||
|
||||
|
@ -134,8 +135,8 @@ class Manager
|
|||
, fDeviceId(std::move(deviceId))
|
||||
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600)
|
||||
, fShmVoidAlloc(fManagementSegment.get_segment_manager())
|
||||
, fShmMtx(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mtx").c_str())
|
||||
, fRegionEventsShmCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str())
|
||||
, fShmMtx(fManagementSegment.find_or_construct<boost::interprocess::interprocess_mutex>(boost::interprocess::unique_instance)())
|
||||
, fRegionEventsShmCV(fManagementSegment.find_or_construct<boost::interprocess::interprocess_condition>(boost::interprocess::unique_instance)())
|
||||
, fNumObservedEvents(0)
|
||||
, fDeviceCounter(nullptr)
|
||||
, fEventCounter(nullptr)
|
||||
|
@ -188,7 +189,7 @@ class Manager
|
|||
}
|
||||
|
||||
if (autolaunchMonitor) {
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
|
||||
StartMonitor(fShmId);
|
||||
}
|
||||
|
||||
|
@ -196,7 +197,7 @@ class Manager
|
|||
|
||||
try {
|
||||
std::stringstream ss;
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
|
||||
|
||||
fShmSegments = fManagementSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc);
|
||||
|
||||
|
@ -379,7 +380,7 @@ class Manager
|
|||
std::pair<mapped_region*, uint16_t> result;
|
||||
|
||||
{
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
|
||||
|
||||
if (!cfg.id.has_value()) {
|
||||
RegionCounter* rc = fManagementSegment.find<RegionCounter>(unique_instance).first;
|
||||
|
@ -437,7 +438,7 @@ class Manager
|
|||
}
|
||||
}
|
||||
fRegionsGen += 1; // signal TL cache invalidation
|
||||
fRegionEventsShmCV.notify_all();
|
||||
fRegionEventsShmCV->notify_all();
|
||||
|
||||
return result;
|
||||
} catch (interprocess_exception& e) {
|
||||
|
@ -461,7 +462,7 @@ class Manager
|
|||
}
|
||||
}
|
||||
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
|
||||
// slow path: check invalidation
|
||||
if (lTlCacheGen != fRegionsGen) {
|
||||
fTlRegionCache.fRegionsTLCache.clear();
|
||||
|
@ -507,14 +508,14 @@ class Manager
|
|||
try {
|
||||
fRegions.at(id)->StopAcks();
|
||||
{
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
|
||||
if (fRegions.at(id)->fRemoveOnDestruction) {
|
||||
fShmRegions->at(id).fDestroyed = true;
|
||||
(fEventCounter->fCount)++;
|
||||
}
|
||||
fRegions.erase(id);
|
||||
}
|
||||
fRegionEventsShmCV.notify_all();
|
||||
fRegionEventsShmCV->notify_all();
|
||||
} catch (std::out_of_range& oor) {
|
||||
LOG(debug) << "RemoveRegion() could not locate region with id '" << id << "'";
|
||||
}
|
||||
|
@ -523,7 +524,7 @@ class Manager
|
|||
|
||||
std::vector<fair::mq::RegionInfo> GetRegionInfo()
|
||||
{
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
|
||||
return GetRegionInfoUnsafe();
|
||||
}
|
||||
|
||||
|
@ -576,13 +577,13 @@ class Manager
|
|||
{
|
||||
if (fRegionEventThread.joinable()) {
|
||||
LOG(debug) << "Already subscribed. Overwriting previous subscription.";
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
|
||||
fRegionEventsSubscriptionActive = false;
|
||||
lock.unlock();
|
||||
fRegionEventsShmCV.notify_all();
|
||||
fRegionEventsShmCV->notify_all();
|
||||
fRegionEventThread.join();
|
||||
}
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
|
||||
fRegionEventCallback = callback;
|
||||
fRegionEventsSubscriptionActive = true;
|
||||
fRegionEventThread = std::thread(&Manager::RegionEventsSubscription, this);
|
||||
|
@ -593,10 +594,10 @@ class Manager
|
|||
void UnsubscribeFromRegionEvents()
|
||||
{
|
||||
if (fRegionEventThread.joinable()) {
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
|
||||
fRegionEventsSubscriptionActive = false;
|
||||
lock.unlock();
|
||||
fRegionEventsShmCV.notify_all();
|
||||
fRegionEventsShmCV->notify_all();
|
||||
fRegionEventThread.join();
|
||||
lock.lock();
|
||||
fRegionEventCallback = nullptr;
|
||||
|
@ -605,7 +606,7 @@ class Manager
|
|||
|
||||
void RegionEventsSubscription()
|
||||
{
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
|
||||
while (fRegionEventsSubscriptionActive) {
|
||||
auto infos = GetRegionInfoUnsafe();
|
||||
for (const auto& i : infos) {
|
||||
|
@ -631,7 +632,7 @@ class Manager
|
|||
}
|
||||
}
|
||||
}
|
||||
fRegionEventsShmCV.wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; });
|
||||
fRegionEventsShmCV->wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; });
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -740,7 +741,7 @@ class Manager
|
|||
}
|
||||
}
|
||||
#ifdef FAIRMQ_DEBUG_MODE
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
|
||||
IncrementShmMsgCounter(fSegmentId);
|
||||
if (fMsgDebug->count(fSegmentId) == 0) {
|
||||
fMsgDebug->emplace(fSegmentId, fShmVoidAlloc);
|
||||
|
@ -759,7 +760,7 @@ class Manager
|
|||
{
|
||||
char* ptr = GetAddressFromHandle(handle, segmentId);
|
||||
#ifdef FAIRMQ_DEBUG_MODE
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
|
||||
DecrementShmMsgCounter(segmentId);
|
||||
try {
|
||||
fMsgDebug->at(segmentId).erase(GetHandleFromAddress(ShmHeader::UserPtr(ptr), fSegmentId));
|
||||
|
@ -784,7 +785,7 @@ class Manager
|
|||
|
||||
bool lastRemoved = false;
|
||||
try {
|
||||
boost::interprocess::scoped_lock<named_mutex> lock(fShmMtx);
|
||||
boost::interprocess::scoped_lock<interprocess_mutex> lock(*fShmMtx);
|
||||
|
||||
(fDeviceCounter->fCount)--;
|
||||
|
||||
|
@ -821,9 +822,9 @@ class Manager
|
|||
std::unordered_map<uint16_t, boost::variant<RBTreeBestFitSegment, SimpleSeqFitSegment>> fSegments;
|
||||
boost::interprocess::managed_shared_memory fManagementSegment;
|
||||
VoidAlloc fShmVoidAlloc;
|
||||
boost::interprocess::named_mutex fShmMtx;
|
||||
boost::interprocess::interprocess_mutex* fShmMtx;
|
||||
|
||||
boost::interprocess::named_condition fRegionEventsShmCV;
|
||||
boost::interprocess::interprocess_condition* fRegionEventsShmCV;
|
||||
std::thread fRegionEventThread;
|
||||
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
|
||||
std::map<std::pair<uint16_t, bool>, RegionEvent> fObservedRegionEvents; // pair: <region id, managed>
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include <boost/interprocess/file_mapping.hpp>
|
||||
|
||||
#include <boost/interprocess/sync/named_mutex.hpp>
|
||||
#include <boost/interprocess/sync/interprocess_mutex.hpp>
|
||||
#include <boost/interprocess/sync/named_condition.hpp>
|
||||
#include <boost/interprocess/ipc/message_queue.hpp>
|
||||
|
||||
|
@ -390,8 +391,8 @@ void Monitor::PrintDebugInfo(const ShmId& shmId __attribute__((unused)))
|
|||
string managementSegmentName("fmq_" + shmId.shmId + "_mng");
|
||||
try {
|
||||
bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str());
|
||||
boost::interprocess::named_mutex mtx(boost::interprocess::open_only, string("fmq_" + shmId.shmId + "_mtx").c_str());
|
||||
boost::interprocess::scoped_lock<bipc::named_mutex> lock(mtx);
|
||||
bipc::interprocess_mutex* mtx(managementSegment.find_or_construct<bipc::interprocess_mutex>(bipc::unique_instance)());
|
||||
bipc::scoped_lock<bipc::interprocess_mutex> lock(*mtx);
|
||||
|
||||
Uint16MsgDebugMapHashMap* debug = managementSegment.find<Uint16MsgDebugMapHashMap>(bipc::unique_instance).first;
|
||||
|
||||
|
@ -438,8 +439,8 @@ unordered_map<uint16_t, std::vector<BufferDebugInfo>> Monitor::GetDebugInfo(cons
|
|||
string managementSegmentName("fmq_" + shmId.shmId + "_mng");
|
||||
try {
|
||||
bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str());
|
||||
boost::interprocess::named_mutex mtx(boost::interprocess::open_only, string("fmq_" + shmId.shmId + "_mtx").c_str());
|
||||
boost::interprocess::scoped_lock<bipc::named_mutex> lock(mtx);
|
||||
bipc::interprocess_mutex* mtx(managementSegment.find_or_construct<bipc::interprocess_mutex>(bipc::unique_instance)());
|
||||
bipc::scoped_lock<bipc::interprocess_mutex> lock(*mtx);
|
||||
|
||||
Uint16MsgDebugMapHashMap* debug = managementSegment.find<Uint16MsgDebugMapHashMap>(bipc::unique_instance).first;
|
||||
|
||||
|
@ -471,8 +472,8 @@ unsigned long Monitor::GetFreeMemory(const ShmId& shmId, uint16_t segmentId)
|
|||
using namespace boost::interprocess;
|
||||
try {
|
||||
bipc::managed_shared_memory managementSegment(bipc::open_only, std::string("fmq_" + shmId.shmId + "_mng").c_str());
|
||||
boost::interprocess::named_mutex mtx(boost::interprocess::open_only, std::string("fmq_" + shmId.shmId + "_mtx").c_str());
|
||||
boost::interprocess::scoped_lock<bipc::named_mutex> lock(mtx);
|
||||
boost::interprocess::interprocess_mutex* mtx(managementSegment.find_or_construct<bipc::interprocess_mutex>(bipc::unique_instance)());
|
||||
boost::interprocess::scoped_lock<bipc::interprocess_mutex> lock(*mtx);
|
||||
|
||||
Uint16SegmentInfoHashMap* shmSegments = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first;
|
||||
|
||||
|
@ -591,9 +592,6 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
|
|||
}
|
||||
}
|
||||
|
||||
result.emplace_back(Remove<bipc::named_mutex>("fmq_" + shmId.shmId + "_mtx", verbose));
|
||||
result.emplace_back(Remove<bipc::named_condition>("fmq_" + shmId.shmId + "_cv", verbose));
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
|
@ -14,8 +14,6 @@ FairMQ Shared Memory currently uses the following names to register shared memor
|
|||
| --------------------------- | ---------------------------------------------- | ------------------ | ------------------------------ |
|
||||
| `fmq_<shmId>_m_<segmentId>` | managed segment(s) (user data) | one of the devices | devices |
|
||||
| `fmq_<shmId>_mng` | management segment (management data) | one of the devices | devices |
|
||||
| `fmq_<shmId>_mtx` | mutex | one of the devices | devices |
|
||||
| `fmq_<shmId>_cv` | condition variable | one of the devices | devices |
|
||||
| `fmq_<shmId>_rg_<index>` | unmanaged region(s) | one of the devices | devices with unmanaged regions |
|
||||
| `fmq_<shmId>_rgq_<index>` | unmanaged region queue(s) | one of the devices | devices with unmanaged regions |
|
||||
| `fmq_<shmId>_ms` | shmmonitor status | shmmonitor | devices, shmmonitor |
|
||||
|
|
Loading…
Reference in New Issue
Block a user