Update monitor & debug tools for multiple segments

This commit is contained in:
Alexey Rybalchenko 2020-09-01 19:26:35 +02:00
parent 266843cda5
commit fdbf289364
5 changed files with 117 additions and 164 deletions

View File

@ -95,8 +95,8 @@ struct SegmentInfo
}; };
using Uint16SegmentInfoPairAlloc = boost::interprocess::allocator<std::pair<const uint16_t, SegmentInfo>, SegmentManager>; using Uint16SegmentInfoPairAlloc = boost::interprocess::allocator<std::pair<const uint16_t, SegmentInfo>, SegmentManager>;
using Uint16SegmentInfoMap = boost::interprocess::map<uint16_t, SegmentInfo, std::less<uint16_t>, Uint16SegmentInfoPairAlloc>;
using Uint16SegmentInfoHashMap = boost::unordered_map<uint16_t, SegmentInfo, boost::hash<uint16_t>, std::equal_to<uint16_t>, Uint16SegmentInfoPairAlloc>; using Uint16SegmentInfoHashMap = boost::unordered_map<uint16_t, SegmentInfo, boost::hash<uint16_t>, std::equal_to<uint16_t>, Uint16SegmentInfoPairAlloc>;
// using Uint16SegmentInfoMap = boost::interprocess::map<uint16_t, SegmentInfo, std::less<uint16_t>, Uint16SegmentInfoPairAlloc>;
struct DeviceCounter struct DeviceCounter
{ {
@ -107,17 +107,6 @@ struct DeviceCounter
std::atomic<unsigned int> fCount; std::atomic<unsigned int> fCount;
}; };
#ifdef FAIRMQ_DEBUG_MODE
struct MsgCounter
{
MsgCounter(unsigned int c)
: fCount(c)
{}
std::atomic<unsigned int> fCount;
};
#endif
struct RegionCounter struct RegionCounter
{ {
RegionCounter(uint16_t c) RegionCounter(uint16_t c)
@ -137,8 +126,30 @@ struct MetaHeader
}; };
#ifdef FAIRMQ_DEBUG_MODE #ifdef FAIRMQ_DEBUG_MODE
struct MsgCounter
{
MsgCounter()
: fCount(0)
{}
MsgCounter(unsigned int c)
: fCount(c)
{}
std::atomic<unsigned int> fCount;
};
using Uint16MsgCounterPairAlloc = boost::interprocess::allocator<std::pair<const uint16_t, MsgCounter>, SegmentManager>;
using Uint16MsgCounterHashMap = boost::unordered_map<uint16_t, MsgCounter, boost::hash<uint16_t>, std::equal_to<uint16_t>, Uint16MsgCounterPairAlloc>;
struct MsgDebug struct MsgDebug
{ {
MsgDebug()
: fPid(0)
, fSize(0)
, fCreationTime(0)
{}
MsgDebug(pid_t pid, size_t size, const uint64_t creationTime) MsgDebug(pid_t pid, size_t size, const uint64_t creationTime)
: fPid(pid) : fPid(pid)
, fSize(size) , fSize(size)
@ -151,8 +162,10 @@ struct MsgDebug
}; };
using SizetMsgDebugPairAlloc = boost::interprocess::allocator<std::pair<const size_t, MsgDebug>, SegmentManager>; using SizetMsgDebugPairAlloc = boost::interprocess::allocator<std::pair<const size_t, MsgDebug>, SegmentManager>;
using SizetMsgDebugHashMap = boost::unordered_map<size_t, MsgDebug, boost::hash<size_t>, std::equal_to<size_t>, SizetMsgDebugPairAlloc>; // using SizetMsgDebugHashMap = boost::unordered_map<size_t, MsgDebug, boost::hash<size_t>, std::equal_to<size_t>, SizetMsgDebugPairAlloc>;
using SizetMsgDebugMap = boost::interprocess::map<size_t, MsgDebug, std::less<size_t>, SizetMsgDebugPairAlloc>; using SizetMsgDebugMap = boost::interprocess::map<size_t, MsgDebug, std::less<size_t>, SizetMsgDebugPairAlloc>;
using Uint16MsgDebugMapPairAlloc = boost::interprocess::allocator<std::pair<const uint16_t, SizetMsgDebugMap>, SegmentManager>;
using Uint16MsgDebugMapHashMap = boost::unordered_map<uint16_t, SizetMsgDebugMap, boost::hash<uint16_t>, std::equal_to<uint16_t>, Uint16MsgDebugMapPairAlloc>;
#endif #endif
struct RegionBlock struct RegionBlock

