shm: Make sure no event notifications are missed

This commit is contained in:
Alexey Rybalchenko 2021-02-26 11:40:33 +01:00
parent 9b48b31a75
commit 0fd2fcadc2
2 changed files with 30 additions and 1 deletions

View File

@ -103,6 +103,15 @@ struct DeviceCounter
std::atomic<unsigned int> fCount; std::atomic<unsigned int> fCount;
}; };
struct EventCounter
{
EventCounter(uint64_t c)
: fCount(c)
{}
std::atomic<uint64_t> fCount;
};
struct RegionCounter struct RegionCounter
{ {
RegionCounter(uint16_t c) RegionCounter(uint16_t c)

View File

@ -64,7 +64,9 @@ class Manager
, fShmMtx(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mtx").c_str()) , fShmMtx(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mtx").c_str())
, fRegionEventsCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str()) , fRegionEventsCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str())
, fRegionEventsSubscriptionActive(false) , fRegionEventsSubscriptionActive(false)
, fNumObservedEvents(0)
, fDeviceCounter(nullptr) , fDeviceCounter(nullptr)
, fEventCounter(nullptr)
, fShmSegments(nullptr) , fShmSegments(nullptr)
, fShmRegions(nullptr) , fShmRegions(nullptr)
, fInterrupted(false) , fInterrupted(false)
@ -102,6 +104,16 @@ class Manager
fShmSegments = fManagementSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc); fShmSegments = fManagementSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc);
fEventCounter = fManagementSegment.find<EventCounter>(unique_instance).first;
if (fEventCounter) {
LOG(debug) << "event counter found: " << fEventCounter->fCount;
} else {
LOG(debug) << "no event counter found, creating one and initializing with 0";
fEventCounter = fManagementSegment.construct<EventCounter>(unique_instance)(0);
LOG(debug) << "initialized event counter with: " << fEventCounter->fCount;
}
try { try {
auto it = fShmSegments->find(fSegmentId); auto it = fShmSegments->find(fSegmentId);
if (it == fShmSegments->end()) { if (it == fShmSegments->end()) {
@ -114,6 +126,7 @@ class Manager
fShmSegments->emplace(fSegmentId, AllocationAlgorithm::simple_seq_fit); fShmSegments->emplace(fSegmentId, AllocationAlgorithm::simple_seq_fit);
} }
ss << "Created "; ss << "Created ";
(fEventCounter->fCount)++;
} else { } else {
// found segment with the given id, opening // found segment with the given id, opening
if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) { if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
@ -276,6 +289,8 @@ class Manager
r.first->second->StartReceivingAcks(); r.first->second->StartReceivingAcks();
result.first = &(r.first->second->fRegion); result.first = &(r.first->second->fRegion);
result.second = id; result.second = id;
(fEventCounter->fCount)++;
} }
fRegionEventsCV.notify_all(); fRegionEventsCV.notify_all();
@ -327,6 +342,7 @@ class Manager
{ {
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx); boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
fShmRegions->at(id).fDestroyed = true; fShmRegions->at(id).fDestroyed = true;
(fEventCounter->fCount)++;
} }
fRegionEventsCV.notify_all(); fRegionEventsCV.notify_all();
} }
@ -419,10 +435,12 @@ class Manager
if (el == fObservedRegionEvents.end()) { if (el == fObservedRegionEvents.end()) {
fRegionEventCallback(i); fRegionEventCallback(i);
fObservedRegionEvents.emplace(std::make_pair(i.id, i.managed), i.event); fObservedRegionEvents.emplace(std::make_pair(i.id, i.managed), i.event);
++fNumObservedEvents;
} else { } else {
if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) { if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) {
fRegionEventCallback(i); fRegionEventCallback(i);
el->second = i.event; el->second = i.event;
++fNumObservedEvents;
} else { } else {
// LOG(debug) << "ignoring event for id" << i.id << ":"; // LOG(debug) << "ignoring event for id" << i.id << ":";
// LOG(debug) << "incoming event: " << i.event; // LOG(debug) << "incoming event: " << i.event;
@ -430,7 +448,7 @@ class Manager
} }
} }
} }
fRegionEventsCV.wait(lock); fRegionEventsCV.wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; });
} }
} }
@ -618,8 +636,10 @@ class Manager
bool fRegionEventsSubscriptionActive; bool fRegionEventsSubscriptionActive;
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback; std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
std::map<std::pair<uint16_t, bool>, RegionEvent> fObservedRegionEvents; std::map<std::pair<uint16_t, bool>, RegionEvent> fObservedRegionEvents;
uint64_t fNumObservedEvents;
DeviceCounter* fDeviceCounter; DeviceCounter* fDeviceCounter;
EventCounter* fEventCounter;
Uint16SegmentInfoHashMap* fShmSegments; Uint16SegmentInfoHashMap* fShmSegments;
Uint16RegionInfoHashMap* fShmRegions; Uint16RegionInfoHashMap* fShmRegions;
std::unordered_map<uint16_t, std::unique_ptr<Region>> fRegions; std::unordered_map<uint16_t, std::unique_ptr<Region>> fRegions;