diff --git a/examples/region/CMakeLists.txt b/examples/region/CMakeLists.txt index 4d92d6dc..3528f611 100644 --- a/examples/region/CMakeLists.txt +++ b/examples/region/CMakeLists.txt @@ -12,7 +12,10 @@ target_link_libraries(fairmq-ex-region-sampler PRIVATE FairMQ) add_executable(fairmq-ex-region-sink sink.cxx) target_link_libraries(fairmq-ex-region-sink PRIVATE FairMQ) -add_custom_target(ExampleRegion DEPENDS fairmq-ex-region-sampler fairmq-ex-region-sink) +add_executable(fairmq-ex-region-keep-alive keep-alive.cxx) +target_link_libraries(fairmq-ex-region-keep-alive PRIVATE FairMQ) + +add_custom_target(ExampleRegion DEPENDS fairmq-ex-region-sampler fairmq-ex-region-sink fairmq-ex-region-keep-alive) set(EX_BIN_DIR ${CMAKE_CURRENT_BINARY_DIR}) set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq) diff --git a/examples/region/keep-alive.cxx b/examples/region/keep-alive.cxx new file mode 100644 index 00000000..de22b442 --- /dev/null +++ b/examples/region/keep-alive.cxx @@ -0,0 +1,108 @@ +/******************************************************************************** + * Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +#include +#include +#include +#include + +#include + +#include + +#include + +#include +#include +#include + +using namespace std; + +namespace +{ + volatile sig_atomic_t gStopping = 0; +} + +void signalHandler(int /* signal */) +{ + gStopping = 1; +} + +struct ShmRemover +{ + ShmRemover(std::string _shmId) : shmId(std::move(_shmId)) {} + ~ShmRemover() + { + // This will clean all segments, regions and any other shmem objects belonging to this shmId + fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId}); + } + + std::string shmId; +}; + +int main(int /* argc */, char** /* argv */) +{ + fair::Logger::SetConsoleColor(true); + + signal(SIGINT, signalHandler); + signal(SIGTERM, signalHandler); + + try { + const string session = "default"; // to_string(fair::mq::tools::UuidHash()); + // generate shmId out of session id + user id (geteuid). + const string shmId = fair::mq::shmem::makeShmIdStr(session); + + const uint16_t s1id = 0; + const uint64_t s1size = 100000000; + const uint16_t s2id = 1; + const uint64_t s2size = 200000000; + + const uint16_t r1id = 0; + const uint64_t r1size = 100000000; + const uint16_t r2id = 1; + const uint64_t r2size = 200000000; + + // cleanup when done + ShmRemover shmRemover(shmId); + + // managed segments + fair::mq::shmem::Segment segment1(shmId, s1id, s1size, fair::mq::shmem::rbTreeBestFit); + segment1.Lock(); + segment1.Zero(); + LOG(info) << "Created segment " << s1id << " of size " << segment1.GetSize() << " starting at " << segment1.GetData(); + + fair::mq::shmem::Segment segment2(shmId, s2id, s2size, fair::mq::shmem::rbTreeBestFit); + segment2.Lock(); + segment2.Zero(); + LOG(info) << "Created segment " << s2id << " of size " << segment2.GetSize() << " starting at " << segment2.GetData(); + + // unmanaged regions + fair::mq::shmem::UnmanagedRegion region1(shmId, r1id, r1size); + region1.Lock(); + region1.Zero(); + LOG(info) << "Created region " << r1id << " of size " << region1.GetSize() << " starting at " << region1.GetData(); + + fair::mq::shmem::UnmanagedRegion region2(shmId, r2id, r2size); + region2.Lock(); + region2.Zero(); + LOG(info) << "Created region " << r2id << " of size " << region2.GetSize() << " starting at " << region2.GetData(); + + // for a "soft reset" call (shmem should not be in active use by (no messages in flight) devices during this call): + // fair::mq::shmem::Monitor::ResetContent(fair::mq::shmem::ShmId{shmId}); + + while (!gStopping) { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + + LOG(info) << "stopping."; + } catch (exception& e) { + LOG(error) << "Unhandled Exception reached the top of main: " << e.what() << ", application will now exit"; + return 2; + } + + return 0; +} diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index eb6bed44..5ff7b7d4 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -192,6 +192,9 @@ if(BUILD_FAIRMQ) runDevice.h runFairMQDevice.h shmem/Monitor.h + shmem/Common.h + shmem/UnmanagedRegion.h + shmem/Segment.h ) set(FAIRMQ_PRIVATE_HEADER_FILES @@ -206,12 +209,10 @@ if(BUILD_FAIRMQ) plugins/control/Control.h shmem/Message.h shmem/Poller.h - shmem/UnmanagedRegion.h + shmem/UnmanagedRegionImpl.h shmem/Socket.h shmem/TransportFactory.h - shmem/Common.h shmem/Manager.h - shmem/Region.h zeromq/Common.h zeromq/Context.h zeromq/Message.h diff --git a/fairmq/UnmanagedRegion.h b/fairmq/UnmanagedRegion.h index 538d8e06..f624e64b 100644 --- a/fairmq/UnmanagedRegion.h +++ b/fairmq/UnmanagedRegion.h @@ -119,13 +119,6 @@ inline std::ostream& operator<<(std::ostream& os, const RegionEvent& event) } } -enum class RegionConstruction : int -{ - open, - create, - open_or_create -}; - struct RegionConfig { RegionConfig() = default; @@ -138,7 +131,6 @@ struct RegionConfig bool lock = false; /// mlock region after creation bool zero = false; /// zero region content after creation bool removeOnDestruction = true; /// remove the region on object destruction - RegionConstruction constructionMode = RegionConstruction::create; /// how to construct the region: create/open/open_or_create int creationFlags = 0; /// flags passed to the underlying transport on region creation int64_t userFlags = 0; /// custom flags that have no effect on the transport, but can be retrieved from the region by the user std::string path = ""; /// file path, if the region is backed by a file diff --git a/fairmq/plugins/config/Config.cxx b/fairmq/plugins/config/Config.cxx index a8a75acb..c68c09cb 100644 --- a/fairmq/plugins/config/Config.cxx +++ b/fairmq/plugins/config/Config.cxx @@ -67,6 +67,7 @@ Plugin::ProgOptions ConfigPluginProgramOptions() ("shm-segment-size", po::value()->default_value(2ULL << 30), "Shared memory: size of the shared memory segment (in bytes).") ("shm-allocation", po::value()->default_value("rbtree_best_fit"), "Shared memory allocation algorithm: rbtree_best_fit/simple_seq_fit.") ("shm-segment-id", po::value()->default_value(0), "EXPERIMENTAL: Shared memory segment id for message creation.") + ("shmid", po::value(), "EXPERIMENTAL: Fixed shmid to use instead of deriving it from the session name.") ("shm-mlock-segment", po::value()->default_value(false), "Shared memory: mlock the shared memory segment after initialization (opened or created).") ("shm-mlock-segment-on-creation", po::value()->default_value(false), "Shared memory: mlock the shared memory segment only once when created.") ("shm-zero-segment", po::value()->default_value(false), "Shared memory: zero the shared memory segment memory after initialization (opened or created).") diff --git a/fairmq/shmem/Common.cxx b/fairmq/shmem/Common.cxx index 78fec8f7..268a798f 100644 --- a/fairmq/shmem/Common.cxx +++ b/fairmq/shmem/Common.cxx @@ -12,6 +12,7 @@ #include +#include #include #include @@ -44,5 +45,12 @@ uint64_t makeShmIdUint64(const std::string& sessionId) return id; } +std::string makeShmIdStr(uint64_t val) +{ + std::stringstream ss; + ss << std::setfill('0') << std::setw(8) << std::hex << val; + return ss.str(); +} + } // namespace fair::mq::shmem diff --git a/fairmq/shmem/Common.h b/fairmq/shmem/Common.h index 63f1ae52..a2f9acf5 100644 --- a/fairmq/shmem/Common.h +++ b/fairmq/shmem/Common.h @@ -31,11 +31,11 @@ namespace fair::mq::shmem struct SharedMemoryError : std::runtime_error { using std::runtime_error::runtime_error; }; using SimpleSeqFitSegment = boost::interprocess::basic_managed_shared_memory, + boost::interprocess::simple_seq_fit>, boost::interprocess::null_index>; // boost::interprocess::iset_index>; using RBTreeBestFitSegment = boost::interprocess::basic_managed_shared_memory, + boost::interprocess::rbtree_best_fit>, boost::interprocess::null_index>; // boost::interprocess::iset_index>; @@ -146,6 +146,7 @@ struct MetaHeader mutable boost::interprocess::managed_shared_memory::handle_t fShared; uint16_t fRegionId; mutable uint16_t fSegmentId; + bool fManaged; }; #ifdef FAIRMQ_DEBUG_MODE @@ -210,8 +211,10 @@ struct RegionBlock // a hash of user id + session id, truncated to 8 characters (to accommodate for name size limit on some systems (MacOS)). std::string makeShmIdStr(const std::string& sessionId, const std::string& userId); std::string makeShmIdStr(const std::string& sessionId); +std::string makeShmIdStr(uint64_t val); uint64_t makeShmIdUint64(const std::string& sessionId); + struct SegmentSize : public boost::static_visitor { template diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index 5b4f98cd..f878ab18 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -11,7 +11,7 @@ #include "Common.h" #include "Monitor.h" -#include "Region.h" +#include "UnmanagedRegion.h" #include #include #include @@ -129,8 +129,8 @@ class Manager { public: Manager(const std::string& sessionName, size_t size, const ProgOptions* config) - : fShmId64(makeShmIdUint64(sessionName)) - , fShmId(makeShmIdStr(sessionName)) + : fShmId64(config ? config->GetProperty("shmid", makeShmIdUint64(sessionName)) : makeShmIdUint64(sessionName)) + , fShmId(makeShmIdStr(fShmId64)) , fSegmentId(config ? config->GetProperty("shm-segment-id", 0) : 0) , fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600) , fShmVoidAlloc(fManagementSegment.get_segment_manager()) @@ -360,14 +360,14 @@ class Manager } bool Interrupted() { return fInterrupted.load(); } - std::pair CreateRegion(const size_t size, - RegionCallback callback, - RegionBulkCallback bulkCallback, - RegionConfig cfg) + std::pair CreateRegion(const size_t size, + RegionCallback callback, + RegionBulkCallback bulkCallback, + RegionConfig cfg) { using namespace boost::interprocess; try { - std::pair result; + std::pair result; { boost::interprocess::scoped_lock lock(*fShmMtx); @@ -386,46 +386,39 @@ class Manager } cfg.id = rc->fCount; - } else if (cfg.id.value() == 0) { - LOG(error) << "User-given UnmanagedRegion ID must not be 0."; - throw TransportError("User-given UnmanagedRegion ID must not be 0."); } - auto it = fRegions.find(cfg.id.value()); - if (it != fRegions.end()) { - LOG(error) << "Trying to open/create a UnmanagedRegion that already exists (id: " << cfg.id.value() << ")"; - throw TransportError(tools::ToString("Trying to open/create a UnmanagedRegion that already exists (id: ", cfg.id.value(), ")")); - } + const uint16_t id = cfg.id.value(); - Region& region = *(fRegions.emplace(cfg.id.value(), std::make_unique(fShmId, size, false, callback, bulkCallback, cfg)).first->second); - // LOG(debug) << "Created region with id '" << cfg.id.value() << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'"; + auto res = fRegions.emplace(id, std::make_unique(fShmId, size, false, cfg)); + bool newRegionCreated = res.second; + UnmanagedRegion& region = *(res.first->second); + // LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'"; + + if (!newRegionCreated) { + region.fRemote = false; // TODO: this should be more clear, refactor it. + } if (cfg.lock) { - LOG(debug) << "Locking region " << cfg.id.value() << "..."; - if (mlock(region.fRegion.get_address(), region.fRegion.get_size()) == -1) { - LOG(error) << "Could not lock region " << cfg.id.value() << ". Code: " << errno << ", reason: " << strerror(errno); - throw TransportError(tools::ToString("Could not lock region ", cfg.id.value(), ": ", strerror(errno))); - } - LOG(debug) << "Successfully locked region " << cfg.id.value() << "."; + LOG(debug) << "Locking region " << id << "..."; + region.Lock(); + LOG(debug) << "Successfully locked region " << id << "."; } if (cfg.zero) { - LOG(debug) << "Zeroing free memory of region " << cfg.id.value() << "..."; - memset(region.fRegion.get_address(), 0x00, region.fRegion.get_size()); - LOG(debug) << "Successfully zeroed free memory of region " << cfg.id.value() << "."; + LOG(debug) << "Zeroing free memory of region " << id << "..."; + region.Zero(); + LOG(debug) << "Successfully zeroed free memory of region " << id << "."; } - bool newRegionCreated = fShmRegions->emplace(cfg.id.value(), RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, fShmVoidAlloc)).second; - // start ack receiver only if a callback has been provided. if (callback || bulkCallback) { + region.SetCallbacks(callback, bulkCallback); + region.InitializeQueues(); + region.StartAckSender(); region.StartAckReceiver(); } - result.first = &(region.fRegion); - result.second = cfg.id.value(); - - if (newRegionCreated) { - (fEventCounter->fCount)++; - } + result.first = &(region); + result.second = id; } fRegionsGen += 1; // signal TL cache invalidation fRegionEventsShmCV->notify_all(); @@ -438,7 +431,7 @@ class Manager } } - Region* GetRegion(const uint16_t id) + UnmanagedRegion* GetRegion(const uint16_t id) { // NOTE: gcc optimizations. Prevent loading tls addresses many times in the fast path const auto &lTlCache = fTlRegionCache; @@ -464,7 +457,7 @@ class Manager return lRegion; } - Region* GetRegionUnsafe(const uint16_t id) + UnmanagedRegion* GetRegionUnsafe(const uint16_t id) { // remote region could actually be a local one if a message originates from this device (has been sent out and returned) auto it = fRegions.find(id); @@ -480,7 +473,9 @@ class Manager cfg.path = regionInfo.fPath.c_str(); // LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'"; - auto r = fRegions.emplace(id, std::make_unique(fShmId, 0, true, nullptr, nullptr, std::move(cfg))); + auto r = fRegions.emplace(id, std::make_unique(fShmId, 0, true, std::move(cfg))); + r.first->second->InitializeQueues(); + r.first->second->StartAckSender(); return r.first->second.get(); } catch (std::out_of_range& oor) { LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?"; @@ -499,7 +494,7 @@ class Manager fRegions.at(id)->StopAcks(); { boost::interprocess::scoped_lock lock(*fShmMtx); - if (fRegions.at(id)->fRemoveOnDestruction) { + if (fRegions.at(id)->RemoveOnDestruction()) { fShmRegions->at(id).fDestroyed = true; (fEventCounter->fCount)++; } @@ -531,8 +526,8 @@ class Manager if (!e.second.fDestroyed) { auto region = GetRegionUnsafe(info.id); if (region) { - info.ptr = region->fRegion.get_address(); - info.size = region->fRegion.get_size(); + info.ptr = region->GetData(); + info.size = region->GetSize(); } else { throw std::runtime_error(tools::ToString("GetRegionInfoUnsafe() could not get region with id '", info.id, "'")); } @@ -789,8 +784,12 @@ class Manager LOG(error) << "Manager could not acquire lock: " << e.what(); } - if (lastRemoved && !fNoCleanup) { - Monitor::Cleanup(ShmId{fShmId}); + if (lastRemoved) { + if (!fNoCleanup) { + Monitor::Cleanup(ShmId{fShmId}); + } else { + Monitor::RemoveObject("fmq_" + fShmId + "_mng"); + } } } @@ -808,8 +807,8 @@ class Manager uint64_t fShmId64; std::string fShmId; uint16_t fSegmentId; - std::unordered_map> fSegments; // TODO: can use Segment class directly here - boost::interprocess::managed_shared_memory fManagementSegment; + std::unordered_map> fSegments; // TODO: refactor to use Segment class + boost::interprocess::managed_shared_memory fManagementSegment; // TODO: refactor to use ManagementSegment class VoidAlloc fShmVoidAlloc; boost::interprocess::interprocess_mutex* fShmMtx; @@ -823,11 +822,11 @@ class Manager EventCounter* fEventCounter; Uint16SegmentInfoHashMap* fShmSegments; Uint16RegionInfoHashMap* fShmRegions; - std::unordered_map> fRegions; + std::unordered_map> fRegions; inline static std::atomic fRegionsGen = 0ul; inline static thread_local struct ManagerTLCache { unsigned long fRegionsTLCacheGen; - std::vector> fRegionsTLCache; + std::vector> fRegionsTLCache; } fTlRegionCache; #ifdef FAIRMQ_DEBUG_MODE diff --git a/fairmq/shmem/Message.h b/fairmq/shmem/Message.h index 7409d450..b81a3340 100644 --- a/fairmq/shmem/Message.h +++ b/fairmq/shmem/Message.h @@ -10,8 +10,8 @@ #include "Common.h" #include "Manager.h" -#include "Region.h" #include "UnmanagedRegion.h" +#include "UnmanagedRegionImpl.h" #include #include @@ -39,7 +39,7 @@ class Message final : public fair::mq::Message : fair::mq::Message(factory) , fManager(manager) , fQueued(false) - , fMeta{0, 0, -1, -1, 0, fManager.GetSegmentId()} + , fMeta{0, 0, -1, -1, 0, fManager.GetSegmentId(), true} , fRegionPtr(nullptr) , fLocalPtr(nullptr) { @@ -50,7 +50,7 @@ class Message final : public fair::mq::Message : fair::mq::Message(factory) , fManager(manager) , fQueued(false) - , fMeta{0, 0, -1, -1, 0, fManager.GetSegmentId()} + , fMeta{0, 0, -1, -1, 0, fManager.GetSegmentId(), true} , fAlignment(alignment.alignment) , fRegionPtr(nullptr) , fLocalPtr(nullptr) @@ -62,7 +62,7 @@ class Message final : public fair::mq::Message : fair::mq::Message(factory) , fManager(manager) , fQueued(false) - , fMeta{0, 0, -1, -1, 0, fManager.GetSegmentId()} + , fMeta{0, 0, -1, -1, 0, fManager.GetSegmentId(), true} , fRegionPtr(nullptr) , fLocalPtr(nullptr) { @@ -74,7 +74,7 @@ class Message final : public fair::mq::Message : fair::mq::Message(factory) , fManager(manager) , fQueued(false) - , fMeta{0, 0, -1, -1, 0, fManager.GetSegmentId()} + , fMeta{0, 0, -1, -1, 0, fManager.GetSegmentId(), true} , fAlignment(alignment.alignment) , fRegionPtr(nullptr) , fLocalPtr(nullptr) @@ -87,7 +87,7 @@ class Message final : public fair::mq::Message : fair::mq::Message(factory) , fManager(manager) , fQueued(false) - , fMeta{0, 0, -1, -1, 0, fManager.GetSegmentId()} + , fMeta{0, 0, -1, -1, 0, fManager.GetSegmentId(), true} , fRegionPtr(nullptr) , fLocalPtr(nullptr) { @@ -106,7 +106,7 @@ class Message final : public fair::mq::Message : fair::mq::Message(factory) , fManager(manager) , fQueued(false) - , fMeta{size, reinterpret_cast(hint), -1, -1, static_cast(region.get())->fRegionId, fManager.GetSegmentId()} + , fMeta{size, reinterpret_cast(hint), -1, -1, static_cast(region.get())->fRegionId, fManager.GetSegmentId(), false} , fRegionPtr(nullptr) , fLocalPtr(static_cast(data)) { @@ -187,7 +187,7 @@ class Message final : public fair::mq::Message void* GetData() const override { if (!fLocalPtr) { - if (fMeta.fRegionId == 0) { + if (fMeta.fManaged) { if (fMeta.fSize > 0) { fManager.GetSegment(fMeta.fSegmentId); fLocalPtr = ShmHeader::UserPtr(fManager.GetAddressFromHandle(fMeta.fHandle, fMeta.fSegmentId)); @@ -197,7 +197,7 @@ class Message final : public fair::mq::Message } else { fRegionPtr = fManager.GetRegion(fMeta.fRegionId); if (fRegionPtr) { - fLocalPtr = reinterpret_cast(fRegionPtr->fRegion.get_address()) + fMeta.fHandle; + fLocalPtr = reinterpret_cast(fRegionPtr->GetData()) + fMeta.fHandle; } else { // LOG(warn) << "could not get pointer from a region message"; fLocalPtr = nullptr; @@ -259,7 +259,7 @@ class Message final : public fair::mq::Message return 1; } - if (fMeta.fRegionId == 0) { // managed segment + if (fMeta.fManaged) { // managed segment fManager.GetSegment(fMeta.fSegmentId); return ShmHeader::RefCount(fManager.GetAddressFromHandle(fMeta.fHandle, fMeta.fSegmentId)); } else { // unmanaged region @@ -286,7 +286,7 @@ class Message final : public fair::mq::Message CloseMessage(); } - if (otherMsg.fMeta.fRegionId == 0) { // managed segment + if (otherMsg.fMeta.fManaged) { // managed segment fMeta = otherMsg.fMeta; fManager.GetSegment(fMeta.fSegmentId); ShmHeader::IncrementRefCount(fManager.GetAddressFromHandle(fMeta.fHandle, fMeta.fSegmentId)); @@ -317,7 +317,7 @@ class Message final : public fair::mq::Message bool fQueued; MetaHeader fMeta; size_t fAlignment; - mutable Region* fRegionPtr; + mutable UnmanagedRegion* fRegionPtr; mutable char* fLocalPtr; char* InitializeChunk(const size_t size, size_t alignment = 0) @@ -336,7 +336,7 @@ class Message final : public fair::mq::Message void Deallocate() { if (fMeta.fHandle >= 0 && !fQueued) { - if (fMeta.fRegionId == 0) { // managed segment + if (fMeta.fManaged) { // managed segment fManager.GetSegment(fMeta.fSegmentId); uint16_t refCount = ShmHeader::DecrementRefCount(fManager.GetAddressFromHandle(fMeta.fHandle, fMeta.fSegmentId)); if (refCount == 1) { diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index 3811a488..4e97db4c 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -8,7 +8,7 @@ #include "Monitor.h" #include "Common.h" -#include "Region.h" +#include "UnmanagedRegion.h" #include #include @@ -185,16 +185,14 @@ bool Monitor::PrintShm(const ShmId& shmId) std::unordered_map> segments; Uint16RegionInfoHashMap* shmRegions = managementSegment.find(unique_instance).first; - std::unordered_map> regions; if (!shmSegments) { LOG(error) << "Found management segment, but cannot locate segment info, something went wrong..."; return false; } - if (!shmSegments) { + if (!shmRegions) { LOG(error) << "Found management segment, but cannot locate region info..."; - return false; } for (const auto& s : *shmSegments) { @@ -237,13 +235,24 @@ bool Monitor::PrintShm(const ShmId& shmId) size_t free = boost::apply_visitor(SegmentFreeMemory(), s.second); size_t total = boost::apply_visitor(SegmentSize(), s.second); size_t used = total - free; - ss << " [" << s.first - << "]: total: " << total + + std::string msgCount; #ifdef FAIRMQ_DEBUG_MODE - << ", msgs: " << ( (msgCounters != nullptr) ? to_string((*msgCounters)[s.first].fCount) : "unknown") + if (msgCounters) { + auto it = msgCounters->find(s.first); + if (it != msgCounters->end()) { + msgCount = to_string(it->second.fCount.load()); + } else { + msgCount = "0"; + } + } #else - << ", msgs: NODEBUG" + msgCount = "NODEBUG"; #endif + + ss << " [" << s.first << "]" + << ": total: " << total + << ", msgs: " << msgCount << ", free: " << free << ", used: " << used << "\n"; @@ -254,10 +263,17 @@ bool Monitor::PrintShm(const ShmId& shmId) << ", free: " << mfree << ", used: " << mused; - if (!shmRegions->empty()) { - ss << "\n unmanaged regions:\n"; + if (shmRegions && !shmRegions->empty()) { + ss << "\n unmanaged regions:"; for (const auto& r : *shmRegions) { - ss << " [" << r.first << "]: " << (r.second.fDestroyed ? "destroyed" : "alive"); + ss << "\n [" << r.first << "]: " << (r.second.fDestroyed ? "destroyed" : "alive"); + + try { + boost::interprocess::message_queue q(open_only, std::string("fmq_" + std::string(shmId) + "_rgq_" + to_string(r.first)).c_str()); + ss << ", ack queue: " << q.get_num_msg() << " messages"; + } catch (bie&) { + ss << ", ack queue: not found"; + } } } LOGV(info, user1) << ss.str(); @@ -620,6 +636,62 @@ std::vector> Monitor::CleanupFull(const SessionId& return CleanupFull(shmId, verbose); } +void Monitor::ResetContent(const ShmId& shmId, bool verbose /* = true */) +{ + if (verbose) { + cout << "Resetting segments content for shared memory id '" << shmId.shmId << "'..." << endl; + } + + string managementSegmentName("fmq_" + shmId.shmId + "_mng"); + try { + using namespace boost::interprocess; + managed_shared_memory managementSegment(open_only, managementSegmentName.c_str()); + + Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find(unique_instance).first; + + for (const auto& s : *segmentInfos) { + if (verbose) { + cout << "Resetting content of segment '" << "fmq_" << shmId.shmId << "_m_" << s.first << "'..." << endl; + } + try { + if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) { + RBTreeBestFitSegment segment(open_only, std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)).c_str()); + void* ptr = segment.get_segment_manager(); + size_t size = segment.get_segment_manager()->get_size(); + new(ptr) segment_manager>, null_index>(size); + } else { + SimpleSeqFitSegment segment(open_only, std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)).c_str()); + void* ptr = segment.get_segment_manager(); + size_t size = segment.get_segment_manager()->get_size(); + new(ptr) segment_manager>, null_index>(size); + } + } catch (bie& e) { + if (verbose) { + cout << "Error resetting content of segment '" << std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)) << "': " << e.what() << endl; + } + } + } + } catch (bie& e) { + if (verbose) { + cout << "Could not find '" << managementSegmentName << "' segment. Nothing to cleanup." << endl; + cout << e.what() << endl; + } + } + + if (verbose) { + cout << "Done resetting segment content for shared memory id '" << shmId.shmId << "'." << endl; + } +} + +void Monitor::ResetContent(const SessionId& sessionId, bool verbose /* = true */) +{ + ShmId shmId{makeShmIdStr(sessionId.sessionId)}; + if (verbose) { + cout << "ResetContent called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'" << endl; + } + ResetContent(shmId, verbose); +} + Monitor::~Monitor() { if (fSignalThread.joinable()) { diff --git a/fairmq/shmem/Monitor.h b/fairmq/shmem/Monitor.h index 285c2f58..fe8e419e 100644 --- a/fairmq/shmem/Monitor.h +++ b/fairmq/shmem/Monitor.h @@ -80,6 +80,15 @@ class Monitor /// @param verbose output cleanup results to stdout static std::vector> CleanupFull(const SessionId& sessionId, bool verbose = true); + /// @brief [EXPERIMENTAL] cleanup the content of the shem segment, without recreating it + /// @param shmId shared memory id + /// Only call this when segment is not in use + static void ResetContent(const ShmId& shmId, bool verbose = true); + /// @brief [EXPERIMENTAL] cleanup the content of the shem segment, without recreating it + /// @param sessionId session id + /// Only call this when segment is not in use + static void ResetContent(const SessionId& sessionId, bool verbose = true); + /// @brief Outputs list of messages in shmem (if compiled with FAIRMQ_DEBUG_MODE=ON) /// @param shmId shmem id static void PrintDebugInfo(const ShmId& shmId); diff --git a/fairmq/shmem/Region.h b/fairmq/shmem/Region.h deleted file mode 100644 index c7ba44b3..00000000 --- a/fairmq/shmem/Region.h +++ /dev/null @@ -1,330 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#ifndef FAIR_MQ_SHMEM_REGION_H_ -#define FAIR_MQ_SHMEM_REGION_H_ - -#include "Common.h" -#include -#include - -#include - -#include -#include -#include -#include -#include - -#include // min -#include -#include -#include // make_unique -#include -#include -#include -#include -#include -#include -#include // move - -namespace fair::mq::shmem -{ - -struct Region -{ - Region(const std::string& shmId, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, RegionConfig cfg) - : fRemote(remote) - , fRemoveOnDestruction(cfg.removeOnDestruction) - , fLinger(cfg.linger) - , fStopAcks(false) - , fName("fmq_" + shmId + "_rg_" + std::to_string(cfg.id.value())) - , fQueueName("fmq_" + shmId + "_rgq_" + std::to_string(cfg.id.value())) - , fShmemObject() - , fFile(nullptr) - , fFileMapping() - , fQueue(nullptr) - , fCallback(std::move(callback)) - , fBulkCallback(std::move(bulkCallback)) - { - using namespace boost::interprocess; - - if (!cfg.path.empty()) { - fName = std::string(cfg.path + fName); - - if (!fRemote) { - // create a file - std::filebuf fbuf; - if (fbuf.open(fName, std::ios_base::in | std::ios_base::out | std::ios_base::trunc | std::ios_base::binary)) { - // set the size - fbuf.pubseekoff(size - 1, std::ios_base::beg); - fbuf.sputc(0); - } - } - - fFile = fopen(fName.c_str(), "r+"); - - if (!fFile) { - LOG(error) << "Failed to initialize file: " << fName; - LOG(error) << "errno: " << errno << ": " << strerror(errno); - throw std::runtime_error(tools::ToString("Failed to initialize file for shared memory region: ", strerror(errno))); - } - fFileMapping = file_mapping(fName.c_str(), read_write); - LOG(debug) << "shmem: initialized file: " << fName; - fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, cfg.creationFlags); - } else { - try { - if (fRemote) { - fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write); - } else { - switch (cfg.constructionMode) { - case RegionConstruction::create: - fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write); - fShmemObject.truncate(size); - break; - case RegionConstruction::open: - fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write); - break; - case RegionConstruction::open_or_create: - try { - fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write); - } catch (interprocess_exception&) { - fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write); - fShmemObject.truncate(size); - } - break; - default: - throw TransportError(tools::ToString("Unknown RegionConstruction mode provided ")); - break; - } - } - } catch (interprocess_exception& e) { - LOG(error) << "Failed " << (fRemote ? "opening" : "creating") << " shared_memory_object for region id '" << cfg.id.value() << "': " << e.what(); - throw; - } - try { - fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, cfg.creationFlags); - } catch (interprocess_exception& e) { - LOG(error) << "Failed mapping shared_memory_object for region id '" << cfg.id.value() << "': " << e.what(); - throw; - } - } - - // skip queues initialization if region is being created without callbacks passed - if (fRemote || (fCallback || fBulkCallback)) { - InitializeQueues(); - StartAckSender(); - } else { - LOG(trace) << "skipping queues creation & ack sender thread, because created region locally but no callback were provided"; - } - - LOG(trace) << "shmem: initialized region: " << fName << " (" << (fRemote ? "remote" : "local") << ")"; - } - - Region() = delete; - - Region(const Region&) = delete; - Region(Region&&) = delete; - Region& operator=(const Region&) = delete; - Region& operator=(Region&&) = delete; - - void InitializeQueues() - { - using namespace boost::interprocess; - fQueue = std::make_unique(open_or_create, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock)); - LOG(trace) << "shmem: initialized region queue: " << fQueueName << " (" << (fRemote ? "remote" : "local") << ")"; - } - - void StartAckSender() - { - fAcksSender = std::thread(&Region::SendAcks, this); - } - void SendAcks() - { - std::unique_ptr blocks = std::make_unique(fAckBunchSize); - size_t blocksToSend = 0; - - while (true) { - { - std::unique_lock lock(fBlockMtx); - - // try to get blocks - if (fBlocksToFree.size() < fAckBunchSize) { - fBlockSendCV.wait_for(lock, std::chrono::milliseconds(500)); - } - - // send whatever blocks we have - blocksToSend = std::min(fBlocksToFree.size(), fAckBunchSize); - - copy_n(fBlocksToFree.end() - blocksToSend, blocksToSend, blocks.get()); - fBlocksToFree.resize(fBlocksToFree.size() - blocksToSend); - } - - if (blocksToSend > 0) { - while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStopAcks) { - // receiver slow? yield and try again... - std::this_thread::yield(); - } - // LOG(debug) << "Sent " << blocksToSend << " blocks."; - } else { // blocksToSend == 0 - if (fStopAcks) { - break; - } - } - } - - LOG(trace) << "AcksSender for " << fName << " leaving " << "(blocks left to free: " << fBlocksToFree.size() << ", " - << " blocks left to send: " << blocksToSend << ")."; - } - - void StartAckReceiver() - { - if (!fAcksReceiver.joinable()) { - fAcksReceiver = std::thread(&Region::ReceiveAcks, this); - } - } - void ReceiveAcks() - { - unsigned int priority = 0; - boost::interprocess::message_queue::size_type recvdSize = 0; - std::unique_ptr blocks = std::make_unique(fAckBunchSize); - std::vector result; - result.reserve(fAckBunchSize); - - while (true) { - uint32_t timeout = 100; - bool leave = false; - if (fStopAcks) { - timeout = fLinger; - leave = true; - } - auto rcvTill = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(timeout); - - while (fQueue->timed_receive(blocks.get(), fAckBunchSize * sizeof(RegionBlock), recvdSize, priority, rcvTill)) { - const auto numBlocks = recvdSize / sizeof(RegionBlock); - // LOG(debug) << "Received " << numBlocks << " blocks (recvdSize: " << recvdSize << "). (remaining queue size: " << fQueue->get_num_msg() << ")."; - if (fBulkCallback) { - result.clear(); - for (size_t i = 0; i < numBlocks; i++) { - result.emplace_back(reinterpret_cast(fRegion.get_address()) + blocks[i].fHandle, blocks[i].fSize, reinterpret_cast(blocks[i].fHint)); - } - fBulkCallback(result); - } else if (fCallback) { - for (size_t i = 0; i < numBlocks; i++) { - fCallback(reinterpret_cast(fRegion.get_address()) + blocks[i].fHandle, blocks[i].fSize, reinterpret_cast(blocks[i].fHint)); - } - } - } - - if (leave) { - break; - } - } - - LOG(trace) << "AcksReceiver for " << fName << " leaving (remaining queue size: " << fQueue->get_num_msg() << ")."; - } - - void ReleaseBlock(const RegionBlock& block) - { - std::unique_lock lock(fBlockMtx); - - fBlocksToFree.emplace_back(block); - - if (fBlocksToFree.size() >= fAckBunchSize) { - lock.unlock(); - fBlockSendCV.notify_one(); - } - } - - void SetLinger(uint32_t linger) { fLinger = linger; } - uint32_t GetLinger() const { return fLinger; } - - void StopAcks() - { - fStopAcks = true; - - if (fAcksSender.joinable()) { - fBlockSendCV.notify_one(); - fAcksSender.join(); - } - - if (!fRemote) { - if (fAcksReceiver.joinable()) { - fAcksReceiver.join(); - } - } - } - - ~Region() - { - fStopAcks = true; - - if (fAcksSender.joinable()) { - fBlockSendCV.notify_one(); - fAcksSender.join(); - } - - if (!fRemote) { - if (fAcksReceiver.joinable()) { - fAcksReceiver.join(); - } - - if (fRemoveOnDestruction) { - if (boost::interprocess::shared_memory_object::remove(fName.c_str())) { - LOG(trace) << "Region '" << fName << "' destroyed."; - } - - if (boost::interprocess::file_mapping::remove(fName.c_str())) { - LOG(trace) << "File mapping '" << fName << "' destroyed."; - } - } else { - LOG(debug) << "Skipping removal of " << fName << " unmanaged region, because RegionConfig::removeOnDestruction is false"; - } - - if (boost::interprocess::message_queue::remove(fQueueName.c_str())) { - LOG(trace) << "Region queue '" << fQueueName << "' destroyed."; - } else { - LOG(debug) << "Region queue '" << fQueueName << "' not destroyed."; - } - - if (fFile) { - fclose(fFile); - } - } else { - // LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary"; - } - - // LOG(debug) << "Region '" << fName << "' (" << (fRemote ? "remote" : "local") << ") destructed."; - } - - bool fRemote; - bool fRemoveOnDestruction; - uint32_t fLinger; - std::atomic fStopAcks; - std::string fName; - std::string fQueueName; - boost::interprocess::shared_memory_object fShmemObject; - FILE* fFile; - boost::interprocess::file_mapping fFileMapping; - boost::interprocess::mapped_region fRegion; - - std::mutex fBlockMtx; - std::condition_variable fBlockSendCV; - std::vector fBlocksToFree; - const std::size_t fAckBunchSize = 256; - std::unique_ptr fQueue; - - std::thread fAcksReceiver; - std::thread fAcksSender; - RegionCallback fCallback; - RegionBulkCallback fBulkCallback; -}; - -} // namespace fair::mq::shmem - -#endif /* FAIR_MQ_SHMEM_REGION_H_ */ diff --git a/fairmq/shmem/Segment.h b/fairmq/shmem/Segment.h new file mode 100644 index 00000000..b3435ef6 --- /dev/null +++ b/fairmq/shmem/Segment.h @@ -0,0 +1,88 @@ +/******************************************************************************** + * Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +#ifndef FAIR_MQ_SHMEM_SEGMENT_H_ +#define FAIR_MQ_SHMEM_SEGMENT_H_ + +#include +#include + +#include + +#include +#include + +namespace fair::mq::shmem +{ + +struct SimpleSeqFit {}; +struct RBTreeBestFit {}; +static const SimpleSeqFit simpleSeqFit = SimpleSeqFit(); +static const RBTreeBestFit rbTreeBestFit = RBTreeBestFit(); + +struct Segment +{ + Segment(const std::string& shmId, uint16_t id, size_t size, SimpleSeqFit) + : fSegment(SimpleSeqFitSegment(boost::interprocess::open_or_create, + std::string("fmq_" + shmId + "_m_" + std::to_string(id)).c_str(), + size)) + { + Register(shmId, id, AllocationAlgorithm::simple_seq_fit); + } + + Segment(const std::string& shmId, uint16_t id, size_t size, RBTreeBestFit) + : fSegment(RBTreeBestFitSegment(boost::interprocess::open_or_create, + std::string("fmq_" + shmId + "_m_" + std::to_string(id)).c_str(), + size)) + { + Register(shmId, id, AllocationAlgorithm::rbtree_best_fit); + } + + size_t GetSize() const { return boost::apply_visitor(SegmentSize(), fSegment); } + void* GetData() { return boost::apply_visitor(SegmentAddress(), fSegment); } + + size_t GetFreeMemory() const { return boost::apply_visitor(SegmentFreeMemory(), fSegment); } + + void Zero() { boost::apply_visitor(SegmentMemoryZeroer(), fSegment); } + void Lock() + { + if (mlock(GetData(), GetSize()) == -1) { + throw TransportError(tools::ToString("Could not lock the managed segment memory: ", strerror(errno))); + } + } + + static void Remove(const std::string& shmId, uint16_t id) + { + Monitor::RemoveObject("fmq_" + shmId + "_m_" + std::to_string(id)); + } + + private: + boost::variant fSegment; + + static void Register(const std::string& shmId, uint16_t id, AllocationAlgorithm allocAlgo) + { + using namespace boost::interprocess; + managed_shared_memory mngSegment(open_or_create, std::string("fmq_" + shmId + "_mng").c_str(), 6553600); + VoidAlloc alloc(mngSegment.get_segment_manager()); + + Uint16SegmentInfoHashMap* shmSegments = mngSegment.find_or_construct(unique_instance)(alloc); + + EventCounter* eventCounter = mngSegment.find(unique_instance).first; + if (!eventCounter) { + eventCounter = mngSegment.construct(unique_instance)(0); + } + + bool newSegmentRegistered = shmSegments->emplace(id, allocAlgo).second; + if (newSegmentRegistered) { + (eventCounter->fCount)++; + } + } +}; + +} // namespace fair::mq::shmem + +#endif /* FAIR_MQ_SHMEM_SEGMENT_H_ */ diff --git a/fairmq/shmem/TransportFactory.h b/fairmq/shmem/TransportFactory.h index dbabf6c3..ed02c7af 100644 --- a/fairmq/shmem/TransportFactory.h +++ b/fairmq/shmem/TransportFactory.h @@ -14,7 +14,7 @@ #include "Message.h" #include "Poller.h" #include "Socket.h" -#include "UnmanagedRegion.h" +#include "UnmanagedRegionImpl.h" #include #include #include @@ -184,7 +184,7 @@ class TransportFactory final : public fair::mq::TransportFactory UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionCallback callback, RegionBulkCallback bulkCallback, fair::mq::RegionConfig cfg) { - return std::make_unique(*fManager, size, callback, bulkCallback, std::move(cfg), this); + return std::make_unique(*fManager, size, callback, bulkCallback, std::move(cfg), this); } void SubscribeToRegionEvents(RegionEventCallback callback) override { fManager->SubscribeToRegionEvents(callback); } diff --git a/fairmq/shmem/UnmanagedRegion.h b/fairmq/shmem/UnmanagedRegion.h index 956e7aa9..28503ac4 100644 --- a/fairmq/shmem/UnmanagedRegion.h +++ b/fairmq/shmem/UnmanagedRegion.h @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -9,64 +9,359 @@ #ifndef FAIR_MQ_SHMEM_UNMANAGEDREGION_H_ #define FAIR_MQ_SHMEM_UNMANAGEDREGION_H_ -#include "Manager.h" +#include +#include +#include #include #include -#include -#include +#include +#include +#include +#include +#include -#include // size_t -#include +#include // min +#include +#include +#include // make_unique +#include +#include +#include +#include +#include +#include +#include // move namespace fair::mq::shmem { -class Message; -class Socket; - -class UnmanagedRegion final : public fair::mq::UnmanagedRegion +struct UnmanagedRegion { friend class Message; - friend class Socket; + friend class Manager; - public: - UnmanagedRegion(Manager& manager, - const size_t size, - RegionCallback callback, - RegionBulkCallback bulkCallback, - fair::mq::RegionConfig cfg, - FairMQTransportFactory* factory) - : FairMQUnmanagedRegion(factory) - , fManager(manager) - , fRegion(nullptr) - , fRegionId(0) + UnmanagedRegion(const std::string& shmId, uint16_t id, uint64_t size) + : UnmanagedRegion(shmId, size, false, makeRegionConfig(id)) + {} + + UnmanagedRegion(const std::string& shmId, uint64_t size, RegionConfig cfg) + : UnmanagedRegion(shmId, size, false, std::move(cfg)) + {} + + UnmanagedRegion(const std::string& shmId, uint64_t size, bool remote, RegionConfig cfg) + : fRemote(remote) + , fRemoveOnDestruction(cfg.removeOnDestruction) + , fLinger(cfg.linger) + , fStopAcks(false) + , fName("fmq_" + shmId + "_rg_" + std::to_string(cfg.id.value())) + , fQueueName("fmq_" + shmId + "_rgq_" + std::to_string(cfg.id.value())) + , fShmemObject() + , fFile(nullptr) + , fFileMapping() + , fQueue(nullptr) + , fCallback(nullptr) + , fBulkCallback(nullptr) { - auto result = fManager.CreateRegion(size, callback, bulkCallback, std::move(cfg)); - fRegion = result.first; - fRegionId = result.second; + using namespace boost::interprocess; + + if (!cfg.path.empty()) { + fName = std::string(cfg.path + fName); + + if (!fRemote) { + // create a file + std::filebuf fbuf; + if (fbuf.open(fName, std::ios_base::in | std::ios_base::out | std::ios_base::trunc | std::ios_base::binary)) { + // set the size + fbuf.pubseekoff(size - 1, std::ios_base::beg); + fbuf.sputc(0); + } + } + + fFile = fopen(fName.c_str(), "r+"); + + if (!fFile) { + LOG(error) << "Failed to initialize file: " << fName; + LOG(error) << "errno: " << errno << ": " << strerror(errno); + throw std::runtime_error(tools::ToString("Failed to initialize file for shared memory region: ", strerror(errno))); + } + fFileMapping = file_mapping(fName.c_str(), read_write); + LOG(debug) << "shmem: initialized file: " << fName; + fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, cfg.creationFlags); + } else { + try { + fShmemObject = shared_memory_object(open_or_create, fName.c_str(), read_write); + if (size != 0) { + fShmemObject.truncate(size); + } + } catch (interprocess_exception& e) { + LOG(error) << "Failed " << (remote ? "opening" : "creating") << " shared_memory_object for region id '" << cfg.id.value() << "': " << e.what(); + throw; + } + try { + fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, cfg.creationFlags); + } catch (interprocess_exception& e) { + LOG(error) << "Failed mapping shared_memory_object for region id '" << cfg.id.value() << "': " << e.what(); + throw; + } + } + + if (!remote) { + Register(shmId, cfg); + } + + LOG(trace) << "shmem: initialized region: " << fName << " (" << (remote ? "remote" : "local") << ")"; } + UnmanagedRegion() = delete; + UnmanagedRegion(const UnmanagedRegion&) = delete; UnmanagedRegion(UnmanagedRegion&&) = delete; UnmanagedRegion& operator=(const UnmanagedRegion&) = delete; UnmanagedRegion& operator=(UnmanagedRegion&&) = delete; - void* GetData() const override { return fRegion->get_address(); } - size_t GetSize() const override { return fRegion->get_size(); } - uint16_t GetId() const override { return fRegionId; } - void SetLinger(uint32_t linger) override { fManager.GetRegion(fRegionId)->SetLinger(linger); } - uint32_t GetLinger() const override { return fManager.GetRegion(fRegionId)->GetLinger(); } + void Zero() + { + memset(fRegion.get_address(), 0x00, fRegion.get_size()); + } + void Lock() + { + if (mlock(fRegion.get_address(), fRegion.get_size()) == -1) { + LOG(error) << "Could not lock region " << fName << ". Code: " << errno << ", reason: " << strerror(errno); + throw TransportError(tools::ToString("Could not lock region ", fName, ": ", strerror(errno))); + } + } - Transport GetType() const override { return fair::mq::Transport::SHM; } + void* GetData() const { return fRegion.get_address(); } + size_t GetSize() const { return fRegion.get_size(); } - ~UnmanagedRegion() override { fManager.RemoveRegion(fRegionId); } + void SetLinger(uint32_t linger) { fLinger = linger; } + uint32_t GetLinger() const { return fLinger; } + + bool RemoveOnDestruction() { return fRemoveOnDestruction; } + + ~UnmanagedRegion() + { + fStopAcks = true; + + if (fAcksSender.joinable()) { + fBlockSendCV.notify_one(); + fAcksSender.join(); + } + + if (!fRemote) { + if (fAcksReceiver.joinable()) { + fAcksReceiver.join(); + } + + if (fRemoveOnDestruction) { + if (Monitor::RemoveObject(fName.c_str())) { + LOG(trace) << "Region '" << fName << "' destroyed."; + } + if (Monitor::RemoveFileMapping(fName.c_str())) { + LOG(trace) << "File mapping '" << fName << "' destroyed."; + } + } else { + LOG(debug) << "Skipping removal of " << fName << " unmanaged region, because RegionConfig::removeOnDestruction is false"; + } + + if (boost::interprocess::message_queue::remove(fQueueName.c_str())) { + LOG(trace) << "Region queue '" << fQueueName << "' destroyed."; + } else { + LOG(debug) << "Region queue '" << fQueueName << "' not destroyed."; + } + + if (fFile) { + fclose(fFile); + } + } else { + // LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary"; + } + + // LOG(debug) << "Region '" << fName << "' (" << (fRemote ? "remote" : "local") << ") destructed."; + } private: - Manager& fManager; - boost::interprocess::mapped_region* fRegion; - uint16_t fRegionId; + bool fRemote; + bool fRemoveOnDestruction; + uint32_t fLinger; + std::atomic fStopAcks; + std::string fName; + std::string fQueueName; + boost::interprocess::shared_memory_object fShmemObject; + FILE* fFile; + boost::interprocess::file_mapping fFileMapping; + boost::interprocess::mapped_region fRegion; + + std::mutex fBlockMtx; + std::condition_variable fBlockSendCV; + std::vector fBlocksToFree; + const std::size_t fAckBunchSize = 256; + std::unique_ptr fQueue; + + std::thread fAcksReceiver; + std::thread fAcksSender; + RegionCallback fCallback; + RegionBulkCallback fBulkCallback; + + static RegionConfig makeRegionConfig(uint16_t id) + { + RegionConfig regionCfg; + regionCfg.id = id; + return regionCfg; + } + + static void Register(const std::string& shmId, RegionConfig& cfg) + { + using namespace boost::interprocess; + managed_shared_memory mngSegment(open_or_create, std::string("fmq_" + shmId + "_mng").c_str(), 6553600); + VoidAlloc alloc(mngSegment.get_segment_manager()); + + Uint16RegionInfoHashMap* shmRegions = mngSegment.find_or_construct(unique_instance)(alloc); + + EventCounter* eventCounter = mngSegment.find(unique_instance).first; + if (!eventCounter) { + eventCounter = mngSegment.construct(unique_instance)(0); + } + + bool newShmRegionCreated = shmRegions->emplace(cfg.id.value(), RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, alloc)).second; + if (newShmRegionCreated) { + (eventCounter->fCount)++; + } + } + + void SetCallbacks(RegionCallback callback, RegionBulkCallback bulkCallback) + { + fCallback = std::move(callback); + fBulkCallback = std::move(bulkCallback); + } + + void InitializeQueues() + { + using namespace boost::interprocess; + if (!fQueue) { + fQueue = std::make_unique(open_or_create, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock)); + LOG(trace) << "shmem: initialized region queue: " << fQueueName; + } + } + + void StartAckSender() + { + if (!fAcksSender.joinable()) { + fAcksSender = std::thread(&UnmanagedRegion::SendAcks, this); + } + } + void SendAcks() + { + std::unique_ptr blocks = std::make_unique(fAckBunchSize); + size_t blocksToSend = 0; + + while (true) { + { + std::unique_lock lock(fBlockMtx); + + // try to get blocks + if (fBlocksToFree.size() < fAckBunchSize) { + fBlockSendCV.wait_for(lock, std::chrono::milliseconds(500)); + } + + // send whatever blocks we have + blocksToSend = std::min(fBlocksToFree.size(), fAckBunchSize); + + copy_n(fBlocksToFree.end() - blocksToSend, blocksToSend, blocks.get()); + fBlocksToFree.resize(fBlocksToFree.size() - blocksToSend); + } + + if (blocksToSend > 0) { + while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStopAcks) { + // receiver slow? yield and try again... + std::this_thread::yield(); + } + // LOG(debug) << "Sent " << blocksToSend << " blocks."; + } else { // blocksToSend == 0 + if (fStopAcks) { + break; + } + } + } + + LOG(trace) << "AcksSender for " << fName << " leaving " << "(blocks left to free: " << fBlocksToFree.size() << ", " + << " blocks left to send: " << blocksToSend << ")."; + } + + void StartAckReceiver() + { + if (!fAcksReceiver.joinable()) { + fAcksReceiver = std::thread(&UnmanagedRegion::ReceiveAcks, this); + } + } + void ReceiveAcks() + { + unsigned int priority = 0; + boost::interprocess::message_queue::size_type recvdSize = 0; + std::unique_ptr blocks = std::make_unique(fAckBunchSize); + std::vector result; + result.reserve(fAckBunchSize); + + while (true) { + uint32_t timeout = 100; + bool leave = false; + if (fStopAcks) { + timeout = fLinger; + leave = true; + } + auto rcvTill = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(timeout); + + while (fQueue->timed_receive(blocks.get(), fAckBunchSize * sizeof(RegionBlock), recvdSize, priority, rcvTill)) { + const auto numBlocks = recvdSize / sizeof(RegionBlock); + // LOG(debug) << "Received " << numBlocks << " blocks (recvdSize: " << recvdSize << "). (remaining queue size: " << fQueue->get_num_msg() << ")."; + if (fBulkCallback) { + result.clear(); + for (size_t i = 0; i < numBlocks; i++) { + result.emplace_back(reinterpret_cast(fRegion.get_address()) + blocks[i].fHandle, blocks[i].fSize, reinterpret_cast(blocks[i].fHint)); + } + fBulkCallback(result); + } else if (fCallback) { + for (size_t i = 0; i < numBlocks; i++) { + fCallback(reinterpret_cast(fRegion.get_address()) + blocks[i].fHandle, blocks[i].fSize, reinterpret_cast(blocks[i].fHint)); + } + } + } + + if (leave) { + break; + } + } + + LOG(trace) << "AcksReceiver for " << fName << " leaving (remaining queue size: " << fQueue->get_num_msg() << ")."; + } + + void ReleaseBlock(const RegionBlock& block) + { + std::unique_lock lock(fBlockMtx); + + fBlocksToFree.emplace_back(block); + + if (fBlocksToFree.size() >= fAckBunchSize) { + lock.unlock(); + fBlockSendCV.notify_one(); + } + } + + void StopAcks() + { + fStopAcks = true; + + if (fAcksSender.joinable()) { + fBlockSendCV.notify_one(); + fAcksSender.join(); + } + + if (fAcksReceiver.joinable()) { + fAcksReceiver.join(); + } + } }; } // namespace fair::mq::shmem diff --git a/fairmq/shmem/UnmanagedRegionImpl.h b/fairmq/shmem/UnmanagedRegionImpl.h new file mode 100644 index 00000000..82caa7d5 --- /dev/null +++ b/fairmq/shmem/UnmanagedRegionImpl.h @@ -0,0 +1,71 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_SHMEM_UNMANAGEDREGIONIMPL_H_ +#define FAIR_MQ_SHMEM_UNMANAGEDREGIONIMPL_H_ + +#include "Manager.h" +#include "UnmanagedRegion.h" +#include + +#include + +#include // size_t + +namespace fair::mq::shmem +{ + +class Message; +class Socket; + +class UnmanagedRegionImpl final : public fair::mq::UnmanagedRegion +{ + friend class Message; + friend class Socket; + + public: + UnmanagedRegionImpl(Manager& manager, + const size_t size, + RegionCallback callback, + RegionBulkCallback bulkCallback, + fair::mq::RegionConfig cfg, + FairMQTransportFactory* factory) + : fair::mq::UnmanagedRegion(factory) + , fManager(manager) + , fRegion(nullptr) + , fRegionId(0) + { + auto result = fManager.CreateRegion(size, callback, bulkCallback, std::move(cfg)); + fRegion = result.first; + fRegionId = result.second; + } + + UnmanagedRegionImpl(const UnmanagedRegionImpl&) = delete; + UnmanagedRegionImpl(UnmanagedRegionImpl&&) = delete; + UnmanagedRegionImpl& operator=(const UnmanagedRegionImpl&) = delete; + UnmanagedRegionImpl& operator=(UnmanagedRegionImpl&&) = delete; + + void* GetData() const override { return fRegion->GetData(); } + size_t GetSize() const override { return fRegion->GetSize(); } + uint16_t GetId() const override { return fRegionId; } + void SetLinger(uint32_t linger) override { fRegion->SetLinger(linger); } + uint32_t GetLinger() const override { return fRegion->GetLinger(); } + + Transport GetType() const override { return fair::mq::Transport::SHM; } + + ~UnmanagedRegionImpl() override { fManager.RemoveRegion(fRegionId); } + + private: + Manager& fManager; + shmem::UnmanagedRegion* fRegion; + uint16_t fRegionId; +}; + +} // namespace fair::mq::shmem + +#endif /* FAIR_MQ_SHMEM_UNMANAGEDREGIONIMPL_H_ */ diff --git a/fairmq/shmem/runMonitor.cxx b/fairmq/shmem/runMonitor.cxx index 4ea8d4e6..1d80d7b6 100644 --- a/fairmq/shmem/runMonitor.cxx +++ b/fairmq/shmem/runMonitor.cxx @@ -76,6 +76,7 @@ int main(int argc, char** argv) string sessionName; string shmId; bool cleanup = false; + bool resetContent = false; bool selfDestruct = false; bool interactive = false; bool viewOnly = false; @@ -97,6 +98,7 @@ int main(int argc, char** argv) ("session,s" , value(&sessionName)->default_value("default"), "Session id") ("shmid" , value(&shmId)->default_value(""), "Shmem id (if not provided, it is generated out of session id and user id)") ("cleanup,c" , value(&cleanup)->implicit_value(true), "Perform cleanup and quit") + ("reset-content,r", value(&resetContent)->implicit_value(true), "[EXPERIMENTAL] Reset content of the segments (only call this when segment is not in use)") ("self-destruct,x", value(&selfDestruct)->implicit_value(true), "Quit after first closing of the memory") ("interactive,i" , value(&interactive)->implicit_value(true), "Interactive run") ("view,v" , value(&viewOnly)->implicit_value(true), "Run in view only mode") @@ -146,6 +148,11 @@ int main(int argc, char** argv) return 0; } + if (resetContent) { + Monitor::ResetContent(ShmId{shmId}); + return 0; + } + if (debug) { Monitor::PrintDebugInfo(ShmId{shmId}); return 0; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 530e4847..28e9243c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -100,6 +100,7 @@ add_testsuite(Message add_testsuite(Region SOURCES ${CMAKE_CURRENT_BINARY_DIR}/runner.cxx + region/_creation.cxx region/_region.cxx LINKS FairMQ diff --git a/test/region/_creation.cxx b/test/region/_creation.cxx new file mode 100644 index 00000000..88d38932 --- /dev/null +++ b/test/region/_creation.cxx @@ -0,0 +1,92 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include +#include +#include + +#include +#include +#include + +#include + +#include + +#include + +#include + +namespace +{ + +using namespace std; + +struct ShmRemover +{ + ShmRemover(std::string _shmId) : shmId(std::move(_shmId)) {} + ~ShmRemover() { fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId}); } + + std::string shmId; +}; + +void Preallocate() +{ + const string session = to_string(fair::mq::tools::UuidHash()); + // generate shmId out of session id + user id (geteuid). + const string shmId = fair::mq::shmem::makeShmIdStr(session); + + // const uint64_t shmId64 = 0; + // const string shmId = fair::mq::shmem::makeShmIdStr(shmId64); + // LOG(info) << shmId; + + const uint16_t s1id = 0; + const uint64_t s1size = 10000000; + const uint16_t s2id = 1; + const uint64_t s2size = 20000000; + + const uint16_t r1id = 0; + const uint64_t r1size = 10000000; + const uint16_t r2id = 1; + const uint64_t r2size = 20000000; + + // cleanup when done + ShmRemover shmRemover(shmId); + + // managed segments + fair::mq::shmem::Segment segment1(shmId, s1id, s1size, fair::mq::shmem::rbTreeBestFit); + segment1.Lock(); + segment1.Zero(); + LOG(info) << "Created segment " << s1id << " of size " << segment1.GetSize() << " starting at " << segment1.GetData(); + + fair::mq::shmem::Segment segment2(shmId, s2id, s2size, fair::mq::shmem::rbTreeBestFit); + segment2.Lock(); + segment2.Zero(); + LOG(info) << "Created segment " << s2id << " of size " << segment2.GetSize() << " starting at " << segment2.GetData(); + + // unmanaged regions + fair::mq::shmem::UnmanagedRegion region1(shmId, r1id, r1size); + region1.Lock(); + region1.Zero(); + LOG(info) << "Created region " << r1id << " of size " << region1.GetSize() << " starting at " << region1.GetData(); + + fair::mq::shmem::UnmanagedRegion region2(shmId, r2id, r2size); + region2.Lock(); + region2.Zero(); + LOG(info) << "Created region " << r2id << " of size " << region2.GetSize() << " starting at " << region2.GetData(); + + // for a "soft reset" call (shmem should not be in active use by (no messages in flight) devices during this call): + // fair::mq::shmem::Monitor::ResetContent(fair::mq::shmem::ShmId{shmId}); +} + +TEST(PreallocateInsideSession, shmem) +{ + Preallocate(); +} + +}