diff --git a/fairmq/shmem/Common.h b/fairmq/shmem/Common.h index 83650055..8e9cdc11 100644 --- a/fairmq/shmem/Common.h +++ b/fairmq/shmem/Common.h @@ -95,8 +95,8 @@ struct SegmentInfo }; using Uint16SegmentInfoPairAlloc = boost::interprocess::allocator, SegmentManager>; -using Uint16SegmentInfoMap = boost::interprocess::map, Uint16SegmentInfoPairAlloc>; using Uint16SegmentInfoHashMap = boost::unordered_map, std::equal_to, Uint16SegmentInfoPairAlloc>; +// using Uint16SegmentInfoMap = boost::interprocess::map, Uint16SegmentInfoPairAlloc>; struct DeviceCounter { @@ -107,17 +107,6 @@ struct DeviceCounter std::atomic fCount; }; -#ifdef FAIRMQ_DEBUG_MODE -struct MsgCounter -{ - MsgCounter(unsigned int c) - : fCount(c) - {} - - std::atomic fCount; -}; -#endif - struct RegionCounter { RegionCounter(uint16_t c) @@ -137,8 +126,30 @@ struct MetaHeader }; #ifdef FAIRMQ_DEBUG_MODE +struct MsgCounter +{ + MsgCounter() + : fCount(0) + {} + + MsgCounter(unsigned int c) + : fCount(c) + {} + + std::atomic fCount; +}; + +using Uint16MsgCounterPairAlloc = boost::interprocess::allocator, SegmentManager>; +using Uint16MsgCounterHashMap = boost::unordered_map, std::equal_to, Uint16MsgCounterPairAlloc>; + struct MsgDebug { + MsgDebug() + : fPid(0) + , fSize(0) + , fCreationTime(0) + {} + MsgDebug(pid_t pid, size_t size, const uint64_t creationTime) : fPid(pid) , fSize(size) @@ -151,8 +162,10 @@ struct MsgDebug }; using SizetMsgDebugPairAlloc = boost::interprocess::allocator, SegmentManager>; -using SizetMsgDebugHashMap = boost::unordered_map, std::equal_to, SizetMsgDebugPairAlloc>; +// using SizetMsgDebugHashMap = boost::unordered_map, std::equal_to, SizetMsgDebugPairAlloc>; using SizetMsgDebugMap = boost::interprocess::map, SizetMsgDebugPairAlloc>; +using Uint16MsgDebugMapPairAlloc = boost::interprocess::allocator, SegmentManager>; +using Uint16MsgDebugMapHashMap = boost::unordered_map, std::equal_to, Uint16MsgDebugMapPairAlloc>; #endif struct RegionBlock diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index 9a92430c..6651da9a 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -60,7 +60,7 @@ class Manager public: Manager(std::string shmId, std::string deviceId, size_t size, const ProgOptions* config) : fShmId(std::move(shmId)) - , fSegmentId(0) + , fSegmentId(config ? config->GetProperty("shm-segment-id", 0) : 0) , fDeviceId(std::move(deviceId)) , fSegments() , fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600) @@ -75,11 +75,11 @@ class Manager , fMsgCounter(0) #ifdef FAIRMQ_DEBUG_MODE , fMsgDebug(nullptr) - , fShmMsgCounter(nullptr) + , fShmMsgCounters(nullptr) #endif , fHeartbeatThread() , fSendHeartbeats(true) - , fThrowOnBadAlloc(true) + , fThrowOnBadAlloc(config ? config->GetProperty("shm-throw-bad-alloc", true) : true) { using namespace boost::interprocess; @@ -89,20 +89,13 @@ class Manager std::string allocationAlgorithm("rbtree_best_fit"); if (config) { mlockSegment = config->GetProperty("shm-mlock-segment", mlockSegment); - fSegmentId = config->GetProperty("shm-segment-id", fSegmentId); zeroSegment = config->GetProperty("shm-zero-segment", zeroSegment); autolaunchMonitor = config->GetProperty("shm-monitor", autolaunchMonitor); - fThrowOnBadAlloc = config->GetProperty("shm-throw-bad-alloc", fThrowOnBadAlloc); allocationAlgorithm = config->GetProperty("shm-allocation", allocationAlgorithm); } else { LOG(debug) << "ProgOptions not available! Using defaults."; } - if (allocationAlgorithm != "rbtree_best_fit" && allocationAlgorithm != "simple_seq_fit") { - LOG(error) << "Provided shared memory allocation algorithm '" << allocationAlgorithm << "' is not supported. Supported are 'rbtree_best_fit'/'simple_seq_fit'"; - throw SharedMemoryError(tools::ToString("Provided shared memory allocation algorithm '", allocationAlgorithm, "' is not supported. Supported are 'rbtree_best_fit'/'simple_seq_fit'")); - } - if (autolaunchMonitor) { StartMonitor(fShmId); } @@ -170,10 +163,6 @@ class Manager fShmRegions = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); -#ifdef FAIRMQ_DEBUG_MODE - fMsgDebug = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); -#endif - fDeviceCounter = fManagementSegment.find(unique_instance).first; if (fDeviceCounter) { @@ -187,15 +176,8 @@ class Manager } #ifdef FAIRMQ_DEBUG_MODE - fShmMsgCounter = fManagementSegment.find(unique_instance).first; - - if (fShmMsgCounter) { - LOG(debug) << "message counter found, with value of " << fShmMsgCounter->fCount << "."; - } else { - LOG(debug) << "no message counter found, creating one and initializing with 0"; - fShmMsgCounter = fManagementSegment.construct(unique_instance)(0); - LOG(debug) << "initialized message counter with: " << fShmMsgCounter->fCount; - } + fMsgDebug = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); + fShmMsgCounters = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); #endif } @@ -457,18 +439,8 @@ class Manager void DecrementMsgCounter() { fMsgCounter.fetch_sub(1, std::memory_order_relaxed); } #ifdef FAIRMQ_DEBUG_MODE - void IncrementShmMsgCounter() { ++(fShmMsgCounter->fCount); } - void DecrementShmMsgCounter() { --(fShmMsgCounter->fCount); } - - void AddMsgDebug(pid_t pid, size_t size, size_t handle, uint64_t time) - { - fMsgDebug->emplace(handle, MsgDebug(pid, size, time)); - } - - void RemoveMsgDebug(size_t handle) - { - fMsgDebug->erase(handle); - } + void IncrementShmMsgCounter(uint16_t segmentId) { ++((*fShmMsgCounters)[segmentId].fCount); } + void DecrementShmMsgCounter(uint16_t segmentId) { --((*fShmMsgCounters)[segmentId].fCount); } #endif boost::interprocess::named_mutex& GetMtx() { return fShmMtx; } @@ -562,8 +534,14 @@ class Manager } #ifdef FAIRMQ_DEBUG_MODE boost::interprocess::scoped_lock lock(fShmMtx); - IncrementShmMsgCounter(); - AddMsgDebug(getpid(), size, static_cast(GetHandleFromAddress(ptr)), std::chrono::system_clock::now().time_since_epoch().count()); + IncrementShmMsgCounter(fSegmentId); + if (fMsgDebug->count(fSegmentId) == 0) { + (*fMsgDebug).emplace(fSegmentId, fShmVoidAlloc); + } + (*fMsgDebug).at(fSegmentId).emplace( + static_cast(GetHandleFromAddress(ptr, fSegmentId)), + MsgDebug(getpid(), size, std::chrono::system_clock::now().time_since_epoch().count()) + ); #endif } @@ -575,8 +553,12 @@ class Manager boost::apply_visitor(SegmentDeallocate{GetAddressFromHandle(handle, segmentId)}, fSegments.at(segmentId)); #ifdef FAIRMQ_DEBUG_MODE boost::interprocess::scoped_lock lock(fShmMtx); - DecrementShmMsgCounter(); - RemoveMsgDebug(handle); + DecrementShmMsgCounter(segmentId); + try { + (*fMsgDebug).at(segmentId).erase(handle); + } catch(const std::out_of_range& oor) { + LOG(debug) << "could not locate debug container for " << segmentId << ": " << oor.what(); + } #endif } @@ -646,8 +628,8 @@ class Manager std::atomic fInterrupted; std::atomic fMsgCounter; // TODO: find a better lifetime solution instead of the counter #ifdef FAIRMQ_DEBUG_MODE - SizetMsgDebugMap* fMsgDebug; - MsgCounter* fShmMsgCounter; + Uint16MsgDebugMapHashMap* fMsgDebug; + Uint16MsgCounterHashMap* fShmMsgCounters; #endif std::thread fHeartbeatThread; diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index fbe96ce9..91ebbf5e 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -189,8 +190,6 @@ void Monitor::Interactive() cout << endl; PrintHelp(); - cout << endl; - PrintHeader(); while (!fTerminating) { if (poll(cinfd, 1, fIntervalInMS)) { @@ -232,8 +231,6 @@ void Monitor::Interactive() if (fTerminating) { break; } - - PrintHeader(); } if (fTerminating) { @@ -251,34 +248,10 @@ void Monitor::Interactive() void Monitor::CheckSegment() { using namespace boost::interprocess; - char c = '#'; - - if (fInteractive) { - static uint64_t counter = 0; - int mod = counter++ % 5; - switch (mod) { - case 0: - c = '-'; - break; - case 1: - c = '\\'; - break; - case 2: - c = '|'; - break; - case 3: - c = '-'; - break; - case 4: - c = '/'; - break; - default: - break; - } - } try { managed_shared_memory managementSegment(open_only, fManagementSegmentName.c_str()); + VoidAlloc allocInstance(managementSegment.get_segment_manager()); Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find(unique_instance).first; std::unordered_map> segments; @@ -300,7 +273,7 @@ void Monitor::CheckSegment() unsigned int numDevices = 0; #ifdef FAIRMQ_DEBUG_MODE - unsigned int numMessages = 0; + Uint16MsgCounterHashMap* msgCounters = nullptr; #endif if (fInteractive || fViewOnly) { @@ -309,10 +282,7 @@ void Monitor::CheckSegment() numDevices = dc->fCount; } #ifdef FAIRMQ_DEBUG_MODE - MsgCounter* mc = managementSegment.find(unique_instance).first; - if (mc) { - numMessages = mc->fCount; - } + msgCounters = managementSegment.find_or_construct(unique_instance)(allocInstance); #endif } @@ -329,54 +299,37 @@ void Monitor::CheckSegment() } } - if (fInteractive) { - cout << "| " - << setw(18) << fSegmentName << " | " - << setw(10) << boost::apply_visitor(SegmentSize{}, segments.at(0)) << " | " - << setw(10) << boost::apply_visitor(SegmentFreeMemory{}, segments.at(0)) << " | " - << setw(8) << numDevices << " | " + if (fInteractive || fViewOnly) { + stringstream ss; + size_t mfree = managementSegment.get_free_memory(); + size_t mtotal = managementSegment.get_size(); + size_t mused = mtotal - mfree; + + ss << "shm id: " << fShmId + << ", devices: " << numDevices << ", segments:\n"; + for (const auto& s : segments) { + size_t free = boost::apply_visitor(SegmentFreeMemory{}, s.second); + size_t total = boost::apply_visitor(SegmentSize{}, s.second); + size_t used = total - free; + ss << " [" << s.first + << "]: total: " << total #ifdef FAIRMQ_DEBUG_MODE - << setw(8) << numMessages << " | " + << ", msgs: " << (*msgCounters)[s.first].fCount #else - << setw(8) << "nodebug" << " | " + << ", msgs: NODEBUG" #endif - << setw(10) << (fViewOnly ? "view only" : to_string(duration)) << " |" - << c << flush; - } else if (fViewOnly) { - size_t free = boost::apply_visitor(SegmentFreeMemory{}, segments.at(0)); - size_t total = boost::apply_visitor(SegmentSize{}, segments.at(0)); - size_t used = total - free; - // size_t mfree = managementSegment.get_free_memory(); - // size_t mtotal = managementSegment.get_size(); - // size_t mused = mtotal - mfree; - LOGV(info, user1) << "[" << fSegmentName - << "] devices: " << numDevices - << ", total: " << total -#ifdef FAIRMQ_DEBUG_MODE - << ", msgs: " << numMessages -#else - << ", msgs: NODEBUG" -#endif - << ", free: " << free - << ", used: " << used; - // << "\n " - // << "[" << fManagementSegmentName - // << "] total: " << mtotal - // << ", free: " << mfree - // << ", used: " << mused; + << ", free: " << free + << ", used: " << used + << "\n"; + } + ss << " [m]: " + << "total: " << mtotal + << ", free: " << mfree + << ", used: " << mused; + LOGV(info, user1) << ss.str(); } } catch (bie&) { fHeartbeatTriggered = false; - if (fInteractive) { - cout << "| " - << setw(18) << "-" << " | " - << setw(10) << "-" << " | " - << setw(10) << "-" << " | " - << setw(8) << "-" << " | " - << setw(8) << "-" << " | " - << setw(10) << "-" << " |" - << c << flush; - } auto now = chrono::high_resolution_clock::now(); unsigned int duration = chrono::duration_cast(now - fLastHeartbeat).count(); @@ -408,24 +361,32 @@ void Monitor::PrintDebugInfo(const ShmId& shmId __attribute__((unused))) boost::interprocess::named_mutex mtx(boost::interprocess::open_only, string("fmq_" + shmId.shmId + "_mtx").c_str()); boost::interprocess::scoped_lock lock(mtx); - SizetMsgDebugMap* debug = managementSegment.find(bipc::unique_instance).first; + Uint16MsgDebugMapHashMap* debug = managementSegment.find(bipc::unique_instance).first; - cout << endl << "found " << debug->size() << " message(s):" << endl; + size_t numMessages = 0; for (const auto& e : *debug) { - using time_point = chrono::system_clock::time_point; - time_point tmpt{chrono::duration_cast(chrono::nanoseconds(e.second.fCreationTime))}; - time_t t = chrono::system_clock::to_time_t(tmpt); - uint64_t ms = e.second.fCreationTime % 1000000; - auto tm = localtime(&t); - cout << "offset: " << setw(12) << setfill(' ') << e.first - << ", size: " << setw(10) << setfill(' ') << e.second.fSize - << ", creator PID: " << e.second.fPid << setfill('0') - << ", at: " << setw(2) << tm->tm_hour << ":" << setw(2) << tm->tm_min << ":" << setw(2) << tm->tm_sec << "." << setw(6) << ms << endl; + numMessages += e.second.size(); + } + cout << endl << "found " << numMessages << " messages." << endl; + + for (const auto& s : *debug) { + for (const auto& e : s.second) { + using time_point = chrono::system_clock::time_point; + time_point tmpt{chrono::duration_cast(chrono::nanoseconds(e.second.fCreationTime))}; + time_t t = chrono::system_clock::to_time_t(tmpt); + uint64_t ms = e.second.fCreationTime % 1000000; + auto tm = localtime(&t); + cout << "segment: " << setw(3) << setfill(' ') << s.first + << ", offset: " << setw(12) << setfill(' ') << e.first + << ", size: " << setw(10) << setfill(' ') << e.second.fSize + << ", creator PID: " << e.second.fPid << setfill('0') + << ", at: " << setw(2) << tm->tm_hour << ":" << setw(2) << tm->tm_min << ":" << setw(2) << tm->tm_sec << "." << setw(6) << ms << endl; + } } cout << setfill(' '); } catch (bie&) { - cout << "no segment found" << endl; + cout << "no segments found" << endl; } #else cout << "FairMQ was not compiled in debug mode (FAIRMQ_DEBUG_MODE)" << endl; @@ -438,9 +399,9 @@ void Monitor::PrintDebugInfo(const SessionId& sessionId) PrintDebugInfo(shmId); } -vector Monitor::GetDebugInfo(const ShmId& shmId __attribute__((unused))) +unordered_map> Monitor::GetDebugInfo(const ShmId& shmId __attribute__((unused))) { - vector result; + unordered_map> result; #ifdef FAIRMQ_DEBUG_MODE string managementSegmentName("fmq_" + shmId.shmId + "_mng"); @@ -449,15 +410,18 @@ vector Monitor::GetDebugInfo(const ShmId& shmId __attribute__(( boost::interprocess::named_mutex mtx(boost::interprocess::open_only, string("fmq_" + shmId.shmId + "_mtx").c_str()); boost::interprocess::scoped_lock lock(mtx); - SizetMsgDebugMap* debug = managementSegment.find(bipc::unique_instance).first; + Uint16MsgDebugMapHashMap* debug = managementSegment.find(bipc::unique_instance).first; result.reserve(debug->size()); - for (const auto& e : *debug) { - result.emplace_back(e.first, e.second.fPid, e.second.fSize, e.second.fCreationTime); + for (const auto& s : *debug) { + result[s.first].reserve(s.second.size()); + for (const auto& e : s.second) { + result[s.first][e.first] = BufferDebugInfo(e.first, e.second.fPid, e.second.fSize, e.second.fCreationTime); + } } } catch (bie&) { - cout << "no segment found" << endl; + cout << "no segments found" << endl; } #else cout << "FairMQ was not compiled in debug mode (FAIRMQ_DEBUG_MODE)" << endl; @@ -465,24 +429,12 @@ vector Monitor::GetDebugInfo(const ShmId& shmId __attribute__(( return result; } -vector Monitor::GetDebugInfo(const SessionId& sessionId) +unordered_map> Monitor::GetDebugInfo(const SessionId& sessionId) { ShmId shmId{buildShmIdFromSessionIdAndUserId(sessionId.sessionId)}; return GetDebugInfo(shmId); } -void Monitor::PrintHeader() -{ - cout << "| " - << setw(18) << "name" << " | " - << setw(10) << "size" << " | " - << setw(10) << "free" << " | " - << setw(8) << "devices" << " | " - << setw(8) << "msgs" << " | " - << setw(10) << "last hb" << " |" - << endl; -} - void Monitor::PrintHelp() { cout << "controls: [x] close memory, " diff --git a/fairmq/shmem/Monitor.h b/fairmq/shmem/Monitor.h index 1f8f7796..8efab7c6 100644 --- a/fairmq/shmem/Monitor.h +++ b/fairmq/shmem/Monitor.h @@ -83,8 +83,8 @@ class Monitor static void PrintDebugInfo(const ShmId& shmId); static void PrintDebugInfo(const SessionId& shmId); - static std::vector GetDebugInfo(const ShmId& shmId); - static std::vector GetDebugInfo(const SessionId& shmId); + static std::unordered_map> GetDebugInfo(const ShmId& shmId); + static std::unordered_map> GetDebugInfo(const SessionId& shmId); static bool RemoveObject(const std::string& name); static bool RemoveFileMapping(const std::string& name); @@ -95,7 +95,6 @@ class Monitor struct DaemonPresent : std::runtime_error { using std::runtime_error::runtime_error; }; private: - void PrintHeader(); void PrintHelp(); void MonitorHeartbeats(); void CheckSegment(); diff --git a/fairmq/shmem/TransportFactory.h b/fairmq/shmem/TransportFactory.h index c77d4ca5..5d5ddaaa 100644 --- a/fairmq/shmem/TransportFactory.h +++ b/fairmq/shmem/TransportFactory.h @@ -58,14 +58,21 @@ class TransportFactory final : public fair::mq::TransportFactory int numIoThreads = 1; std::string sessionName = "default"; size_t segmentSize = 2ULL << 30; + std::string allocationAlgorithm("rbtree_best_fit"); if (config) { numIoThreads = config->GetProperty("io-threads", numIoThreads); sessionName = config->GetProperty("session", sessionName); segmentSize = config->GetProperty("shm-segment-size", segmentSize); + allocationAlgorithm = config->GetProperty("shm-allocation", allocationAlgorithm); } else { LOG(debug) << "ProgOptions not available! Using defaults."; } + if (allocationAlgorithm != "rbtree_best_fit" && allocationAlgorithm != "simple_seq_fit") { + LOG(error) << "Provided shared memory allocation algorithm '" << allocationAlgorithm << "' is not supported. Supported are 'rbtree_best_fit'/'simple_seq_fit'"; + throw SharedMemoryError(tools::ToString("Provided shared memory allocation algorithm '", allocationAlgorithm, "' is not supported. Supported are 'rbtree_best_fit'/'simple_seq_fit'")); + } + fShmId = buildShmIdFromSessionIdAndUserId(sessionName); LOG(debug) << "Generated shmid '" << fShmId << "' out of session id '" << sessionName << "'.";