Compare commits

..

7 Commits

Author SHA1 Message Date
Alexey Rybalchenko
2df3d909fa shm: when refCount segment size is zero, fallback to old behaviour
, which is to store reference counts inside the main data segment
2023-11-29 19:21:42 +01:00
Alexey Rybalchenko
05a2ae6a31 example: configure new script too 2023-11-29 19:21:42 +01:00
Alexey Rybalchenko
58ffdfd1f4 Remove unused ctor and constant 2023-11-29 19:21:42 +01:00
Alexey Rybalchenko
addfd071bb Fix incorrect parameters in region example scripts 2023-11-24 14:19:21 +01:00
Alexey Rybalchenko
2d27abc533 Examples: add a script for externally created region 2023-11-24 14:19:21 +01:00
Alexey Rybalchenko
faf577086a shm: fix initialization of rc segment when region is created externally 2023-11-24 14:19:21 +01:00
Alexey Rybalchenko
ff1f9b94ef shm: include rcCountSegment free memory in the monitor output 2023-11-24 14:19:21 +01:00
12 changed files with 181 additions and 64 deletions

View File

@@ -61,7 +61,7 @@ function(add_example)
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq) set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
foreach(script IN LISTS scripts) foreach(script IN LISTS scripts)
set(script_file "${script_prefix}-${script}.sh") set(script_file "${script_prefix}-${script}.sh")
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/${script_file}.in" "${CMAKE_CURRENT_BINARY_DIR}/${script_file}") configure_file("${CMAKE_CURRENT_SOURCE_DIR}/${script_file}.in" "${CMAKE_CURRENT_BINARY_DIR}/${script_file}" @ONLY)
endforeach() endforeach()
if(ARG_CONFIG) if(ARG_CONFIG)
@@ -119,7 +119,7 @@ function(add_example)
set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq) set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
foreach(script IN LISTS scripts) foreach(script IN LISTS scripts)
set(script_file "${script_prefix}-${script}.sh") set(script_file "${script_prefix}-${script}.sh")
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/${script_file}.in" "${CMAKE_CURRENT_BINARY_DIR}/${script_file}_install") configure_file("${CMAKE_CURRENT_SOURCE_DIR}/${script_file}.in" "${CMAKE_CURRENT_BINARY_DIR}/${script_file}_install" @ONLY)
install( install(
PROGRAMS "${CMAKE_CURRENT_BINARY_DIR}/${script_file}_install" PROGRAMS "${CMAKE_CURRENT_BINARY_DIR}/${script_file}_install"
DESTINATION ${PROJECT_INSTALL_BINDIR} DESTINATION ${PROJECT_INSTALL_BINDIR}

View File

@@ -8,5 +8,5 @@
add_example(NAME region add_example(NAME region
DEVICE sampler processor sink keep-alive DEVICE sampler processor sink keep-alive
SCRIPT region region-advanced SCRIPT region region-advanced region-advanced-external
) )

View File

@@ -0,0 +1,95 @@
#!/bin/bash
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport=${1:-shmem}
msgSize=${2:-1000000}
SAMPLER="fairmq-ex-region-sampler"
SAMPLER+=" --id sampler1"
# SAMPLER+=" --sampling-rate 10"
SAMPLER+=" --severity debug"
SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --transport $transport"
SAMPLER+=" --shmid 1"
SAMPLER+=" --shm-monitor false"
SAMPLER+=" --rc-segment-size 200000000"
SAMPLER+=" --external-region true"
SAMPLER+=" --shm-no-cleanup true"
SAMPLER+=" --chan-name data1"
SAMPLER+=" --channel-config name=data1,type=push,method=bind,address=tcp://127.0.0.1:7777"
xterm -geometry 90x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
PROCESSOR1="fairmq-ex-region-processor"
PROCESSOR1+=" --id processor1"
PROCESSOR1+=" --severity debug"
PROCESSOR1+=" --transport $transport"
PROCESSOR1+=" --shmid 1"
PROCESSOR1+=" --shm-segment-id 1"
PROCESSOR1+=" --shm-monitor false"
PROCESSOR1+=" --shm-no-cleanup true"
PROCESSOR1+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777"
PROCESSOR1+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7778"
PROCESSOR1+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7779"
xterm -geometry 90x40+550+40 -hold -e @EX_BIN_DIR@/$PROCESSOR1 &
PROCESSOR2="fairmq-ex-region-processor"
PROCESSOR2+=" --id processor2"
PROCESSOR2+=" --severity debug"
PROCESSOR2+=" --transport $transport"
PROCESSOR2+=" --shmid 1"
PROCESSOR2+=" --shm-segment-id 2"
PROCESSOR2+=" --shm-monitor false"
PROCESSOR2+=" --shm-no-cleanup true"
PROCESSOR2+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777"
PROCESSOR2+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7788"
PROCESSOR2+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7789"
xterm -geometry 90x40+550+600 -hold -e @EX_BIN_DIR@/$PROCESSOR2 &
SINK1_1="fairmq-ex-region-sink"
SINK1_1+=" --id sink1_1"
SINK1_1+=" --severity debug"
SINK1_1+=" --chan-name data2"
SINK1_1+=" --transport $transport"
SINK1_1+=" --shmid 1"
SINK1_1+=" --shm-segment-id 1"
SINK1_1+=" --shm-monitor false"
SINK1_1+=" --shm-no-cleanup true"
SINK1_1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7778"
xterm -geometry 90x20+1100+0 -hold -e @EX_BIN_DIR@/$SINK1_1 &
SINK1_2="fairmq-ex-region-sink"
SINK1_2+=" --id sink1_2"
SINK1_2+=" --severity debug"
SINK1_2+=" --chan-name data3"
SINK1_2+=" --transport $transport"
SINK1_2+=" --shmid 1"
SINK1_2+=" --shm-segment-id 1"
SINK1_2+=" --shm-monitor false"
SINK1_2+=" --shm-no-cleanup true"
SINK1_2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7779"
xterm -geometry 90x20+1100+300 -hold -e @EX_BIN_DIR@/$SINK1_2 &
SINK2_1="fairmq-ex-region-sink"
SINK2_1+=" --id sink2_1"
SINK2_1+=" --severity debug"
SINK2_1+=" --chan-name data2"
SINK2_1+=" --transport $transport"
SINK2_1+=" --shmid 1"
SINK2_1+=" --shm-segment-id 2"
SINK2_1+=" --shm-monitor false"
SINK2_1+=" --shm-no-cleanup true"
SINK2_1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7788"
xterm -geometry 90x20+1100+600 -hold -e @EX_BIN_DIR@/$SINK2_1 &
SINK2_2="fairmq-ex-region-sink"
SINK2_2+=" --id sink2_2"
SINK2_2+=" --severity debug"
SINK2_2+=" --chan-name data3"
SINK2_2+=" --transport $transport"
SINK2_2+=" --shmid 1"
SINK2_2+=" --shm-segment-id 2"
SINK2_2+=" --shm-monitor false"
SINK2_2+=" --shm-no-cleanup true"
SINK2_2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7789"
xterm -geometry 90x20+1100+900 -hold -e @EX_BIN_DIR@/$SINK2_2 &

View File

@@ -2,16 +2,8 @@
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@ export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport="shmem" transport=${1:-shmem}
msgSize="1000000" msgSize=${2:-1000000}
if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
if [[ $2 =~ ^[0-9]+$ ]]; then
msgSize=$1
fi
SAMPLER="fairmq-ex-region-sampler" SAMPLER="fairmq-ex-region-sampler"
SAMPLER+=" --id sampler1" SAMPLER+=" --id sampler1"
@@ -19,6 +11,7 @@ SAMPLER+=" --id sampler1"
SAMPLER+=" --severity debug" SAMPLER+=" --severity debug"
SAMPLER+=" --msg-size $msgSize" SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --transport $transport" SAMPLER+=" --transport $transport"
SAMPLER+=" --rc-segment-size 0"
SAMPLER+=" --shm-monitor true" SAMPLER+=" --shm-monitor true"
SAMPLER+=" --chan-name data1" SAMPLER+=" --chan-name data1"
SAMPLER+=" --channel-config name=data1,type=push,method=bind,address=tcp://127.0.0.1:7777" SAMPLER+=" --channel-config name=data1,type=push,method=bind,address=tcp://127.0.0.1:7777"

View File

@@ -2,16 +2,8 @@
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@ export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport="shmem" transport=${1:-shmem}
msgSize="1000000" msgSize=${2:-1000000}
if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
if [[ $2 =~ ^[0-9]+$ ]]; then
msgSize=$1
fi
SAMPLER="fairmq-ex-region-sampler" SAMPLER="fairmq-ex-region-sampler"
SAMPLER+=" --id sampler1" SAMPLER+=" --id sampler1"

View File

@@ -95,10 +95,11 @@ struct ShmManager
uint64_t size = stoull(conf.at(1)); uint64_t size = stoull(conf.at(1));
fair::mq::RegionConfig cfg; fair::mq::RegionConfig cfg;
cfg.id = id; cfg.id = id;
cfg.rcSegmentSize = 0;
cfg.size = size; cfg.size = size;
regionCfgs.push_back(cfg); regionCfgs.push_back(cfg);
auto ret = regions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(shmId, id, size)); auto ret = regions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(shmId, cfg));
fair::mq::shmem::UnmanagedRegion& region = *(ret.first->second); fair::mq::shmem::UnmanagedRegion& region = *(ret.first->second);
LOG(info) << "Created unamanged region " << id << " of size " << region.GetSize() LOG(info) << "Created unamanged region " << id << " of size " << region.GetSize()
<< ", starting at " << region.GetData() << ". Locking..."; << ", starting at " << region.GetData() << ". Locking...";

