diff --git a/fairmq/TransportFactory.h b/fairmq/TransportFactory.h index effe54bd..ac83b4e7 100644 --- a/fairmq/TransportFactory.h +++ b/fairmq/TransportFactory.h @@ -145,7 +145,6 @@ class TransportFactory int flags = 0, RegionConfig cfg = RegionConfig()) = 0; - /// @brief Create new UnmanagedRegion /// @param size size of the region /// @param callback callback to be called when a message belonging to this region is no longer needed by the transport diff --git a/fairmq/UnmanagedRegion.h b/fairmq/UnmanagedRegion.h index f4c2e4e0..538d8e06 100644 --- a/fairmq/UnmanagedRegion.h +++ b/fairmq/UnmanagedRegion.h @@ -16,6 +16,7 @@ #include // std::function #include // std::unique_ptr +#include // std::optional #include #include #include @@ -118,6 +119,13 @@ inline std::ostream& operator<<(std::ostream& os, const RegionEvent& event) } } +enum class RegionConstruction : int +{ + open, + create, + open_or_create +}; + struct RegionConfig { RegionConfig() = default; @@ -129,9 +137,12 @@ struct RegionConfig bool lock = false; /// mlock region after creation bool zero = false; /// zero region content after creation + bool removeOnDestruction = true; /// remove the region on object destruction + RegionConstruction constructionMode = RegionConstruction::create; /// how to construct the region: create/open/open_or_create int creationFlags = 0; /// flags passed to the underlying transport on region creation int64_t userFlags = 0; /// custom flags that have no effect on the transport, but can be retrieved from the region by the user std::string path = ""; /// file path, if the region is backed by a file + std::optional id = std::nullopt; /// region id uint32_t linger = 100; /// delay in ms before region destruction to collect outstanding events }; diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index 5b1587d7..1a3ed777 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -234,15 +234,16 @@ class Manager std::string op("create/open"); try { + std::string segmentName("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId)); auto it = fShmSegments->find(fSegmentId); if (it == fShmSegments->end()) { op = "create"; // no segment with given id exists, creating if (allocationAlgorithm == "rbtree_best_fit") { - fSegments.emplace(fSegmentId, RBTreeBestFitSegment(create_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId)).c_str(), size)); + fSegments.emplace(fSegmentId, RBTreeBestFitSegment(create_only, segmentName.c_str(), size)); fShmSegments->emplace(fSegmentId, AllocationAlgorithm::rbtree_best_fit); } else if (allocationAlgorithm == "simple_seq_fit") { - fSegments.emplace(fSegmentId, SimpleSeqFitSegment(create_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId)).c_str(), size)); + fSegments.emplace(fSegmentId, SimpleSeqFitSegment(create_only, segmentName.c_str(), size)); fShmSegments->emplace(fSegmentId, AllocationAlgorithm::simple_seq_fit); } ss << "Created "; @@ -257,13 +258,13 @@ class Manager op = "open"; // found segment with the given id, opening if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) { - fSegments.emplace(fSegmentId, RBTreeBestFitSegment(open_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId)).c_str())); + fSegments.emplace(fSegmentId, RBTreeBestFitSegment(open_only, segmentName.c_str())); if (allocationAlgorithm != "rbtree_best_fit") { LOG(warn) << "Allocation algorithm of the opened segment is rbtree_best_fit, but requested is " << allocationAlgorithm << ". Ignoring requested setting."; allocationAlgorithm = "rbtree_best_fit"; } } else { - fSegments.emplace(fSegmentId, SimpleSeqFitSegment(open_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId)).c_str())); + fSegments.emplace(fSegmentId, SimpleSeqFitSegment(open_only, segmentName.c_str())); if (allocationAlgorithm != "simple_seq_fit") { LOG(warn) << "Allocation algorithm of the opened segment is simple_seq_fit, but requested is " << allocationAlgorithm << ". Ignoring requested setting."; allocationAlgorithm = "simple_seq_fit"; @@ -276,7 +277,7 @@ class Manager << " Available: " << boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId)) << " bytes." << " Allocation algorithm: " << allocationAlgorithm; LOG(debug) << ss.str(); - } catch(interprocess_exception& bie) { + } catch (interprocess_exception& bie) { LOG(error) << "Failed to " << op << " shared memory segment (" << "fmq_" << fShmId << "_m_" << fSegmentId << "): " << bie.what(); throw TransportError(tools::ToString("Failed to ", op, " shared memory segment (", "fmq_", fShmId, "_m_", fSegmentId, "): ", bie.what())); } @@ -380,50 +381,60 @@ class Manager { boost::interprocess::scoped_lock lock(fShmMtx); - RegionCounter* rc = fManagementSegment.find(unique_instance).first; + if (!cfg.id.has_value()) { + RegionCounter* rc = fManagementSegment.find(unique_instance).first; - if (rc) { - LOG(debug) << "region counter found, with value of " << rc->fCount << ". incrementing."; - (rc->fCount)++; - LOG(debug) << "incremented region counter, now: " << rc->fCount; - } else { - LOG(debug) << "no region counter found, creating one and initializing with 1024"; - rc = fManagementSegment.construct(unique_instance)(1024); - LOG(debug) << "initialized region counter with: " << rc->fCount; + if (rc) { + LOG(trace) << "region counter found, with value of " << rc->fCount << ". incrementing."; + (rc->fCount)++; + LOG(trace) << "incremented region counter, now: " << rc->fCount; + } else { + LOG(trace) << "no region counter found, creating one and initializing with 1024"; + rc = fManagementSegment.construct(unique_instance)(1024); + LOG(trace) << "initialized region counter with: " << rc->fCount; + } + + cfg.id = rc->fCount; + } else if (cfg.id.value() == 0) { + LOG(error) << "User-given UnmanagedRegion ID must not be 0."; + throw TransportError("User-given UnmanagedRegion ID must not be 0."); } - uint16_t id = rc->fCount; - - auto it = fRegions.find(id); + auto it = fRegions.find(cfg.id.value()); if (it != fRegions.end()) { - LOG(error) << "Trying to create a region that already exists"; - return {nullptr, id}; + LOG(error) << "Trying to open/create a UnmanagedRegion that already exists (id: " << cfg.id.value() << ")"; + throw TransportError(tools::ToString("Trying to open/create a UnmanagedRegion that already exists (id: ", cfg.id.value(), ")")); } - auto r = fRegions.emplace(id, std::make_unique(fShmId, id, size, false, callback, bulkCallback, cfg)); - // LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'"; + Region& region = *(fRegions.emplace(cfg.id.value(), std::make_unique(fShmId, size, false, callback, bulkCallback, cfg)).first->second); + // LOG(debug) << "Created region with id '" << cfg.id.value() << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'"; if (cfg.lock) { - LOG(debug) << "Locking region " << id << "..."; - if (mlock(r.first->second->fRegion.get_address(), r.first->second->fRegion.get_size()) == -1) { - LOG(error) << "Could not lock region " << id << ". Code: " << errno << ", reason: " << strerror(errno); - throw TransportError(tools::ToString("Could not lock region ", id, ": ", strerror(errno))); + LOG(debug) << "Locking region " << cfg.id.value() << "..."; + if (mlock(region.fRegion.get_address(), region.fRegion.get_size()) == -1) { + LOG(error) << "Could not lock region " << cfg.id.value() << ". Code: " << errno << ", reason: " << strerror(errno); + throw TransportError(tools::ToString("Could not lock region ", cfg.id.value(), ": ", strerror(errno))); } - LOG(debug) << "Successfully locked region " << id << "."; + LOG(debug) << "Successfully locked region " << cfg.id.value() << "."; } if (cfg.zero) { - LOG(debug) << "Zeroing free memory of region " << id << "..."; - memset(r.first->second->fRegion.get_address(), 0x00, r.first->second->fRegion.get_size()); - LOG(debug) << "Successfully zeroed free memory of region " << id << "."; + LOG(debug) << "Zeroing free memory of region " << cfg.id.value() << "..."; + memset(region.fRegion.get_address(), 0x00, region.fRegion.get_size()); + LOG(debug) << "Successfully zeroed free memory of region " << cfg.id.value() << "."; } - fShmRegions->emplace(id, RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, fShmVoidAlloc)); + bool newRegionCreated = fShmRegions->emplace(cfg.id.value(), RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, fShmVoidAlloc)).second; - r.first->second->StartReceivingAcks(); - result.first = &(r.first->second->fRegion); - result.second = id; + // start ack receiver only if a callback has been provided. + if (callback || bulkCallback) { + region.StartAckReceiver(); + } + result.first = &(region.fRegion); + result.second = cfg.id.value(); - (fEventCounter->fCount)++; + if (newRegionCreated) { + (fEventCounter->fCount)++; + } } fRegionsGen += 1; // signal TL cache invalidation fRegionEventsShmCV.notify_all(); @@ -473,11 +484,12 @@ class Manager // get region info RegionInfo regionInfo = fShmRegions->at(id); RegionConfig cfg; + cfg.id = id; cfg.creationFlags = regionInfo.fCreationFlags; cfg.path = regionInfo.fPath.c_str(); // LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'"; - auto r = fRegions.emplace(id, std::make_unique(fShmId, id, 0, true, nullptr, nullptr, std::move(cfg))); + auto r = fRegions.emplace(id, std::make_unique(fShmId, 0, true, nullptr, nullptr, std::move(cfg))); return r.first->second.get(); } catch (std::out_of_range& oor) { LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?"; @@ -496,12 +508,14 @@ class Manager fRegions.at(id)->StopAcks(); { boost::interprocess::scoped_lock lock(fShmMtx); - fShmRegions->at(id).fDestroyed = true; + if (fRegions.at(id)->fRemoveOnDestruction) { + fShmRegions->at(id).fDestroyed = true; + (fEventCounter->fCount)++; + } fRegions.erase(id); - (fEventCounter->fCount)++; } fRegionEventsShmCV.notify_all(); - } catch(std::out_of_range& oor) { + } catch (std::out_of_range& oor) { LOG(debug) << "RemoveRegion() could not locate region with id '" << id << "'"; } fRegionsGen += 1; // signal TL cache invalidation @@ -749,7 +763,7 @@ class Manager DecrementShmMsgCounter(segmentId); try { fMsgDebug->at(segmentId).erase(GetHandleFromAddress(ShmHeader::UserPtr(ptr), fSegmentId)); - } catch(const std::out_of_range& oor) { + } catch (const std::out_of_range& oor) { LOG(debug) << "could not locate debug container for " << segmentId << ": " << oor.what(); } #endif diff --git a/fairmq/shmem/Message.h b/fairmq/shmem/Message.h index ab43630f..7409d450 100644 --- a/fairmq/shmem/Message.h +++ b/fairmq/shmem/Message.h @@ -381,9 +381,9 @@ class Message final : public fair::mq::Message Deallocate(); fAlignment = 0; fManager.DecrementMsgCounter(); - } catch(SharedMemoryError& sme) { + } catch (SharedMemoryError& sme) { LOG(error) << "error closing message: " << sme.what(); - } catch(boost::interprocess::lock_exception& le) { + } catch (boost::interprocess::lock_exception& le) { LOG(error) << "error closing message: " << le.what(); } } diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index f1d2eadd..59e0efcd 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -8,6 +8,7 @@ #include "Monitor.h" #include "Common.h" +#include "Region.h" #include #include @@ -179,15 +180,23 @@ bool Monitor::PrintShm(const ShmId& shmId) managed_shared_memory managementSegment(open_read_only, std::string("fmq_" + shmId.shmId + "_mng").c_str()); VoidAlloc allocInstance(managementSegment.get_segment_manager()); - Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find(unique_instance).first; + Uint16SegmentInfoHashMap* shmSegments = managementSegment.find(unique_instance).first; std::unordered_map> segments; - if (!segmentInfos) { + Uint16RegionInfoHashMap* shmRegions = managementSegment.find(unique_instance).first; + std::unordered_map> regions; + + if (!shmSegments) { LOG(error) << "Found management segment, but cannot locate segment info, something went wrong..."; return false; } - for (const auto& s : *segmentInfos) { + if (!shmSegments) { + LOG(error) << "Found management segment, but cannot locate region info..."; + return false; + } + + for (const auto& s : *shmSegments) { if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) { segments.emplace(s.first, RBTreeBestFitSegment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)).c_str())); } else { @@ -221,7 +230,7 @@ bool Monitor::PrintShm(const ShmId& shmId) << ", session: " << sessionName << ", creator id: " << creatorId << ", devices: " << numDevices - << ", segments:\n"; + << ", managed segments:\n"; for (const auto& s : segments) { size_t free = boost::apply_visitor(SegmentFreeMemory(), s.second); @@ -243,6 +252,13 @@ bool Monitor::PrintShm(const ShmId& shmId) << "total: " << mtotal << ", free: " << mfree << ", used: " << mused; + + if (!shmRegions->empty()) { + ss << "\n unmanaged regions:\n"; + for (const auto& r : *shmRegions) { + ss << " [" << r.first << "]: " << (r.second.fDestroyed ? "destroyed" : "alive"); + } + } LOGV(info, user1) << ss.str(); } catch (bie&) { return false; @@ -458,15 +474,15 @@ unsigned long Monitor::GetFreeMemory(const ShmId& shmId, uint16_t segmentId) boost::interprocess::named_mutex mtx(boost::interprocess::open_only, std::string("fmq_" + shmId.shmId + "_mtx").c_str()); boost::interprocess::scoped_lock lock(mtx); - Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find(unique_instance).first; + Uint16SegmentInfoHashMap* shmSegments = managementSegment.find(unique_instance).first; - if (!segmentInfos) { + if (!shmSegments) { LOG(error) << "Found management segment, but could not locate segment info"; throw MonitorError("Found management segment, but could not locate segment info"); } - auto it = segmentInfos->find(segmentId); - if (it != segmentInfos->end()) { + auto it = shmSegments->find(segmentId); + if (it != shmSegments->end()) { if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) { RBTreeBestFitSegment segment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + std::to_string(segmentId)).c_str()); return segment.get_free_memory(); @@ -531,12 +547,12 @@ std::vector> Monitor::Cleanup(const ShmId& shmId, b try { bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str()); - Uint16RegionInfoHashMap* regions = managementSegment.find(bipc::unique_instance).first; - if (regions) { + Uint16RegionInfoHashMap* shmRegions = managementSegment.find(bipc::unique_instance).first; + if (shmRegions) { if (verbose) { - LOG(info) << "Found " << regions->size() << " unmanaged regions..."; + LOG(info) << "Found " << shmRegions->size() << " unmanaged regions..."; } - for (const auto& region : *regions) { + for (const auto& region : *shmRegions) { uint16_t id = region.first; RegionInfo info = region.second; string path = info.fPath.c_str(); @@ -553,13 +569,13 @@ std::vector> Monitor::Cleanup(const ShmId& shmId, b } } - Uint16SegmentInfoHashMap* segments = managementSegment.find(bipc::unique_instance).first; + Uint16SegmentInfoHashMap* shmSegments = managementSegment.find(bipc::unique_instance).first; - if (segments) { + if (shmSegments) { if (verbose) { - LOG(info) << "Found " << segments->size() << " managed segments..."; + LOG(info) << "Found " << shmSegments->size() << " managed segments..."; } - for (const auto& segment : *segments) { + for (const auto& segment : *shmSegments) { result.emplace_back(Remove("fmq_" + shmId.shmId + "_m_" + to_string(segment.first), verbose)); } } else { diff --git a/fairmq/shmem/Region.h b/fairmq/shmem/Region.h index 97ffd799..c7ba44b3 100644 --- a/fairmq/shmem/Region.h +++ b/fairmq/shmem/Region.h @@ -38,12 +38,13 @@ namespace fair::mq::shmem struct Region { - Region(const std::string& shmId, uint16_t id, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, RegionConfig cfg) + Region(const std::string& shmId, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, RegionConfig cfg) : fRemote(remote) + , fRemoveOnDestruction(cfg.removeOnDestruction) , fLinger(cfg.linger) , fStopAcks(false) - , fName("fmq_" + shmId + "_rg_" + std::to_string(id)) - , fQueueName("fmq_" + shmId + "_rgq_" + std::to_string(id)) + , fName("fmq_" + shmId + "_rg_" + std::to_string(cfg.id.value())) + , fQueueName("fmq_" + shmId + "_rgq_" + std::to_string(cfg.id.value())) , fShmemObject() , fFile(nullptr) , fFileMapping() @@ -81,23 +82,46 @@ struct Region if (fRemote) { fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write); } else { - fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write); - fShmemObject.truncate(size); + switch (cfg.constructionMode) { + case RegionConstruction::create: + fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write); + fShmemObject.truncate(size); + break; + case RegionConstruction::open: + fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write); + break; + case RegionConstruction::open_or_create: + try { + fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write); + } catch (interprocess_exception&) { + fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write); + fShmemObject.truncate(size); + } + break; + default: + throw TransportError(tools::ToString("Unknown RegionConstruction mode provided ")); + break; + } } } catch (interprocess_exception& e) { - LOG(error) << "Failed " << (fRemote ? "opening" : "creating") << " shared_memory_object for region id '" << id << "': " << e.what(); + LOG(error) << "Failed " << (fRemote ? "opening" : "creating") << " shared_memory_object for region id '" << cfg.id.value() << "': " << e.what(); throw; } try { fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, cfg.creationFlags); } catch (interprocess_exception& e) { - LOG(error) << "Failed mapping shared_memory_object for region id '" << id << "': " << e.what(); + LOG(error) << "Failed mapping shared_memory_object for region id '" << cfg.id.value() << "': " << e.what(); throw; } } - InitializeQueues(); - StartSendingAcks(); + // skip queues initialization if region is being created without callbacks passed + if (fRemote || (fCallback || fBulkCallback)) { + InitializeQueues(); + StartAckSender(); + } else { + LOG(trace) << "skipping queues creation & ack sender thread, because created region locally but no callback were provided"; + } LOG(trace) << "shmem: initialized region: " << fName << " (" << (fRemote ? "remote" : "local") << ")"; } @@ -112,16 +136,11 @@ struct Region void InitializeQueues() { using namespace boost::interprocess; - - if (fRemote) { - fQueue = std::make_unique(open_only, fQueueName.c_str()); - } else { - fQueue = std::make_unique(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock)); - } + fQueue = std::make_unique(open_or_create, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock)); LOG(trace) << "shmem: initialized region queue: " << fQueueName << " (" << (fRemote ? "remote" : "local") << ")"; } - void StartSendingAcks() + void StartAckSender() { fAcksSender = std::thread(&Region::SendAcks, this); } @@ -163,7 +182,7 @@ struct Region << " blocks left to send: " << blocksToSend << ")."; } - void StartReceivingAcks() + void StartAckReceiver() { if (!fAcksReceiver.joinable()) { fAcksReceiver = std::thread(&Region::ReceiveAcks, this); @@ -255,20 +274,26 @@ struct Region fAcksReceiver.join(); } - if (boost::interprocess::shared_memory_object::remove(fName.c_str())) { - LOG(trace) << "Region '" << fName << "' destroyed."; - } + if (fRemoveOnDestruction) { + if (boost::interprocess::shared_memory_object::remove(fName.c_str())) { + LOG(trace) << "Region '" << fName << "' destroyed."; + } - if (boost::interprocess::file_mapping::remove(fName.c_str())) { - LOG(trace) << "File mapping '" << fName << "' destroyed."; - } - - if (fFile) { - fclose(fFile); + if (boost::interprocess::file_mapping::remove(fName.c_str())) { + LOG(trace) << "File mapping '" << fName << "' destroyed."; + } + } else { + LOG(debug) << "Skipping removal of " << fName << " unmanaged region, because RegionConfig::removeOnDestruction is false"; } if (boost::interprocess::message_queue::remove(fQueueName.c_str())) { LOG(trace) << "Region queue '" << fQueueName << "' destroyed."; + } else { + LOG(debug) << "Region queue '" << fQueueName << "' not destroyed."; + } + + if (fFile) { + fclose(fFile); } } else { // LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary"; @@ -278,6 +303,7 @@ struct Region } bool fRemote; + bool fRemoveOnDestruction; uint32_t fLinger; std::atomic fStopAcks; std::string fName; diff --git a/fairmq/shmem/TransportFactory.h b/fairmq/shmem/TransportFactory.h index 09f0d532..beb89201 100644 --- a/fairmq/shmem/TransportFactory.h +++ b/fairmq/shmem/TransportFactory.h @@ -147,14 +147,14 @@ class TransportFactory final : public fair::mq::TransportFactory { cfg.path = path; cfg.creationFlags = flags; - return CreateUnmanagedRegion(size, callback, nullptr, cfg); + return CreateUnmanagedRegion(size, callback, nullptr, std::move(cfg)); } UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override { cfg.path = path; cfg.creationFlags = flags; - return CreateUnmanagedRegion(size, nullptr, bulkCallback, cfg); + return CreateUnmanagedRegion(size, nullptr, bulkCallback, std::move(cfg)); } UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override @@ -162,7 +162,7 @@ class TransportFactory final : public fair::mq::TransportFactory cfg.path = path; cfg.userFlags = userFlags; cfg.creationFlags = flags; - return CreateUnmanagedRegion(size, callback, nullptr, cfg); + return CreateUnmanagedRegion(size, callback, nullptr, std::move(cfg)); } UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, int64_t userFlags, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override @@ -170,16 +170,16 @@ class TransportFactory final : public fair::mq::TransportFactory cfg.path = path; cfg.userFlags = userFlags; cfg.creationFlags = flags; - return CreateUnmanagedRegion(size, nullptr, bulkCallback, cfg); + return CreateUnmanagedRegion(size, nullptr, bulkCallback, std::move(cfg)); } UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionCallback callback, RegionConfig cfg) override { - return CreateUnmanagedRegion(size, callback, nullptr, cfg); + return CreateUnmanagedRegion(size, callback, nullptr, std::move(cfg)); } UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionBulkCallback bulkCallback, RegionConfig cfg) override { - return CreateUnmanagedRegion(size, nullptr, bulkCallback, cfg); + return CreateUnmanagedRegion(size, nullptr, bulkCallback, std::move(cfg)); } UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionCallback callback, RegionBulkCallback bulkCallback, fair::mq::RegionConfig cfg)