View File

@ -60,7 +60,7 @@ class Manager
public: public:
Manager(std::string shmId, std::string deviceId, size_t size, const ProgOptions* config) Manager(std::string shmId, std::string deviceId, size_t size, const ProgOptions* config)
: fShmId(std::move(shmId)) : fShmId(std::move(shmId))
, fSegmentId(0) , fSegmentId(config ? config->GetProperty<uint16_t>("shm-segment-id", 0) : 0)
, fDeviceId(std::move(deviceId)) , fDeviceId(std::move(deviceId))
, fSegments() , fSegments()
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600) , fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600)
@ -75,11 +75,11 @@ class Manager
, fMsgCounter(0) , fMsgCounter(0)
#ifdef FAIRMQ_DEBUG_MODE #ifdef FAIRMQ_DEBUG_MODE
, fMsgDebug(nullptr) , fMsgDebug(nullptr)
, fShmMsgCounter(nullptr) , fShmMsgCounters(nullptr)
#endif #endif
, fHeartbeatThread() , fHeartbeatThread()
, fSendHeartbeats(true) , fSendHeartbeats(true)
, fThrowOnBadAlloc(true) , fThrowOnBadAlloc(config ? config->GetProperty<bool>("shm-throw-bad-alloc", true) : true)
{ {
using namespace boost::interprocess; using namespace boost::interprocess;
@ -89,20 +89,13 @@ class Manager
std::string allocationAlgorithm("rbtree_best_fit"); std::string allocationAlgorithm("rbtree_best_fit");
if (config) { if (config) {
mlockSegment = config->GetProperty<bool>("shm-mlock-segment", mlockSegment); mlockSegment = config->GetProperty<bool>("shm-mlock-segment", mlockSegment);
fSegmentId = config->GetProperty<uint16_t>("shm-segment-id", fSegmentId);
zeroSegment = config->GetProperty<bool>("shm-zero-segment", zeroSegment); zeroSegment = config->GetProperty<bool>("shm-zero-segment", zeroSegment);
autolaunchMonitor = config->GetProperty<bool>("shm-monitor", autolaunchMonitor); autolaunchMonitor = config->GetProperty<bool>("shm-monitor", autolaunchMonitor);
fThrowOnBadAlloc = config->GetProperty<bool>("shm-throw-bad-alloc", fThrowOnBadAlloc);
allocationAlgorithm = config->GetProperty<std::string>("shm-allocation", allocationAlgorithm); allocationAlgorithm = config->GetProperty<std::string>("shm-allocation", allocationAlgorithm);
} else { } else {
LOG(debug) << "ProgOptions not available! Using defaults."; LOG(debug) << "ProgOptions not available! Using defaults.";
} }
if (allocationAlgorithm != "rbtree_best_fit" && allocationAlgorithm != "simple_seq_fit") {
LOG(error) << "Provided shared memory allocation algorithm '" << allocationAlgorithm << "' is not supported. Supported are 'rbtree_best_fit'/'simple_seq_fit'";
throw SharedMemoryError(tools::ToString("Provided shared memory allocation algorithm '", allocationAlgorithm, "' is not supported. Supported are 'rbtree_best_fit'/'simple_seq_fit'"));
}
if (autolaunchMonitor) { if (autolaunchMonitor) {
StartMonitor(fShmId); StartMonitor(fShmId);
} }
@ -170,10 +163,6 @@ class Manager
fShmRegions = fManagementSegment.find_or_construct<Uint16RegionInfoHashMap>(unique_instance)(fShmVoidAlloc); fShmRegions = fManagementSegment.find_or_construct<Uint16RegionInfoHashMap>(unique_instance)(fShmVoidAlloc);
#ifdef FAIRMQ_DEBUG_MODE
fMsgDebug = fManagementSegment.find_or_construct<SizetMsgDebugMap>(unique_instance)(fShmVoidAlloc);
#endif
fDeviceCounter = fManagementSegment.find<DeviceCounter>(unique_instance).first; fDeviceCounter = fManagementSegment.find<DeviceCounter>(unique_instance).first;
if (fDeviceCounter) { if (fDeviceCounter) {
@ -187,15 +176,8 @@ class Manager
} }
#ifdef FAIRMQ_DEBUG_MODE #ifdef FAIRMQ_DEBUG_MODE
fShmMsgCounter = fManagementSegment.find<MsgCounter>(unique_instance).first; fMsgDebug = fManagementSegment.find_or_construct<Uint16MsgDebugMapHashMap>(unique_instance)(fShmVoidAlloc);
fShmMsgCounters = fManagementSegment.find_or_construct<Uint16MsgCounterHashMap>(unique_instance)(fShmVoidAlloc);
if (fShmMsgCounter) {
LOG(debug) << "message counter found, with value of " << fShmMsgCounter->fCount << ".";
} else {
LOG(debug) << "no message counter found, creating one and initializing with 0";
fShmMsgCounter = fManagementSegment.construct<MsgCounter>(unique_instance)(0);
LOG(debug) << "initialized message counter with: " << fShmMsgCounter->fCount;
}
#endif #endif
} }
@ -457,18 +439,8 @@ class Manager
void DecrementMsgCounter() { fMsgCounter.fetch_sub(1, std::memory_order_relaxed); } void DecrementMsgCounter() { fMsgCounter.fetch_sub(1, std::memory_order_relaxed); }
#ifdef FAIRMQ_DEBUG_MODE #ifdef FAIRMQ_DEBUG_MODE
void IncrementShmMsgCounter() { ++(fShmMsgCounter->fCount); } void IncrementShmMsgCounter(uint16_t segmentId) { ++((*fShmMsgCounters)[segmentId].fCount); }
void DecrementShmMsgCounter() { --(fShmMsgCounter->fCount); } void DecrementShmMsgCounter(uint16_t segmentId) { --((*fShmMsgCounters)[segmentId].fCount); }
void AddMsgDebug(pid_t pid, size_t size, size_t handle, uint64_t time)
{
fMsgDebug->emplace(handle, MsgDebug(pid, size, time));
}
void RemoveMsgDebug(size_t handle)
{
fMsgDebug->erase(handle);
}
#endif #endif
boost::interprocess::named_mutex& GetMtx() { return fShmMtx; } boost::interprocess::named_mutex& GetMtx() { return fShmMtx; }
@ -562,8 +534,14 @@ class Manager
} }
#ifdef FAIRMQ_DEBUG_MODE #ifdef FAIRMQ_DEBUG_MODE
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx); boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
IncrementShmMsgCounter(); IncrementShmMsgCounter(fSegmentId);
AddMsgDebug(getpid(), size, static_cast<size_t>(GetHandleFromAddress(ptr)), std::chrono::system_clock::now().time_since_epoch().count()); if (fMsgDebug->count(fSegmentId) == 0) {
(*fMsgDebug).emplace(fSegmentId, fShmVoidAlloc);
}
(*fMsgDebug).at(fSegmentId).emplace(
static_cast<size_t>(GetHandleFromAddress(ptr, fSegmentId)),
MsgDebug(getpid(), size, std::chrono::system_clock::now().time_since_epoch().count())
);
#endif #endif
} }
@ -575,8 +553,12 @@ class Manager
boost::apply_visitor(SegmentDeallocate{GetAddressFromHandle(handle, segmentId)}, fSegments.at(segmentId)); boost::apply_visitor(SegmentDeallocate{GetAddressFromHandle(handle, segmentId)}, fSegments.at(segmentId));
#ifdef FAIRMQ_DEBUG_MODE #ifdef FAIRMQ_DEBUG_MODE
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx); boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
DecrementShmMsgCounter(); DecrementShmMsgCounter(segmentId);
RemoveMsgDebug(handle); try {
(*fMsgDebug).at(segmentId).erase(handle);
} catch(const std::out_of_range& oor) {
LOG(debug) << "could not locate debug container for " << segmentId << ": " << oor.what();
}
#endif #endif
} }
@ -646,8 +628,8 @@ class Manager
std::atomic<bool> fInterrupted; std::atomic<bool> fInterrupted;
std::atomic<int32_t> fMsgCounter; // TODO: find a better lifetime solution instead of the counter std::atomic<int32_t> fMsgCounter; // TODO: find a better lifetime solution instead of the counter
#ifdef FAIRMQ_DEBUG_MODE #ifdef FAIRMQ_DEBUG_MODE
SizetMsgDebugMap* fMsgDebug; Uint16MsgDebugMapHashMap* fMsgDebug;
MsgCounter* fShmMsgCounter; Uint16MsgCounterHashMap* fShmMsgCounters;
#endif #endif
std::thread fHeartbeatThread; std::thread fHeartbeatThread;

