mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
improve message counter cache line use
This commit is contained in:
parent
e065e11d0e
commit
7a8c1bf0fb
|
@ -73,7 +73,6 @@ class Manager
|
||||||
, fShmSegments(nullptr)
|
, fShmSegments(nullptr)
|
||||||
, fShmRegions(nullptr)
|
, fShmRegions(nullptr)
|
||||||
, fInterrupted(false)
|
, fInterrupted(false)
|
||||||
, fMsgCounter(0)
|
|
||||||
#ifdef FAIRMQ_DEBUG_MODE
|
#ifdef FAIRMQ_DEBUG_MODE
|
||||||
, fMsgDebug(nullptr)
|
, fMsgDebug(nullptr)
|
||||||
, fShmMsgCounters(nullptr)
|
, fShmMsgCounters(nullptr)
|
||||||
|
@ -82,6 +81,8 @@ class Manager
|
||||||
, fSendHeartbeats(true)
|
, fSendHeartbeats(true)
|
||||||
, fThrowOnBadAlloc(config ? config->GetProperty<bool>("shm-throw-bad-alloc", true) : true)
|
, fThrowOnBadAlloc(config ? config->GetProperty<bool>("shm-throw-bad-alloc", true) : true)
|
||||||
, fNoCleanup(config ? config->GetProperty<bool>("shm-no-cleanup", false) : false)
|
, fNoCleanup(config ? config->GetProperty<bool>("shm-no-cleanup", false) : false)
|
||||||
|
, fMsgCounterNew(0)
|
||||||
|
, fMsgCounterDelete(0)
|
||||||
{
|
{
|
||||||
using namespace boost::interprocess;
|
using namespace boost::interprocess;
|
||||||
|
|
||||||
|
@ -260,9 +261,10 @@ class Manager
|
||||||
void Resume() { fInterrupted.store(false); }
|
void Resume() { fInterrupted.store(false); }
|
||||||
void Reset()
|
void Reset()
|
||||||
{
|
{
|
||||||
if (fMsgCounter.load() != 0) {
|
auto diff = fMsgCounterNew.load() - fMsgCounterDelete.load();
|
||||||
LOG(error) << "Message counter during Reset expected to be 0, found: " << fMsgCounter.load();
|
if (diff != 0) {
|
||||||
throw MessageError(tools::ToString("Message counter during Reset expected to be 0, found: ", fMsgCounter.load()));
|
LOG(error) << "Message counter during Reset expected to be 0, found: " << diff;
|
||||||
|
throw MessageError(tools::ToString("Message counter during Reset expected to be 0, found: ", diff));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
bool Interrupted() { return fInterrupted.load(); }
|
bool Interrupted() { return fInterrupted.load(); }
|
||||||
|
@ -496,8 +498,8 @@ class Manager
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void IncrementMsgCounter() { fMsgCounter.fetch_add(1, std::memory_order_relaxed); }
|
void IncrementMsgCounter() { fMsgCounterNew.fetch_add(1, std::memory_order_relaxed); }
|
||||||
void DecrementMsgCounter() { fMsgCounter.fetch_sub(1, std::memory_order_relaxed); }
|
void DecrementMsgCounter() { fMsgCounterDelete.fetch_add(1, std::memory_order_relaxed); }
|
||||||
|
|
||||||
#ifdef FAIRMQ_DEBUG_MODE
|
#ifdef FAIRMQ_DEBUG_MODE
|
||||||
void IncrementShmMsgCounter(uint16_t segmentId) { ++((*fShmMsgCounters)[segmentId].fCount); }
|
void IncrementShmMsgCounter(uint16_t segmentId) { ++((*fShmMsgCounters)[segmentId].fCount); }
|
||||||
|
@ -696,7 +698,6 @@ class Manager
|
||||||
} fTlRegionCache;
|
} fTlRegionCache;
|
||||||
|
|
||||||
std::atomic<bool> fInterrupted;
|
std::atomic<bool> fInterrupted;
|
||||||
std::atomic<int32_t> fMsgCounter; // TODO: find a better lifetime solution instead of the counter
|
|
||||||
#ifdef FAIRMQ_DEBUG_MODE
|
#ifdef FAIRMQ_DEBUG_MODE
|
||||||
Uint16MsgDebugMapHashMap* fMsgDebug;
|
Uint16MsgDebugMapHashMap* fMsgDebug;
|
||||||
Uint16MsgCounterHashMap* fShmMsgCounters;
|
Uint16MsgCounterHashMap* fShmMsgCounters;
|
||||||
|
@ -709,6 +710,10 @@ class Manager
|
||||||
|
|
||||||
bool fThrowOnBadAlloc;
|
bool fThrowOnBadAlloc;
|
||||||
bool fNoCleanup;
|
bool fNoCleanup;
|
||||||
|
|
||||||
|
// make sure the counters are not thrashing the cache line between threads doing creation and deallocation
|
||||||
|
alignas(128) std::atomic_uint64_t fMsgCounterNew; // TODO: find a better lifetime solution instead of the counter
|
||||||
|
alignas(128) std::atomic_uint64_t fMsgCounterDelete;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace fair::mq::shmem
|
} // namespace fair::mq::shmem
|
||||||
|
|
Loading…
Reference in New Issue
Block a user