Shm: Provide debug infos only in debug mode

This commit is contained in:
Alexey Rybalchenko 2020-08-11 12:18:01 +02:00
parent 70a583d08d
commit b63f31d0e0
6 changed files with 54 additions and 2 deletions

View File

@ -53,6 +53,8 @@ fairmq_build_option(FAST_BUILD "Fast production build. Not recommended
DEFAULT OFF) DEFAULT OFF)
fairmq_build_option(USE_EXTERNAL_GTEST "Do not use bundled GTest. Not recommended." fairmq_build_option(USE_EXTERNAL_GTEST "Do not use bundled GTest. Not recommended."
DEFAULT OFF) DEFAULT OFF)
fairmq_build_option(FAIRMQ_DEBUG_MODE "Compile in debug mode (may decrease performance)."
DEFAULT OFF)
################################################################################ ################################################################################
@ -429,4 +431,10 @@ message(STATUS " ${Cyan}INSTALL PREFIX${CR} ${BGreen}${CMAKE_INSTALL_PREFIX
message(STATUS " ") message(STATUS " ")
message(STATUS " ${Cyan}RUN STATIC ANALYSIS ${static_ana_summary}") message(STATUS " ${Cyan}RUN STATIC ANALYSIS ${static_ana_summary}")
message(STATUS " ") message(STATUS " ")
if(FAIRMQ_DEBUG_MODE)
message(STATUS " ${Cyan}DEBUG_MODE${CR} ${BGreen}${FAIRMQ_DEBUG_MODE}${CR} (disable with ${BMagenta}-DFAIRMQ_DEBUG_MODE=OFF${CR})")
else()
message(STATUS " ${Cyan}DEBUG_MODE${CR} ${BRed}${FAIRMQ_DEBUG_MODE}${CR} (enable with ${BMagenta}-DFAIRMQ_DEBUG_MODE=ON${CR})")
endif()
message(STATUS " ")
################################################################################ ################################################################################

View File

@ -278,6 +278,9 @@ if(BUILD_FAIRMQ)
# preprocessor definitions # # preprocessor definitions #
############################ ############################
target_compile_definitions(${_target} PUBLIC BOOST_ERROR_CODE_HEADER_ONLY) target_compile_definitions(${_target} PUBLIC BOOST_ERROR_CODE_HEADER_ONLY)
if(FAIRMQ_DEBUG_MODE)
target_compile_definitions(${_target} PUBLIC FAIRMQ_DEBUG_MODE)
endif()
if(BUILD_OFI_TRANSPORT) if(BUILD_OFI_TRANSPORT)
target_compile_definitions(${_target} PRIVATE BUILD_OFI_TRANSPORT) target_compile_definitions(${_target} PRIVATE BUILD_OFI_TRANSPORT)
endif() endif()
@ -374,6 +377,9 @@ if(BUILD_FAIRMQ)
add_executable(fairmq-shmmonitor shmem/Monitor.cxx shmem/Monitor.h shmem/runMonitor.cxx) add_executable(fairmq-shmmonitor shmem/Monitor.cxx shmem/Monitor.h shmem/runMonitor.cxx)
target_compile_definitions(fairmq-shmmonitor PUBLIC BOOST_ERROR_CODE_HEADER_ONLY) target_compile_definitions(fairmq-shmmonitor PUBLIC BOOST_ERROR_CODE_HEADER_ONLY)
if(FAIRMQ_DEBUG_MODE)
target_compile_definitions(fairmq-shmmonitor PUBLIC FAIRMQ_DEBUG_MODE)
endif()
target_link_libraries(fairmq-shmmonitor PUBLIC target_link_libraries(fairmq-shmmonitor PUBLIC
Threads::Threads Threads::Threads
$<$<PLATFORM_ID:Linux>:rt> $<$<PLATFORM_ID:Linux>:rt>

View File

@ -73,6 +73,7 @@ struct DeviceCounter
std::atomic<unsigned int> fCount; std::atomic<unsigned int> fCount;
}; };
#ifdef FAIRMQ_DEBUG_MODE
struct MsgCounter struct MsgCounter
{ {
MsgCounter(unsigned int c) MsgCounter(unsigned int c)
@ -81,6 +82,7 @@ struct MsgCounter
std::atomic<unsigned int> fCount; std::atomic<unsigned int> fCount;
}; };
#endif
struct RegionCounter struct RegionCounter
{ {
@ -99,6 +101,7 @@ struct MetaHeader
boost::interprocess::managed_shared_memory::handle_t fHandle; boost::interprocess::managed_shared_memory::handle_t fHandle;
}; };
#ifdef FAIRMQ_DEBUG_MODE
struct MsgDebug struct MsgDebug
{ {
MsgDebug(pid_t pid, size_t size, const uint64_t creationTime) MsgDebug(pid_t pid, size_t size, const uint64_t creationTime)
@ -115,6 +118,7 @@ struct MsgDebug
using Uint64MsgDebugPairAlloc = boost::interprocess::allocator<std::pair<const size_t, MsgDebug>, SegmentManager>; using Uint64MsgDebugPairAlloc = boost::interprocess::allocator<std::pair<const size_t, MsgDebug>, SegmentManager>;
using Uint64MsgDebugHashMap = boost::unordered_map<size_t, MsgDebug, boost::hash<size_t>, std::equal_to<size_t>, Uint64MsgDebugPairAlloc>; using Uint64MsgDebugHashMap = boost::unordered_map<size_t, MsgDebug, boost::hash<size_t>, std::equal_to<size_t>, Uint64MsgDebugPairAlloc>;
using Uint64MsgDebugMap = boost::interprocess::map<size_t, MsgDebug, std::less<size_t>, Uint64MsgDebugPairAlloc>; using Uint64MsgDebugMap = boost::interprocess::map<size_t, MsgDebug, std::less<size_t>, Uint64MsgDebugPairAlloc>;
#endif
struct RegionBlock struct RegionBlock
{ {

View File

@ -80,8 +80,10 @@ class Manager
, fRegionInfos(nullptr) , fRegionInfos(nullptr)
, fInterrupted(false) , fInterrupted(false)
, fMsgCounter(0) , fMsgCounter(0)
#ifdef FAIRMQ_DEBUG_MODE
, fMsgDebug(nullptr) , fMsgDebug(nullptr)
, fShmMsgCounter(nullptr) , fShmMsgCounter(nullptr)
#endif
, fHeartbeatThread() , fHeartbeatThread()
, fSendHeartbeats(true) , fSendHeartbeats(true)
, fThrowOnBadAlloc(true) , fThrowOnBadAlloc(true)
@ -119,7 +121,9 @@ class Manager
} }
fRegionInfos = fManagementSegment.find_or_construct<Uint64RegionInfoMap>(unique_instance)(fShmVoidAlloc); fRegionInfos = fManagementSegment.find_or_construct<Uint64RegionInfoMap>(unique_instance)(fShmVoidAlloc);
#ifdef FAIRMQ_DEBUG_MODE
fMsgDebug = fManagementSegment.find_or_construct<Uint64MsgDebugMap>(unique_instance)(fShmVoidAlloc); fMsgDebug = fManagementSegment.find_or_construct<Uint64MsgDebugMap>(unique_instance)(fShmVoidAlloc);
#endif
// store info about the managed segment as region with id 0 // store info about the managed segment as region with id 0
fRegionInfos->emplace(0, RegionInfo("", 0, 0, fShmVoidAlloc)); fRegionInfos->emplace(0, RegionInfo("", 0, 0, fShmVoidAlloc));
@ -137,6 +141,7 @@ class Manager
LOG(debug) << "initialized device counter with: " << fDeviceCounter->fCount; LOG(debug) << "initialized device counter with: " << fDeviceCounter->fCount;
} }
#ifdef FAIRMQ_DEBUG_MODE
fShmMsgCounter = fManagementSegment.find<MsgCounter>(unique_instance).first; fShmMsgCounter = fManagementSegment.find<MsgCounter>(unique_instance).first;
if (fShmMsgCounter) { if (fShmMsgCounter) {
@ -146,6 +151,7 @@ class Manager
fShmMsgCounter = fManagementSegment.construct<MsgCounter>(unique_instance)(0); fShmMsgCounter = fManagementSegment.construct<MsgCounter>(unique_instance)(0);
LOG(debug) << "initialized message counter with: " << fShmMsgCounter->fCount; LOG(debug) << "initialized message counter with: " << fShmMsgCounter->fCount;
} }
#endif
fHeartbeatThread = std::thread(&Manager::SendHeartbeats, this); fHeartbeatThread = std::thread(&Manager::SendHeartbeats, this);
} }
@ -407,6 +413,7 @@ class Manager
void IncrementMsgCounter() { fMsgCounter.fetch_add(1, std::memory_order_relaxed); } void IncrementMsgCounter() { fMsgCounter.fetch_add(1, std::memory_order_relaxed); }
void DecrementMsgCounter() { fMsgCounter.fetch_sub(1, std::memory_order_relaxed); } void DecrementMsgCounter() { fMsgCounter.fetch_sub(1, std::memory_order_relaxed); }
#ifdef FAIRMQ_DEBUG_MODE
void IncrementShmMsgCounter() { ++(fShmMsgCounter->fCount); } void IncrementShmMsgCounter() { ++(fShmMsgCounter->fCount); }
void DecrementShmMsgCounter() { --(fShmMsgCounter->fCount); } void DecrementShmMsgCounter() { --(fShmMsgCounter->fCount); }
@ -419,6 +426,7 @@ class Manager
{ {
fMsgDebug->erase(handle); fMsgDebug->erase(handle);
} }
#endif
boost::interprocess::named_mutex& GetMtx() { return fShmMtx; } boost::interprocess::named_mutex& GetMtx() { return fShmMtx; }
@ -501,8 +509,10 @@ class Manager
std::atomic<bool> fInterrupted; std::atomic<bool> fInterrupted;
std::atomic<int32_t> fMsgCounter; // TODO: find a better lifetime solution instead of the counter std::atomic<int32_t> fMsgCounter; // TODO: find a better lifetime solution instead of the counter
#ifdef FAIRMQ_DEBUG_MODE
Uint64MsgDebugMap* fMsgDebug; Uint64MsgDebugMap* fMsgDebug;
MsgCounter* fShmMsgCounter; MsgCounter* fShmMsgCounter;
#endif
std::thread fHeartbeatThread; std::thread fHeartbeatThread;
bool fSendHeartbeats; bool fSendHeartbeats;