View File

@ -27,6 +27,7 @@
#include <ctime> #include <ctime>
#include <time.h> #include <time.h>
#include <iomanip> #include <iomanip>
#include <sstream>
#include <termios.h> #include <termios.h>
#include <poll.h> #include <poll.h>
@ -189,8 +190,6 @@ void Monitor::Interactive()
cout << endl; cout << endl;
PrintHelp(); PrintHelp();
cout << endl;
PrintHeader();
while (!fTerminating) { while (!fTerminating) {
if (poll(cinfd, 1, fIntervalInMS)) { if (poll(cinfd, 1, fIntervalInMS)) {
@ -232,8 +231,6 @@ void Monitor::Interactive()
if (fTerminating) { if (fTerminating) {
break; break;
} }
PrintHeader();
} }
if (fTerminating) { if (fTerminating) {
@ -251,34 +248,10 @@ void Monitor::Interactive()
void Monitor::CheckSegment() void Monitor::CheckSegment()
{ {
using namespace boost::interprocess; using namespace boost::interprocess;
char c = '#';
if (fInteractive) {
static uint64_t counter = 0;
int mod = counter++ % 5;
switch (mod) {
case 0:
c = '-';
break;
case 1:
c = '\\';
break;
case 2:
c = '|';
break;
case 3:
c = '-';
break;
case 4:
c = '/';
break;
default:
break;
}
}
try { try {
managed_shared_memory managementSegment(open_only, fManagementSegmentName.c_str()); managed_shared_memory managementSegment(open_only, fManagementSegmentName.c_str());
VoidAlloc allocInstance(managementSegment.get_segment_manager());
Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first; Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first;
std::unordered_map<uint16_t, boost::variant<RBTreeBestFitSegment, SimpleSeqFitSegment>> segments; std::unordered_map<uint16_t, boost::variant<RBTreeBestFitSegment, SimpleSeqFitSegment>> segments;
@ -300,7 +273,7 @@ void Monitor::CheckSegment()
unsigned int numDevices = 0; unsigned int numDevices = 0;
#ifdef FAIRMQ_DEBUG_MODE #ifdef FAIRMQ_DEBUG_MODE
unsigned int numMessages = 0; Uint16MsgCounterHashMap* msgCounters = nullptr;
#endif #endif
if (fInteractive || fViewOnly) { if (fInteractive || fViewOnly) {
@ -309,10 +282,7 @@ void Monitor::CheckSegment()
numDevices = dc->fCount; numDevices = dc->fCount;
} }
#ifdef FAIRMQ_DEBUG_MODE #ifdef FAIRMQ_DEBUG_MODE
MsgCounter* mc = managementSegment.find<MsgCounter>(unique_instance).first; msgCounters = managementSegment.find_or_construct<Uint16MsgCounterHashMap>(unique_instance)(allocInstance);
if (mc) {
numMessages = mc->fCount;
}
#endif #endif
} }
@ -329,54 +299,37 @@ void Monitor::CheckSegment()
} }
} }
if (fInteractive) { if (fInteractive || fViewOnly) {
cout << "| " stringstream ss;
<< setw(18) << fSegmentName << " | " size_t mfree = managementSegment.get_free_memory();
<< setw(10) << boost::apply_visitor(SegmentSize{}, segments.at(0)) << " | " size_t mtotal = managementSegment.get_size();
<< setw(10) << boost::apply_visitor(SegmentFreeMemory{}, segments.at(0)) << " | " size_t mused = mtotal - mfree;
<< setw(8) << numDevices << " | "
#ifdef FAIRMQ_DEBUG_MODE ss << "shm id: " << fShmId
<< setw(8) << numMessages << " | " << ", devices: " << numDevices << ", segments:\n";
#else for (const auto& s : segments) {
<< setw(8) << "nodebug" << " | " size_t free = boost::apply_visitor(SegmentFreeMemory{}, s.second);
#endif size_t total = boost::apply_visitor(SegmentSize{}, s.second);
<< setw(10) << (fViewOnly ? "view only" : to_string(duration)) << " |"
<< c << flush;
} else if (fViewOnly) {
size_t free = boost::apply_visitor(SegmentFreeMemory{}, segments.at(0));
size_t total = boost::apply_visitor(SegmentSize{}, segments.at(0));
size_t used = total - free; size_t used = total - free;
// size_t mfree = managementSegment.get_free_memory(); ss << " [" << s.first
// size_t mtotal = managementSegment.get_size(); << "]: total: " << total
// size_t mused = mtotal - mfree;
LOGV(info, user1) << "[" << fSegmentName
<< "] devices: " << numDevices
<< ", total: " << total
#ifdef FAIRMQ_DEBUG_MODE #ifdef FAIRMQ_DEBUG_MODE
<< ", msgs: " << numMessages << ", msgs: " << (*msgCounters)[s.first].fCount
#else #else
<< ", msgs: NODEBUG" << ", msgs: NODEBUG"
#endif #endif
<< ", free: " << free << ", free: " << free
<< ", used: " << used; << ", used: " << used
// << "\n " << "\n";
// << "[" << fManagementSegmentName }
// << "] total: " << mtotal ss << " [m]: "
// << ", free: " << mfree << "total: " << mtotal
// << ", used: " << mused; << ", free: " << mfree
<< ", used: " << mused;
LOGV(info, user1) << ss.str();
} }
} catch (bie&) { } catch (bie&) {
fHeartbeatTriggered = false; fHeartbeatTriggered = false;
if (fInteractive) {
cout << "| "
<< setw(18) << "-" << " | "
<< setw(10) << "-" << " | "
<< setw(10) << "-" << " | "
<< setw(8) << "-" << " | "
<< setw(8) << "-" << " | "
<< setw(10) << "-" << " |"
<< c << flush;
}
auto now = chrono::high_resolution_clock::now(); auto now = chrono::high_resolution_clock::now();
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count(); unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
@ -408,24 +361,32 @@ void Monitor::PrintDebugInfo(const ShmId& shmId __attribute__((unused)))
boost::interprocess::named_mutex mtx(boost::interprocess::open_only, string("fmq_" + shmId.shmId + "_mtx").c_str()); boost::interprocess::named_mutex mtx(boost::interprocess::open_only, string("fmq_" + shmId.shmId + "_mtx").c_str());
boost::interprocess::scoped_lock<bipc::named_mutex> lock(mtx); boost::interprocess::scoped_lock<bipc::named_mutex> lock(mtx);
SizetMsgDebugMap* debug = managementSegment.find<SizetMsgDebugMap>(bipc::unique_instance).first; Uint16MsgDebugMapHashMap* debug = managementSegment.find<Uint16MsgDebugMapHashMap>(bipc::unique_instance).first;
cout << endl << "found " << debug->size() << " message(s):" << endl; size_t numMessages = 0;
for (const auto& e : *debug) { for (const auto& e : *debug) {
numMessages += e.second.size();
}
cout << endl << "found " << numMessages << " messages." << endl;
for (const auto& s : *debug) {
for (const auto& e : s.second) {
using time_point = chrono::system_clock::time_point; using time_point = chrono::system_clock::time_point;
time_point tmpt{chrono::duration_cast<time_point::duration>(chrono::nanoseconds(e.second.fCreationTime))}; time_point tmpt{chrono::duration_cast<time_point::duration>(chrono::nanoseconds(e.second.fCreationTime))};
time_t t = chrono::system_clock::to_time_t(tmpt); time_t t = chrono::system_clock::to_time_t(tmpt);
uint64_t ms = e.second.fCreationTime % 1000000; uint64_t ms = e.second.fCreationTime % 1000000;
auto tm = localtime(&t); auto tm = localtime(&t);
cout << "offset: " << setw(12) << setfill(' ') << e.first cout << "segment: " << setw(3) << setfill(' ') << s.first
<< ", offset: " << setw(12) << setfill(' ') << e.first
<< ", size: " << setw(10) << setfill(' ') << e.second.fSize << ", size: " << setw(10) << setfill(' ') << e.second.fSize
<< ", creator PID: " << e.second.fPid << setfill('0') << ", creator PID: " << e.second.fPid << setfill('0')
<< ", at: " << setw(2) << tm->tm_hour << ":" << setw(2) << tm->tm_min << ":" << setw(2) << tm->tm_sec << "." << setw(6) << ms << endl; << ", at: " << setw(2) << tm->tm_hour << ":" << setw(2) << tm->tm_min << ":" << setw(2) << tm->tm_sec << "." << setw(6) << ms << endl;
} }
}
cout << setfill(' '); cout << setfill(' ');
} catch (bie&) { } catch (bie&) {
cout << "no segment found" << endl; cout << "no segments found" << endl;
} }
#else #else
cout << "FairMQ was not compiled in debug mode (FAIRMQ_DEBUG_MODE)" << endl; cout << "FairMQ was not compiled in debug mode (FAIRMQ_DEBUG_MODE)" << endl;
@ -438,9 +399,9 @@ void Monitor::PrintDebugInfo(const SessionId& sessionId)
PrintDebugInfo(shmId); PrintDebugInfo(shmId);
} }
vector<BufferDebugInfo> Monitor::GetDebugInfo(const ShmId& shmId __attribute__((unused))) unordered_map<uint16_t, std::vector<BufferDebugInfo>> Monitor::GetDebugInfo(const ShmId& shmId __attribute__((unused)))
{ {
vector<BufferDebugInfo> result; unordered_map<uint16_t, std::vector<BufferDebugInfo>> result;
#ifdef FAIRMQ_DEBUG_MODE #ifdef FAIRMQ_DEBUG_MODE
string managementSegmentName("fmq_" + shmId.shmId + "_mng"); string managementSegmentName("fmq_" + shmId.shmId + "_mng");
@ -449,15 +410,18 @@ vector<BufferDebugInfo> Monitor::GetDebugInfo(const ShmId& shmId __attribute__((
boost::interprocess::named_mutex mtx(boost::interprocess::open_only, string("fmq_" + shmId.shmId + "_mtx").c_str()); boost::interprocess::named_mutex mtx(boost::interprocess::open_only, string("fmq_" + shmId.shmId + "_mtx").c_str());
boost::interprocess::scoped_lock<bipc::named_mutex> lock(mtx); boost::interprocess::scoped_lock<bipc::named_mutex> lock(mtx);
SizetMsgDebugMap* debug = managementSegment.find<SizetMsgDebugMap>(bipc::unique_instance).first; Uint16MsgDebugMapHashMap* debug = managementSegment.find<Uint16MsgDebugMapHashMap>(bipc::unique_instance).first;
result.reserve(debug->size()); result.reserve(debug->size());
for (const auto& e : *debug) { for (const auto& s : *debug) {
result.emplace_back(e.first, e.second.fPid, e.second.fSize, e.second.fCreationTime); result[s.first].reserve(s.second.size());
for (const auto& e : s.second) {
result[s.first][e.first] = BufferDebugInfo(e.first, e.second.fPid, e.second.fSize, e.second.fCreationTime);
}
} }
} catch (bie&) { } catch (bie&) {
cout << "no segment found" << endl; cout << "no segments found" << endl;
} }
#else #else
cout << "FairMQ was not compiled in debug mode (FAIRMQ_DEBUG_MODE)" << endl; cout << "FairMQ was not compiled in debug mode (FAIRMQ_DEBUG_MODE)" << endl;
@ -465,24 +429,12 @@ vector<BufferDebugInfo> Monitor::GetDebugInfo(const ShmId& shmId __attribute__((
return result; return result;
} }
vector<BufferDebugInfo> Monitor::GetDebugInfo(const SessionId& sessionId) unordered_map<uint16_t, std::vector<BufferDebugInfo>> Monitor::GetDebugInfo(const SessionId& sessionId)
{ {
ShmId shmId{buildShmIdFromSessionIdAndUserId(sessionId.sessionId)}; ShmId shmId{buildShmIdFromSessionIdAndUserId(sessionId.sessionId)};
return GetDebugInfo(shmId); return GetDebugInfo(shmId);
} }
void Monitor::PrintHeader()
{
cout << "| "
<< setw(18) << "name" << " | "
<< setw(10) << "size" << " | "
<< setw(10) << "free" << " | "
<< setw(8) << "devices" << " | "
<< setw(8) << "msgs" << " | "
<< setw(10) << "last hb" << " |"
<< endl;
}
void Monitor::PrintHelp() void Monitor::PrintHelp()
{ {
cout << "controls: [x] close memory, " cout << "controls: [x] close memory, "

View File

@ -83,8 +83,8 @@ class Monitor
static void PrintDebugInfo(const ShmId& shmId); static void PrintDebugInfo(const ShmId& shmId);
static void PrintDebugInfo(const SessionId& shmId); static void PrintDebugInfo(const SessionId& shmId);
static std::vector<BufferDebugInfo> GetDebugInfo(const ShmId& shmId); static std::unordered_map<uint16_t, std::vector<BufferDebugInfo>> GetDebugInfo(const ShmId& shmId);
static std::vector<BufferDebugInfo> GetDebugInfo(const SessionId& shmId); static std::unordered_map<uint16_t, std::vector<BufferDebugInfo>> GetDebugInfo(const SessionId& shmId);
static bool RemoveObject(const std::string& name); static bool RemoveObject(const std::string& name);
static bool RemoveFileMapping(const std::string& name); static bool RemoveFileMapping(const std::string& name);
@ -95,7 +95,6 @@ class Monitor
struct DaemonPresent : std::runtime_error { using std::runtime_error::runtime_error; }; struct DaemonPresent : std::runtime_error { using std::runtime_error::runtime_error; };
private: private:
void PrintHeader();
void PrintHelp(); void PrintHelp();
void MonitorHeartbeats(); void MonitorHeartbeats();
void CheckSegment(); void CheckSegment();

View File

@ -58,14 +58,21 @@ class TransportFactory final : public fair::mq::TransportFactory
int numIoThreads = 1; int numIoThreads = 1;
std::string sessionName = "default"; std::string sessionName = "default";
size_t segmentSize = 2ULL << 30; size_t segmentSize = 2ULL << 30;
std::string allocationAlgorithm("rbtree_best_fit");
if (config) { if (config) {
numIoThreads = config->GetProperty<int>("io-threads", numIoThreads); numIoThreads = config->GetProperty<int>("io-threads", numIoThreads);
sessionName = config->GetProperty<std::string>("session", sessionName); sessionName = config->GetProperty<std::string>("session", sessionName);
segmentSize = config->GetProperty<size_t>("shm-segment-size", segmentSize); segmentSize = config->GetProperty<size_t>("shm-segment-size", segmentSize);
allocationAlgorithm = config->GetProperty<std::string>("shm-allocation", allocationAlgorithm);
} else { } else {
LOG(debug) << "ProgOptions not available! Using defaults."; LOG(debug) << "ProgOptions not available! Using defaults.";
} }
if (allocationAlgorithm != "rbtree_best_fit" && allocationAlgorithm != "simple_seq_fit") {
LOG(error) << "Provided shared memory allocation algorithm '" << allocationAlgorithm << "' is not supported. Supported are 'rbtree_best_fit'/'simple_seq_fit'";
throw SharedMemoryError(tools::ToString("Provided shared memory allocation algorithm '", allocationAlgorithm, "' is not supported. Supported are 'rbtree_best_fit'/'simple_seq_fit'"));
}
fShmId = buildShmIdFromSessionIdAndUserId(sessionName); fShmId = buildShmIdFromSessionIdAndUserId(sessionName);
LOG(debug) << "Generated shmid '" << fShmId << "' out of session id '" << sessionName << "'."; LOG(debug) << "Generated shmid '" << fShmId << "' out of session id '" << sessionName << "'.";