mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 17:41:45 +00:00
shm: when refCount segment size is zero, fallback to old behaviour
, which is to store reference counts inside the main data segment
This commit is contained in:
@@ -251,7 +251,12 @@ class Message final : public fair::mq::Message
|
||||
if (!fRegionPtr) {
|
||||
throw TransportError(tools::ToString("Cannot get unmanaged region with id ", fRegionId));
|
||||
}
|
||||
return fRegionPtr->GetRefCountAddressFromHandle(fShared)->Get();
|
||||
if (fRegionPtr->fRcSegmentSize > 0) {
|
||||
return fRegionPtr->GetRefCountAddressFromHandle(fShared)->Get();
|
||||
} else {
|
||||
fManager.GetSegment(fSegmentId);
|
||||
return ShmHeader::RefCount(fManager.GetAddressFromHandle(fShared, fSegmentId));
|
||||
}
|
||||
}
|
||||
|
||||
void Copy(const fair::mq::Message& other) override
|
||||
@@ -277,19 +282,29 @@ class Message final : public fair::mq::Message
|
||||
if (!fRegionPtr) {
|
||||
throw TransportError(tools::ToString("Cannot get unmanaged region with id ", otherMsg.fRegionId));
|
||||
}
|
||||
if (otherMsg.fShared < 0) {
|
||||
// UR msg not yet shared, create the reference counting object with count 2
|
||||
try {
|
||||
otherMsg.fShared = fRegionPtr->HandleFromAddress(&(fRegionPtr->MakeRefCount(2)));
|
||||
} catch (boost::interprocess::bad_alloc& ba) {
|
||||
throw RefCountBadAlloc(tools::ToString(
|
||||
"Insufficient space in the reference count segment ",
|
||||
otherMsg.fRegionId,
|
||||
", original exception: bad_alloc: ",
|
||||
ba.what()));
|
||||
if (fRegionPtr->fRcSegmentSize > 0) {
|
||||
if (otherMsg.fShared < 0) {
|
||||
// UR msg not yet shared, create the reference counting object with count 2
|
||||
try {
|
||||
otherMsg.fShared = fRegionPtr->HandleFromAddress(&(fRegionPtr->MakeRefCount(2)));
|
||||
} catch (boost::interprocess::bad_alloc& ba) {
|
||||
throw RefCountBadAlloc(tools::ToString("Insufficient space in the reference count segment ", otherMsg.fRegionId, ", original exception: bad_alloc: ", ba.what()));
|
||||
}
|
||||
} else {
|
||||
fRegionPtr->GetRefCountAddressFromHandle(otherMsg.fShared)->Increment();
|
||||
}
|
||||
} else { // if RefCount segment size is 0, store the ref count in the managed segment
|
||||
if (otherMsg.fShared < 0) { // if UR msg is not yet shared
|
||||
char* ptr = fManager.Allocate(2, 0);
|
||||
// point the fShared in the unmanaged region message to the refCount holder
|
||||
otherMsg.fShared = fManager.GetHandleFromAddress(ptr, fSegmentId);
|
||||
// the message needs to be able to locate in which segment the refCount is stored
|
||||
otherMsg.fSegmentId = fSegmentId;
|
||||
ShmHeader::IncrementRefCount(ptr);
|
||||
} else { // if the UR msg is already shared
|
||||
fManager.GetSegment(otherMsg.fSegmentId);
|
||||
ShmHeader::IncrementRefCount(fManager.GetAddressFromHandle(otherMsg.fShared, otherMsg.fSegmentId));
|
||||
}
|
||||
} else {
|
||||
fRegionPtr->GetRefCountAddressFromHandle(otherMsg.fShared)->Increment();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -357,10 +372,21 @@ class Message final : public fair::mq::Message
|
||||
if (!fRegionPtr) {
|
||||
throw TransportError(tools::ToString("Cannot get unmanaged region with id ", fRegionId));
|
||||
}
|
||||
uint16_t refCount = fRegionPtr->GetRefCountAddressFromHandle(fShared)->Decrement();
|
||||
if (refCount == 1) {
|
||||
fRegionPtr->RemoveRefCount(*(fRegionPtr->GetRefCountAddressFromHandle(fShared)));
|
||||
ReleaseUnmanagedRegionBlock();
|
||||
if (fRegionPtr->fRcSegmentSize > 0) {
|
||||
uint16_t refCount = fRegionPtr->GetRefCountAddressFromHandle(fShared)->Decrement();
|
||||
if (refCount == 1) {
|
||||
fRegionPtr->RemoveRefCount(*(fRegionPtr->GetRefCountAddressFromHandle(fShared)));
|
||||
ReleaseUnmanagedRegionBlock();
|
||||
}
|
||||
} else { // if RefCount segment size is 0, get the ref count from the managed segment
|
||||
// make sure segment is initialized in this transport
|
||||
fManager.GetSegment(fSegmentId);
|
||||
// release unmanaged region block if ref count is one
|
||||
uint16_t refCount = ShmHeader::DecrementRefCount(fManager.GetAddressFromHandle(fShared, fSegmentId));
|
||||
if (refCount == 1) {
|
||||
fManager.Deallocate(fShared, fSegmentId);
|
||||
ReleaseUnmanagedRegionBlock();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ReleaseUnmanagedRegionBlock();
|
||||
|
@@ -65,6 +65,7 @@ struct UnmanagedRegion
|
||||
, fShmemObject()
|
||||
, fFile(nullptr)
|
||||
, fFileMapping()
|
||||
, fRcSegmentSize(cfg.rcSegmentSize)
|
||||
, fQueue(nullptr)
|
||||
, fCallback(nullptr)
|
||||
, fBulkCallback(nullptr)
|
||||
@@ -146,13 +147,13 @@ struct UnmanagedRegion
|
||||
LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
|
||||
}
|
||||
|
||||
InitializeRefCountSegment(cfg.rcSegmentSize);
|
||||
InitializeRefCountSegment(fRcSegmentSize);
|
||||
|
||||
if (fControlling && created) {
|
||||
Register(shmId, cfg);
|
||||
}
|
||||
|
||||
LOG(debug) << (created ? "Created" : "Opened") << " unmanaged shared memory region: " << fName << " (" << (fControlling ? "controller" : "viewer") << ")";
|
||||
LOG(debug) << (created ? "Created" : "Opened") << " unmanaged shared memory region: " << fName << " (" << (fControlling ? "controller" : "viewer") << "), refCount segment size: " << fRcSegmentSize;
|
||||
}
|
||||
|
||||
UnmanagedRegion() = delete;
|
||||
@@ -266,6 +267,7 @@ struct UnmanagedRegion
|
||||
std::condition_variable fBlockSendCV;
|
||||
std::vector<RegionBlock> fBlocksToFree;
|
||||
const std::size_t fAckBunchSize = 256;
|
||||
uint64_t fRcSegmentSize;
|
||||
std::unique_ptr<boost::interprocess::message_queue> fQueue;
|
||||
std::unique_ptr<boost::interprocess::managed_shared_memory> fRefCountSegment;
|
||||
std::unique_ptr<RefCountPool> fRefCountPool;
|
||||
@@ -321,7 +323,7 @@ struct UnmanagedRegion
|
||||
void InitializeRefCountSegment(uint64_t size)
|
||||
{
|
||||
using namespace boost::interprocess;
|
||||
if (!fRefCountSegment) {
|
||||
if (!fRefCountSegment && size > 0) {
|
||||
fRefCountSegment = std::make_unique<managed_shared_memory>(open_or_create, fRefCountSegmentName.c_str(), size);
|
||||
LOG(trace) << "shmem: initialized ref count segment: " << fRefCountSegmentName;
|
||||
fRefCountPool = std::make_unique<RefCountPool>(fRefCountSegment->get_segment_manager());
|
||||
|
Reference in New Issue
Block a user