View File

@ -277,9 +277,11 @@ class Message final : public fair::mq::Message
} }
} }
fMeta.fHandle = fManager.Segment().get_handle_from_address(fLocalPtr); fMeta.fHandle = fManager.Segment().get_handle_from_address(fLocalPtr);
#ifdef FAIRMQ_DEBUG_MODE
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fManager.GetMtx()); boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fManager.GetMtx());
fManager.IncrementShmMsgCounter(); fManager.IncrementShmMsgCounter();
fManager.AddMsgDebug(getpid(), size, static_cast<size_t>(fMeta.fHandle), std::chrono::system_clock::now().time_since_epoch().count()); fManager.AddMsgDebug(getpid(), size, static_cast<size_t>(fMeta.fHandle), std::chrono::system_clock::now().time_since_epoch().count());
#endif
} }
fMeta.fSize = size; fMeta.fSize = size;
@ -291,9 +293,11 @@ class Message final : public fair::mq::Message
if (fMeta.fHandle >= 0 && !fQueued) { if (fMeta.fHandle >= 0 && !fQueued) {
if (fMeta.fRegionId == 0) { if (fMeta.fRegionId == 0) {
fManager.Segment().deallocate(fManager.Segment().get_address_from_handle(fMeta.fHandle)); fManager.Segment().deallocate(fManager.Segment().get_address_from_handle(fMeta.fHandle));
#ifdef FAIRMQ_DEBUG_MODE
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fManager.GetMtx()); boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fManager.GetMtx());
fManager.DecrementShmMsgCounter(); fManager.DecrementShmMsgCounter();
fManager.RemoveMsgDebug(fMeta.fHandle); fManager.RemoveMsgDebug(fMeta.fHandle);
#endif
fMeta.fHandle = -1; fMeta.fHandle = -1;
} else { } else {
if (!fRegionPtr) { if (!fRegionPtr) {

View File

@ -287,17 +287,21 @@ void Monitor::CheckSegment()
fSeenOnce = true; fSeenOnce = true;
unsigned int numDevices = 0; unsigned int numDevices = 0;
#ifdef FAIRMQ_DEBUG_MODE
unsigned int numMessages = 0; unsigned int numMessages = 0;
#endif
if (fInteractive || fViewOnly) { if (fInteractive || fViewOnly) {
DeviceCounter* dc = managementSegment.find<DeviceCounter>(bipc::unique_instance).first; DeviceCounter* dc = managementSegment.find<DeviceCounter>(bipc::unique_instance).first;
if (dc) { if (dc) {
numDevices = dc->fCount; numDevices = dc->fCount;
} }
#ifdef FAIRMQ_DEBUG_MODE
MsgCounter* mc = managementSegment.find<MsgCounter>(bipc::unique_instance).first; MsgCounter* mc = managementSegment.find<MsgCounter>(bipc::unique_instance).first;
if (mc) { if (mc) {
numMessages = mc->fCount; numMessages = mc->fCount;
} }
#endif
} }
auto now = chrono::high_resolution_clock::now(); auto now = chrono::high_resolution_clock::now();
@ -319,7 +323,11 @@ void Monitor::CheckSegment()
<< setw(10) << segment.get_size() << " | " << setw(10) << segment.get_size() << " | "
<< setw(10) << segment.get_free_memory() << " | " << setw(10) << segment.get_free_memory() << " | "
<< setw(8) << numDevices << " | " << setw(8) << numDevices << " | "
#ifdef FAIRMQ_DEBUG_MODE
<< setw(8) << numMessages << " | " << setw(8) << numMessages << " | "
#else
<< setw(8) << "nodebug" << " | "
#endif
<< setw(10) << (fViewOnly ? "view only" : to_string(duration)) << " |" << setw(10) << (fViewOnly ? "view only" : to_string(duration)) << " |"
<< c << flush; << c << flush;
} else if (fViewOnly) { } else if (fViewOnly) {
@ -332,7 +340,11 @@ void Monitor::CheckSegment()
LOGV(info, user1) << "[" << fSegmentName LOGV(info, user1) << "[" << fSegmentName
<< "] devices: " << numDevices << "] devices: " << numDevices
<< ", total: " << total << ", total: " << total
#ifdef FAIRMQ_DEBUG_MODE
<< ", msgs: " << numMessages << ", msgs: " << numMessages
#else
<< ", msgs: NODEBUG"
#endif
<< ", free: " << free << ", free: " << free
<< ", used: " << used; << ", used: " << used;
// << "\n " // << "\n "
@ -375,8 +387,9 @@ void Monitor::CheckSegment()
} }
} }
void Monitor::PrintDebug(const ShmId& shmId) void Monitor::PrintDebug(const ShmId& shmId __attribute__((unused)))
{ {
#ifdef FAIRMQ_DEBUG_MODE
string managementSegmentName("fmq_" + shmId.shmId + "_mng"); string managementSegmentName("fmq_" + shmId.shmId + "_mng");
try { try {
bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str()); bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str());
@ -402,6 +415,9 @@ void Monitor::PrintDebug(const ShmId& shmId)
} catch (bie&) { } catch (bie&) {
cout << "no segment found" << endl; cout << "no segment found" << endl;
} }
#else
cout << "FairMQ was not compiled in debug mode (FAIRMQ_DEBUG_MODE)" << endl;
#endif
} }
void Monitor::PrintQueues() void Monitor::PrintQueues()
@ -456,7 +472,11 @@ void Monitor::PrintHeader()
void Monitor::PrintHelp() void Monitor::PrintHelp()
{ {
cout << "controls: [x] close memory, [p] print queues, [] print a list of allocated messages, [h] help, [q] quit." << endl; cout << "controls: [x] close memory, "
<< "[p] print queues, "
<< "[b] print a list of allocated messages (only available when compiled with FAIMQ_DEBUG_MODE=ON), "
<< "[h] help, "
<< "[q] quit." << endl;
} }
void Monitor::RemoveObject(const string& name) void Monitor::RemoveObject(const string& name)