View File

@@ -172,8 +172,6 @@ enum class AllocationAlgorithm : int
struct RegionInfo struct RegionInfo
{ {
static constexpr uint64_t DefaultRcSegmentSize = 10000000;
RegionInfo(const char* path, int flags, uint64_t userFlags, uint64_t size, uint64_t rcSegmentSize, const VoidAlloc& alloc) RegionInfo(const char* path, int flags, uint64_t userFlags, uint64_t size, uint64_t rcSegmentSize, const VoidAlloc& alloc)
: fPath(path, alloc) : fPath(path, alloc)
, fCreationFlags(flags) , fCreationFlags(flags)
@@ -183,14 +181,6 @@ struct RegionInfo
, fDestroyed(false) , fDestroyed(false)
{} {}
RegionInfo(const VoidAlloc& alloc)
: RegionInfo("", 0, 0, 0, DefaultRcSegmentSize, alloc)
{}
RegionInfo(const char* path, int flags, uint64_t userFlags, uint64_t size, const VoidAlloc& alloc)
: RegionInfo(path, flags, userFlags, size, DefaultRcSegmentSize, alloc)
{}
Str fPath; Str fPath;
int fCreationFlags; int fCreationFlags;
uint64_t fUserFlags; uint64_t fUserFlags;

View File

@@ -323,7 +323,6 @@ class Manager
} }
const uint16_t id = cfg.id.value(); const uint16_t id = cfg.id.value();
const uint64_t rcSegmentSize = cfg.rcSegmentSize;
std::lock_guard<std::mutex> lock(fLocalRegionsMtx); std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
@@ -340,6 +339,12 @@ class Manager
LOG(debug) << "Unmanaged region (view) already present, promoting to controller"; LOG(debug) << "Unmanaged region (view) already present, promoting to controller";
region->BecomeController(cfg); region->BecomeController(cfg);
} else { } else {
// we need to update local config, if the region information already exists
auto info = fShmRegions->find(id);
if (info != fShmRegions->end()) {
cfg.rcSegmentSize = info->second.fRCSegmentSize;
}
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, true, cfg)); auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, true, cfg));
region = res.first->second.get(); region = res.first->second.get();
} }
@@ -348,7 +353,6 @@ class Manager
// start ack receiver only if a callback has been provided. // start ack receiver only if a callback has been provided.
if (callback || bulkCallback) { if (callback || bulkCallback) {
region->SetCallbacks(callback, bulkCallback); region->SetCallbacks(callback, bulkCallback);
region->InitializeRefCountSegment(rcSegmentSize);
region->InitializeQueues(); region->InitializeQueues();
region->StartAckSender(); region->StartAckSender();
region->StartAckReceiver(); region->StartAckReceiver();
@@ -401,19 +405,18 @@ class Manager
try { try {
RegionConfig cfg; RegionConfig cfg;
const uint64_t rcSegmentSize = cfg.rcSegmentSize;
// get region info // get region info
{ {
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx); boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
RegionInfo regionInfo = fShmRegions->at(id); RegionInfo regionInfo = fShmRegions->at(id);
cfg.id = id; cfg.id = id;
cfg.creationFlags = regionInfo.fCreationFlags; cfg.creationFlags = regionInfo.fCreationFlags;
cfg.rcSegmentSize = regionInfo.fRCSegmentSize;
cfg.path = regionInfo.fPath.c_str(); cfg.path = regionInfo.fPath.c_str();
} }
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'"; // LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, false, std::move(cfg))); auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, false, std::move(cfg)));
r.first->second->InitializeRefCountSegment(rcSegmentSize);
r.first->second->InitializeQueues(); r.first->second->InitializeQueues();
r.first->second->StartAckSender(); r.first->second->StartAckSender();
return r.first->second.get(); return r.first->second.get();
@@ -482,6 +485,7 @@ class Manager
cfg.id = info.id; cfg.id = info.id;
cfg.creationFlags = regionInfo.fCreationFlags; cfg.creationFlags = regionInfo.fCreationFlags;
cfg.path = regionInfo.fPath.c_str(); cfg.path = regionInfo.fPath.c_str();
cfg.rcSegmentSize = regionInfo.fRCSegmentSize;
regionCfgs.emplace(info.id, cfg); regionCfgs.emplace(info.id, cfg);
// fill the ptr+size info after shmLock is released, to avoid constructing local region under it // fill the ptr+size info after shmLock is released, to avoid constructing local region under it
} else { } else {
@@ -503,10 +507,8 @@ class Manager
if (it != fRegions.end()) { if (it != fRegions.end()) {
region = it->second.get(); region = it->second.get();
} else { } else {
const uint64_t rcSegmentSize = cfgIt->second.rcSegmentSize;
auto r = fRegions.emplace(cfgIt->first, std::make_unique<UnmanagedRegion>(fShmId, 0, false, cfgIt->second)); auto r = fRegions.emplace(cfgIt->first, std::make_unique<UnmanagedRegion>(fShmId, 0, false, cfgIt->second));
region = r.first->second.get(); region = r.first->second.get();
region->InitializeRefCountSegment(rcSegmentSize);
region->InitializeQueues(); region->InitializeQueues();
region->StartAckSender(); region->StartAckSender();
} }

View File

@@ -251,7 +251,12 @@ class Message final : public fair::mq::Message
if (!fRegionPtr) { if (!fRegionPtr) {
throw TransportError(tools::ToString("Cannot get unmanaged region with id ", fRegionId)); throw TransportError(tools::ToString("Cannot get unmanaged region with id ", fRegionId));
} }
return fRegionPtr->GetRefCountAddressFromHandle(fShared)->Get(); if (fRegionPtr->fRcSegmentSize > 0) {
return fRegionPtr->GetRefCountAddressFromHandle(fShared)->Get();
} else {
fManager.GetSegment(fSegmentId);
return ShmHeader::RefCount(fManager.GetAddressFromHandle(fShared, fSegmentId));
}
} }
void Copy(const fair::mq::Message& other) override void Copy(const fair::mq::Message& other) override
@@ -277,19 +282,29 @@ class Message final : public fair::mq::Message
if (!fRegionPtr) { if (!fRegionPtr) {
throw TransportError(tools::ToString("Cannot get unmanaged region with id ", otherMsg.fRegionId)); throw TransportError(tools::ToString("Cannot get unmanaged region with id ", otherMsg.fRegionId));
} }
if (otherMsg.fShared < 0) { if (fRegionPtr->fRcSegmentSize > 0) {
// UR msg not yet shared, create the reference counting object with count 2 if (otherMsg.fShared < 0) {
try { // UR msg not yet shared, create the reference counting object with count 2
otherMsg.fShared = fRegionPtr->HandleFromAddress(&(fRegionPtr->MakeRefCount(2))); try {
} catch (boost::interprocess::bad_alloc& ba) { otherMsg.fShared = fRegionPtr->HandleFromAddress(&(fRegionPtr->MakeRefCount(2)));
throw RefCountBadAlloc(tools::ToString( } catch (boost::interprocess::bad_alloc& ba) {
"Insufficient space in the reference count segment ", throw RefCountBadAlloc(tools::ToString("Insufficient space in the reference count segment ", otherMsg.fRegionId, ", original exception: bad_alloc: ", ba.what()));
otherMsg.fRegionId, }
", original exception: bad_alloc: ", } else {
ba.what())); fRegionPtr->GetRefCountAddressFromHandle(otherMsg.fShared)->Increment();
}
} else { // if RefCount segment size is 0, store the ref count in the managed segment
if (otherMsg.fShared < 0) { // if UR msg is not yet shared
char* ptr = fManager.Allocate(2, 0);
// point the fShared in the unmanaged region message to the refCount holder
otherMsg.fShared = fManager.GetHandleFromAddress(ptr, fSegmentId);
// the message needs to be able to locate in which segment the refCount is stored
otherMsg.fSegmentId = fSegmentId;
ShmHeader::IncrementRefCount(ptr);
} else { // if the UR msg is already shared
fManager.GetSegment(otherMsg.fSegmentId);
ShmHeader::IncrementRefCount(fManager.GetAddressFromHandle(otherMsg.fShared, otherMsg.fSegmentId));
} }
} else {
fRegionPtr->GetRefCountAddressFromHandle(otherMsg.fShared)->Increment();
} }
} }
@@ -357,10 +372,21 @@ class Message final : public fair::mq::Message
if (!fRegionPtr) { if (!fRegionPtr) {
throw TransportError(tools::ToString("Cannot get unmanaged region with id ", fRegionId)); throw TransportError(tools::ToString("Cannot get unmanaged region with id ", fRegionId));
} }
uint16_t refCount = fRegionPtr->GetRefCountAddressFromHandle(fShared)->Decrement(); if (fRegionPtr->fRcSegmentSize > 0) {
if (refCount == 1) { uint16_t refCount = fRegionPtr->GetRefCountAddressFromHandle(fShared)->Decrement();
fRegionPtr->RemoveRefCount(*(fRegionPtr->GetRefCountAddressFromHandle(fShared))); if (refCount == 1) {
ReleaseUnmanagedRegionBlock(); fRegionPtr->RemoveRefCount(*(fRegionPtr->GetRefCountAddressFromHandle(fShared)));
ReleaseUnmanagedRegionBlock();
}
} else { // if RefCount segment size is 0, get the ref count from the managed segment
// make sure segment is initialized in this transport
fManager.GetSegment(fSegmentId);
// release unmanaged region block if ref count is one
uint16_t refCount = ShmHeader::DecrementRefCount(fManager.GetAddressFromHandle(fShared, fSegmentId));
if (refCount == 1) {
fManager.Deallocate(fShared, fSegmentId);
ReleaseUnmanagedRegionBlock();
}
} }
} else { } else {
ReleaseUnmanagedRegionBlock(); ReleaseUnmanagedRegionBlock();

View File

@@ -274,7 +274,7 @@ bool Monitor::PrintShm(const ShmId& shmId)
try { try {
managed_shared_memory rcCountSegment(open_read_only, MakeShmName(shmId.shmId, "rrc", id).c_str()); managed_shared_memory rcCountSegment(open_read_only, MakeShmName(shmId.shmId, "rrc", id).c_str());
ss << ", rcCountSegment size: " << rcCountSegment.get_size(); ss << ", rcCountSegment size: " << rcCountSegment.get_size() << ", free: " << rcCountSegment.get_free_memory();
} catch (bie&) { } catch (bie&) {
ss << ", rcCountSegment: not found"; ss << ", rcCountSegment: not found";
} }

View File

@@ -65,6 +65,7 @@ struct UnmanagedRegion
, fShmemObject() , fShmemObject()
, fFile(nullptr) , fFile(nullptr)
, fFileMapping() , fFileMapping()
, fRcSegmentSize(cfg.rcSegmentSize)
, fQueue(nullptr) , fQueue(nullptr)
, fCallback(nullptr) , fCallback(nullptr)
, fBulkCallback(nullptr) , fBulkCallback(nullptr)
@@ -146,11 +147,13 @@ struct UnmanagedRegion
LOG(debug) << "Successfully zeroed free memory of region " << id << "."; LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
} }
InitializeRefCountSegment(fRcSegmentSize);
if (fControlling && created) { if (fControlling && created) {
Register(shmId, cfg); Register(shmId, cfg);
} }
LOG(debug) << (created ? "Created" : "Opened") << " unmanaged shared memory region: " << fName << " (" << (fControlling ? "controller" : "viewer") << ")"; LOG(debug) << (created ? "Created" : "Opened") << " unmanaged shared memory region: " << fName << " (" << (fControlling ? "controller" : "viewer") << "), refCount segment size: " << fRcSegmentSize;
} }
UnmanagedRegion() = delete; UnmanagedRegion() = delete;
@@ -264,6 +267,7 @@ struct UnmanagedRegion
std::condition_variable fBlockSendCV; std::condition_variable fBlockSendCV;
std::vector<RegionBlock> fBlocksToFree; std::vector<RegionBlock> fBlocksToFree;
const std::size_t fAckBunchSize = 256; const std::size_t fAckBunchSize = 256;
uint64_t fRcSegmentSize;
std::unique_ptr<boost::interprocess::message_queue> fQueue; std::unique_ptr<boost::interprocess::message_queue> fQueue;
std::unique_ptr<boost::interprocess::managed_shared_memory> fRefCountSegment; std::unique_ptr<boost::interprocess::managed_shared_memory> fRefCountSegment;
std::unique_ptr<RefCountPool> fRefCountPool; std::unique_ptr<RefCountPool> fRefCountPool;
@@ -319,7 +323,7 @@ struct UnmanagedRegion
void InitializeRefCountSegment(uint64_t size) void InitializeRefCountSegment(uint64_t size)
{ {
using namespace boost::interprocess; using namespace boost::interprocess;
if (!fRefCountSegment) { if (!fRefCountSegment && size > 0) {
fRefCountSegment = std::make_unique<managed_shared_memory>(open_or_create, fRefCountSegmentName.c_str(), size); fRefCountSegment = std::make_unique<managed_shared_memory>(open_or_create, fRefCountSegmentName.c_str(), size);
LOG(trace) << "shmem: initialized ref count segment: " << fRefCountSegmentName; LOG(trace) << "shmem: initialized ref count segment: " << fRefCountSegmentName;
fRefCountPool = std::make_unique<RefCountPool>(fRefCountSegment->get_segment_manager()); fRefCountPool = std::make_unique<RefCountPool>(fRefCountSegment->get_segment_manager());

View File

@@ -287,8 +287,10 @@ auto ZeroCopy(bool expandedShmMetadata = false) -> void
// The "zero copy" property of the Copy() method is an implementation detail and is not guaranteed. // The "zero copy" property of the Copy() method is an implementation detail and is not guaranteed.
// Currently it holds true for the shmem (across devices) and for zeromq (within same device) transports. // Currently it holds true for the shmem (across devices) and for zeromq (within same device) transports.
auto ZeroCopyFromUnmanaged(string const& address, bool expandedShmMetadata = false) -> void auto ZeroCopyFromUnmanaged(string const& address, bool expandedShmMetadata, uint64_t rcSegmentSize) -> void
{ {
fair::Logger::SetConsoleSeverity(fair::Severity::debug);
ProgOptions config1; ProgOptions config1;
ProgOptions config2; ProgOptions config2;
string session(tools::Uuid()); string session(tools::Uuid());
@@ -311,11 +313,13 @@ auto ZeroCopyFromUnmanaged(string const& address, bool expandedShmMetadata = fal
const size_t msgSize{100}; const size_t msgSize{100};
const size_t regionSize{1000000}; const size_t regionSize{1000000};
RegionConfig cfg;
cfg.rcSegmentSize = rcSegmentSize;
tools::Semaphore blocker; tools::Semaphore blocker;
auto region = factory1->CreateUnmanagedRegion(regionSize, [&blocker](void*, size_t, void*) { auto region = factory1->CreateUnmanagedRegion(regionSize, [&blocker](void*, size_t, void*) {
blocker.Signal(); blocker.Signal();
}); }, cfg);
{ {
Channel push("Push", "push", factory1); Channel push("Push", "push", factory1);
@@ -461,12 +465,22 @@ TEST(ZeroCopy, shmem_expanded_metadata) // NOLINT
TEST(ZeroCopyFromUnmanaged, shmem) // NOLINT TEST(ZeroCopyFromUnmanaged, shmem) // NOLINT
{ {
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged"); ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged", false, 10000000);
} }
TEST(ZeroCopyFromUnmanaged, shmem_expanded_metadata) // NOLINT TEST(ZeroCopyFromUnmanaged, shmem_expanded_metadata) // NOLINT
{ {
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged", true); ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged_expanded", true, 10000000);
}
TEST(ZeroCopyFromUnmanaged, shmem_no_rc_segment) // NOLINT
{
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged_no_rc_segment", false, 0);
}
TEST(ZeroCopyFromUnmanaged, shmem_expanded_metadata_no_rc_segment) // NOLINT
{
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged_expanded_no_rc_segment", true, 0);
} }
} // namespace } // namespace