mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-16 01:51:45 +00:00
Compare commits
10 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
2df3d909fa | ||
|
05a2ae6a31 | ||
|
58ffdfd1f4 | ||
|
addfd071bb | ||
|
2d27abc533 | ||
|
faf577086a | ||
|
ff1f9b94ef | ||
|
34e8a24c86 | ||
|
7567a10513 | ||
|
424e22b41a |
@@ -61,7 +61,7 @@ function(add_example)
|
||||
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
|
||||
foreach(script IN LISTS scripts)
|
||||
set(script_file "${script_prefix}-${script}.sh")
|
||||
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/${script_file}.in" "${CMAKE_CURRENT_BINARY_DIR}/${script_file}")
|
||||
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/${script_file}.in" "${CMAKE_CURRENT_BINARY_DIR}/${script_file}" @ONLY)
|
||||
endforeach()
|
||||
|
||||
if(ARG_CONFIG)
|
||||
@@ -119,7 +119,7 @@ function(add_example)
|
||||
set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
|
||||
foreach(script IN LISTS scripts)
|
||||
set(script_file "${script_prefix}-${script}.sh")
|
||||
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/${script_file}.in" "${CMAKE_CURRENT_BINARY_DIR}/${script_file}_install")
|
||||
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/${script_file}.in" "${CMAKE_CURRENT_BINARY_DIR}/${script_file}_install" @ONLY)
|
||||
install(
|
||||
PROGRAMS "${CMAKE_CURRENT_BINARY_DIR}/${script_file}_install"
|
||||
DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
|
@@ -8,5 +8,5 @@
|
||||
|
||||
add_example(NAME region
|
||||
DEVICE sampler processor sink keep-alive
|
||||
SCRIPT region region-advanced
|
||||
SCRIPT region region-advanced region-advanced-external
|
||||
)
|
||||
|
95
examples/region/fairmq-start-ex-region-advanced-external.sh.in
Executable file
95
examples/region/fairmq-start-ex-region-advanced-external.sh.in
Executable file
@@ -0,0 +1,95 @@
|
||||
#!/bin/bash
|
||||
|
||||
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
|
||||
|
||||
transport=${1:-shmem}
|
||||
msgSize=${2:-1000000}
|
||||
|
||||
SAMPLER="fairmq-ex-region-sampler"
|
||||
SAMPLER+=" --id sampler1"
|
||||
# SAMPLER+=" --sampling-rate 10"
|
||||
SAMPLER+=" --severity debug"
|
||||
SAMPLER+=" --msg-size $msgSize"
|
||||
SAMPLER+=" --transport $transport"
|
||||
SAMPLER+=" --shmid 1"
|
||||
SAMPLER+=" --shm-monitor false"
|
||||
SAMPLER+=" --rc-segment-size 200000000"
|
||||
SAMPLER+=" --external-region true"
|
||||
SAMPLER+=" --shm-no-cleanup true"
|
||||
SAMPLER+=" --chan-name data1"
|
||||
SAMPLER+=" --channel-config name=data1,type=push,method=bind,address=tcp://127.0.0.1:7777"
|
||||
xterm -geometry 90x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
|
||||
|
||||
PROCESSOR1="fairmq-ex-region-processor"
|
||||
PROCESSOR1+=" --id processor1"
|
||||
PROCESSOR1+=" --severity debug"
|
||||
PROCESSOR1+=" --transport $transport"
|
||||
PROCESSOR1+=" --shmid 1"
|
||||
PROCESSOR1+=" --shm-segment-id 1"
|
||||
PROCESSOR1+=" --shm-monitor false"
|
||||
PROCESSOR1+=" --shm-no-cleanup true"
|
||||
PROCESSOR1+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777"
|
||||
PROCESSOR1+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7778"
|
||||
PROCESSOR1+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7779"
|
||||
xterm -geometry 90x40+550+40 -hold -e @EX_BIN_DIR@/$PROCESSOR1 &
|
||||
|
||||
PROCESSOR2="fairmq-ex-region-processor"
|
||||
PROCESSOR2+=" --id processor2"
|
||||
PROCESSOR2+=" --severity debug"
|
||||
PROCESSOR2+=" --transport $transport"
|
||||
PROCESSOR2+=" --shmid 1"
|
||||
PROCESSOR2+=" --shm-segment-id 2"
|
||||
PROCESSOR2+=" --shm-monitor false"
|
||||
PROCESSOR2+=" --shm-no-cleanup true"
|
||||
PROCESSOR2+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777"
|
||||
PROCESSOR2+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7788"
|
||||
PROCESSOR2+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7789"
|
||||
xterm -geometry 90x40+550+600 -hold -e @EX_BIN_DIR@/$PROCESSOR2 &
|
||||
|
||||
SINK1_1="fairmq-ex-region-sink"
|
||||
SINK1_1+=" --id sink1_1"
|
||||
SINK1_1+=" --severity debug"
|
||||
SINK1_1+=" --chan-name data2"
|
||||
SINK1_1+=" --transport $transport"
|
||||
SINK1_1+=" --shmid 1"
|
||||
SINK1_1+=" --shm-segment-id 1"
|
||||
SINK1_1+=" --shm-monitor false"
|
||||
SINK1_1+=" --shm-no-cleanup true"
|
||||
SINK1_1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7778"
|
||||
xterm -geometry 90x20+1100+0 -hold -e @EX_BIN_DIR@/$SINK1_1 &
|
||||
|
||||
SINK1_2="fairmq-ex-region-sink"
|
||||
SINK1_2+=" --id sink1_2"
|
||||
SINK1_2+=" --severity debug"
|
||||
SINK1_2+=" --chan-name data3"
|
||||
SINK1_2+=" --transport $transport"
|
||||
SINK1_2+=" --shmid 1"
|
||||
SINK1_2+=" --shm-segment-id 1"
|
||||
SINK1_2+=" --shm-monitor false"
|
||||
SINK1_2+=" --shm-no-cleanup true"
|
||||
SINK1_2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7779"
|
||||
xterm -geometry 90x20+1100+300 -hold -e @EX_BIN_DIR@/$SINK1_2 &
|
||||
|
||||
SINK2_1="fairmq-ex-region-sink"
|
||||
SINK2_1+=" --id sink2_1"
|
||||
SINK2_1+=" --severity debug"
|
||||
SINK2_1+=" --chan-name data2"
|
||||
SINK2_1+=" --transport $transport"
|
||||
SINK2_1+=" --shmid 1"
|
||||
SINK2_1+=" --shm-segment-id 2"
|
||||
SINK2_1+=" --shm-monitor false"
|
||||
SINK2_1+=" --shm-no-cleanup true"
|
||||
SINK2_1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7788"
|
||||
xterm -geometry 90x20+1100+600 -hold -e @EX_BIN_DIR@/$SINK2_1 &
|
||||
|
||||
SINK2_2="fairmq-ex-region-sink"
|
||||
SINK2_2+=" --id sink2_2"
|
||||
SINK2_2+=" --severity debug"
|
||||
SINK2_2+=" --chan-name data3"
|
||||
SINK2_2+=" --transport $transport"
|
||||
SINK2_2+=" --shmid 1"
|
||||
SINK2_2+=" --shm-segment-id 2"
|
||||
SINK2_2+=" --shm-monitor false"
|
||||
SINK2_2+=" --shm-no-cleanup true"
|
||||
SINK2_2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7789"
|
||||
xterm -geometry 90x20+1100+900 -hold -e @EX_BIN_DIR@/$SINK2_2 &
|
@@ -2,16 +2,8 @@
|
||||
|
||||
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
|
||||
|
||||
transport="shmem"
|
||||
msgSize="1000000"
|
||||
|
||||
if [[ $1 =~ ^[a-z]+$ ]]; then
|
||||
transport=$1
|
||||
fi
|
||||
|
||||
if [[ $2 =~ ^[0-9]+$ ]]; then
|
||||
msgSize=$1
|
||||
fi
|
||||
transport=${1:-shmem}
|
||||
msgSize=${2:-1000000}
|
||||
|
||||
SAMPLER="fairmq-ex-region-sampler"
|
||||
SAMPLER+=" --id sampler1"
|
||||
@@ -19,35 +11,70 @@ SAMPLER+=" --id sampler1"
|
||||
SAMPLER+=" --severity debug"
|
||||
SAMPLER+=" --msg-size $msgSize"
|
||||
SAMPLER+=" --transport $transport"
|
||||
SAMPLER+=" --rc-segment-size 0"
|
||||
SAMPLER+=" --shm-monitor true"
|
||||
SAMPLER+=" --chan-name data1"
|
||||
SAMPLER+=" --channel-config name=data1,type=push,method=bind,address=tcp://127.0.0.1:7777"
|
||||
xterm -geometry 120x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
|
||||
xterm -geometry 90x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
|
||||
|
||||
PROCESSOR="fairmq-ex-region-processor"
|
||||
PROCESSOR+=" --id processor1"
|
||||
PROCESSOR+=" --severity debug"
|
||||
PROCESSOR+=" --transport $transport"
|
||||
PROCESSOR+=" --shm-monitor true"
|
||||
PROCESSOR+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777"
|
||||
PROCESSOR+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7778"
|
||||
PROCESSOR+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7779"
|
||||
xterm -geometry 120x60+750+0 -hold -e @EX_BIN_DIR@/$PROCESSOR &
|
||||
PROCESSOR1="fairmq-ex-region-processor"
|
||||
PROCESSOR1+=" --id processor1"
|
||||
PROCESSOR1+=" --severity debug"
|
||||
PROCESSOR1+=" --transport $transport"
|
||||
PROCESSOR1+=" --shm-segment-id 1"
|
||||
PROCESSOR1+=" --shm-monitor true"
|
||||
PROCESSOR1+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777"
|
||||
PROCESSOR1+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7778"
|
||||
PROCESSOR1+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7779"
|
||||
xterm -geometry 90x40+550+40 -hold -e @EX_BIN_DIR@/$PROCESSOR1 &
|
||||
|
||||
SINK1="fairmq-ex-region-sink"
|
||||
SINK1+=" --id sink1"
|
||||
SINK1+=" --severity debug"
|
||||
SINK1+=" --chan-name data2"
|
||||
SINK1+=" --transport $transport"
|
||||
SINK1+=" --shm-monitor true"
|
||||
SINK1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7778"
|
||||
xterm -geometry 120x32+1500+0 -hold -e @EX_BIN_DIR@/$SINK1 &
|
||||
PROCESSOR2="fairmq-ex-region-processor"
|
||||
PROCESSOR2+=" --id processor2"
|
||||
PROCESSOR2+=" --severity debug"
|
||||
PROCESSOR2+=" --transport $transport"
|
||||
PROCESSOR2+=" --shm-segment-id 2"
|
||||
PROCESSOR2+=" --shm-monitor true"
|
||||
PROCESSOR2+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777"
|
||||
PROCESSOR2+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7788"
|
||||
PROCESSOR2+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7789"
|
||||
xterm -geometry 90x40+550+600 -hold -e @EX_BIN_DIR@/$PROCESSOR2 &
|
||||
|
||||
SINK2="fairmq-ex-region-sink"
|
||||
SINK2+=" --id sink2"
|
||||
SINK2+=" --severity debug"
|
||||
SINK2+=" --chan-name data3"
|
||||
SINK2+=" --transport $transport"
|
||||
SINK2+=" --shm-monitor true"
|
||||
SINK2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7779"
|
||||
xterm -geometry 120x32+1500+500 -hold -e @EX_BIN_DIR@/$SINK2 &
|
||||
SINK1_1="fairmq-ex-region-sink"
|
||||
SINK1_1+=" --id sink1_1"
|
||||
SINK1_1+=" --severity debug"
|
||||
SINK1_1+=" --chan-name data2"
|
||||
SINK1_1+=" --transport $transport"
|
||||
SINK1_1+=" --shm-segment-id 1"
|
||||
SINK1_1+=" --shm-monitor true"
|
||||
SINK1_1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7778"
|
||||
xterm -geometry 90x20+1100+0 -hold -e @EX_BIN_DIR@/$SINK1_1 &
|
||||
|
||||
SINK1_2="fairmq-ex-region-sink"
|
||||
SINK1_2+=" --id sink1_2"
|
||||
SINK1_2+=" --severity debug"
|
||||
SINK1_2+=" --chan-name data3"
|
||||
SINK1_2+=" --transport $transport"
|
||||
SINK1_2+=" --shm-segment-id 1"
|
||||
SINK1_2+=" --shm-monitor true"
|
||||
SINK1_2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7779"
|
||||
xterm -geometry 90x20+1100+300 -hold -e @EX_BIN_DIR@/$SINK1_2 &
|
||||
|
||||
SINK2_1="fairmq-ex-region-sink"
|
||||
SINK2_1+=" --id sink2_1"
|
||||
SINK2_1+=" --severity debug"
|
||||
SINK2_1+=" --chan-name data2"
|
||||
SINK2_1+=" --transport $transport"
|
||||
SINK2_1+=" --shm-segment-id 2"
|
||||
SINK2_1+=" --shm-monitor true"
|
||||
SINK2_1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7788"
|
||||
xterm -geometry 90x20+1100+600 -hold -e @EX_BIN_DIR@/$SINK2_1 &
|
||||
|
||||
SINK2_2="fairmq-ex-region-sink"
|
||||
SINK2_2+=" --id sink2_2"
|
||||
SINK2_2+=" --severity debug"
|
||||
SINK2_2+=" --chan-name data3"
|
||||
SINK2_2+=" --transport $transport"
|
||||
SINK2_2+=" --shm-segment-id 2"
|
||||
SINK2_2+=" --shm-monitor true"
|
||||
SINK2_2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7789"
|
||||
xterm -geometry 90x20+1100+900 -hold -e @EX_BIN_DIR@/$SINK2_2 &
|
||||
|
@@ -2,16 +2,8 @@
|
||||
|
||||
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
|
||||
|
||||
transport="shmem"
|
||||
msgSize="1000000"
|
||||
|
||||
if [[ $1 =~ ^[a-z]+$ ]]; then
|
||||
transport=$1
|
||||
fi
|
||||
|
||||
if [[ $2 =~ ^[0-9]+$ ]]; then
|
||||
msgSize=$1
|
||||
fi
|
||||
transport=${1:-shmem}
|
||||
msgSize=${2:-1000000}
|
||||
|
||||
SAMPLER="fairmq-ex-region-sampler"
|
||||
SAMPLER+=" --id sampler1"
|
||||
|
@@ -95,10 +95,11 @@ struct ShmManager
|
||||
uint64_t size = stoull(conf.at(1));
|
||||
fair::mq::RegionConfig cfg;
|
||||
cfg.id = id;
|
||||
cfg.rcSegmentSize = 0;
|
||||
cfg.size = size;
|
||||
regionCfgs.push_back(cfg);
|
||||
|
||||
auto ret = regions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(shmId, id, size));
|
||||
auto ret = regions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(shmId, cfg));
|
||||
fair::mq::shmem::UnmanagedRegion& region = *(ret.first->second);
|
||||
LOG(info) << "Created unamanged region " << id << " of size " << region.GetSize()
|
||||
<< ", starting at " << region.GetData() << ". Locking...";
|
||||
|
@@ -36,16 +36,22 @@ struct Processor : Device
|
||||
Channel& dataOut2 = GetChannel("data3", 0);
|
||||
|
||||
while (!NewStatePending()) {
|
||||
auto msg(dataIn.Transport()->CreateMessage());
|
||||
dataIn.Receive(msg);
|
||||
fair::mq::Parts inParts;
|
||||
dataIn.Receive(inParts);
|
||||
|
||||
fair::mq::MessagePtr msgCopy1(NewMessage());
|
||||
msgCopy1->Copy(*msg);
|
||||
fair::mq::MessagePtr msgCopy2(NewMessage());
|
||||
msgCopy2->Copy(*msg);
|
||||
fair::mq::Parts outParts1;
|
||||
fair::mq::Parts outParts2;
|
||||
|
||||
dataOut1.Send(msgCopy1);
|
||||
dataOut2.Send(msgCopy2);
|
||||
for (const auto& inPart : inParts) {
|
||||
outParts1.AddPart(NewMessage());
|
||||
outParts1.fParts.back()->Copy(*inPart);
|
||||
|
||||
outParts2.AddPart(NewMessage());
|
||||
outParts2.fParts.back()->Copy(*inPart);
|
||||
}
|
||||
|
||||
dataOut1.Send(outParts1);
|
||||
dataOut2.Send(outParts2);
|
||||
|
||||
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
|
||||
LOG(info) << "Configured max number of iterations reached. Leaving RUNNING state.";
|
||||
|
@@ -26,6 +26,7 @@ struct Sampler : fair::mq::Device
|
||||
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
|
||||
fChanName = fConfig->GetProperty<std::string>("chan-name");
|
||||
fSamplingRate = fConfig->GetProperty<float>("sampling-rate");
|
||||
fRCSegmentSize = fConfig->GetProperty<uint64_t>("rc-segment-size");
|
||||
|
||||
GetChannel(fChanName, 0).Transport()->SubscribeToRegionEvents([](fair::mq::RegionInfo info) {
|
||||
LOG(info) << "Region event: " << info.event << ": "
|
||||
@@ -45,6 +46,7 @@ struct Sampler : fair::mq::Device
|
||||
}
|
||||
regionCfg.lock = !fExternalRegion; // mlock region after creation
|
||||
regionCfg.zero = !fExternalRegion; // zero region content after creation
|
||||
regionCfg.rcSegmentSize = fRCSegmentSize; // size of the corresponding reference count segment
|
||||
fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor(
|
||||
fChanName, // region is created using the transport of this channel...
|
||||
0, // ... and this sub-channel
|
||||
@@ -66,17 +68,22 @@ struct Sampler : fair::mq::Device
|
||||
fair::mq::tools::RateLimiter rateLimiter(fSamplingRate);
|
||||
|
||||
while (!NewStatePending()) {
|
||||
fair::mq::MessagePtr msg(NewMessageFor(fChanName, // channel
|
||||
0, // sub-channel
|
||||
fRegion, // region
|
||||
fRegion->GetData(), // ptr within region
|
||||
fMsgSize, // offset from ptr
|
||||
nullptr // hint
|
||||
));
|
||||
fair::mq::Parts parts;
|
||||
// make 64 parts
|
||||
for (int i = 0; i < 64; ++i) {
|
||||
parts.AddPart(NewMessageFor(
|
||||
fChanName, // channel
|
||||
0, // sub-channel
|
||||
fRegion, // region
|
||||
fRegion->GetData(), // ptr within region
|
||||
fMsgSize, // offset from ptr
|
||||
nullptr // hint
|
||||
));
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> lock(fMtx);
|
||||
++fNumUnackedMsgs;
|
||||
if (Send(msg, fChanName, 0) > 0) {
|
||||
fNumUnackedMsgs += parts.Size();
|
||||
if (Send(parts, fChanName, 0) > 0) {
|
||||
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
|
||||
LOG(info) << "Configured maximum number of iterations reached. Stopping sending.";
|
||||
break;
|
||||
@@ -117,6 +124,7 @@ struct Sampler : fair::mq::Device
|
||||
uint32_t fLinger = 100;
|
||||
uint64_t fMaxIterations = 0;
|
||||
uint64_t fNumIterations = 0;
|
||||
uint64_t fRCSegmentSize = 10000000;
|
||||
fair::mq::UnmanagedRegionPtr fRegion = nullptr;
|
||||
std::mutex fMtx;
|
||||
uint64_t fNumUnackedMsgs = 0;
|
||||
@@ -132,7 +140,8 @@ void addCustomOptions(bpo::options_description& options)
|
||||
("sampling-rate", bpo::value<float>()->default_value(0.), "Sampling rate (Hz).")
|
||||
("region-linger", bpo::value<uint32_t>()->default_value(100), "Linger period for regions")
|
||||
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)")
|
||||
("external-region", bpo::value<bool>()->default_value(false), "Use region created by another process");
|
||||
("external-region", bpo::value<bool>()->default_value(false), "Use region created by another process")
|
||||
("rc-segment-size", bpo::value<uint64_t>()->default_value(10000000), "Size of the reference count segment for Unamanged Region");
|
||||
}
|
||||
|
||||
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
|
||||
|
@@ -36,12 +36,8 @@ struct Sink : Device
|
||||
Channel& dataIn = GetChannel(fChanName, 0);
|
||||
|
||||
while (!NewStatePending()) {
|
||||
auto msg(dataIn.Transport()->CreateMessage());
|
||||
dataIn.Receive(msg);
|
||||
|
||||
// void* ptr = msg->GetData();
|
||||
// char* cptr = static_cast<char*>(ptr);
|
||||
// LOG(info) << "check: " << cptr[3];
|
||||
fair::mq::Parts parts;
|
||||
dataIn.Receive(parts);
|
||||
|
||||
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
|
||||
LOG(info) << "Configured max number of iterations reached. Leaving RUNNING state.";
|
||||
|
@@ -76,6 +76,11 @@ struct MessageBadAlloc : std::runtime_error
|
||||
using std::runtime_error::runtime_error;
|
||||
};
|
||||
|
||||
struct RefCountBadAlloc : std::runtime_error
|
||||
{
|
||||
using std::runtime_error::runtime_error;
|
||||
};
|
||||
|
||||
} // namespace fair::mq
|
||||
|
||||
using fairmq_free_fn [[deprecated("Use fair::mq::FreeFn")]] = fair::mq::FreeFn;
|
||||
|
@@ -134,7 +134,7 @@ struct RegionConfig
|
||||
int creationFlags = 0; /// flags passed to the underlying transport on region creation
|
||||
int64_t userFlags = 0; /// custom flags that have no effect on the transport, but can be retrieved from the region by the user
|
||||
uint64_t size = 0; /// region size
|
||||
uint64_t rcSegmentSize = 10000000; /// size of the segment that stores reference counts when "soft"-copying the messages
|
||||
uint64_t rcSegmentSize = 100000000; /// size of the segment that stores reference counts when "soft"-copying the messages
|
||||
std::string path = ""; /// file path, if the region is backed by a file
|
||||
std::optional<uint16_t> id = std::nullopt; /// region id
|
||||
uint32_t linger = 100; /// delay in ms before region destruction to collect outstanding events
|
||||
|
@@ -172,8 +172,6 @@ enum class AllocationAlgorithm : int
|
||||
|
||||
struct RegionInfo
|
||||
{
|
||||
static constexpr uint64_t DefaultRcSegmentSize = 10000000;
|
||||
|
||||
RegionInfo(const char* path, int flags, uint64_t userFlags, uint64_t size, uint64_t rcSegmentSize, const VoidAlloc& alloc)
|
||||
: fPath(path, alloc)
|
||||
, fCreationFlags(flags)
|
||||
@@ -183,14 +181,6 @@ struct RegionInfo
|
||||
, fDestroyed(false)
|
||||
{}
|
||||
|
||||
RegionInfo(const VoidAlloc& alloc)
|
||||
: RegionInfo("", 0, 0, 0, DefaultRcSegmentSize, alloc)
|
||||
{}
|
||||
|
||||
RegionInfo(const char* path, int flags, uint64_t userFlags, uint64_t size, const VoidAlloc& alloc)
|
||||
: RegionInfo(path, flags, userFlags, size, DefaultRcSegmentSize, alloc)
|
||||
{}
|
||||
|
||||
Str fPath;
|
||||
int fCreationFlags;
|
||||
uint64_t fUserFlags;
|
||||
|
@@ -323,7 +323,6 @@ class Manager
|
||||
}
|
||||
|
||||
const uint16_t id = cfg.id.value();
|
||||
const uint64_t rcSegmentSize = cfg.rcSegmentSize;
|
||||
|
||||
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
||||
|
||||
@@ -340,6 +339,12 @@ class Manager
|
||||
LOG(debug) << "Unmanaged region (view) already present, promoting to controller";
|
||||
region->BecomeController(cfg);
|
||||
} else {
|
||||
// we need to update local config, if the region information already exists
|
||||
auto info = fShmRegions->find(id);
|
||||
if (info != fShmRegions->end()) {
|
||||
cfg.rcSegmentSize = info->second.fRCSegmentSize;
|
||||
}
|
||||
|
||||
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, true, cfg));
|
||||
region = res.first->second.get();
|
||||
}
|
||||
@@ -348,7 +353,6 @@ class Manager
|
||||
// start ack receiver only if a callback has been provided.
|
||||
if (callback || bulkCallback) {
|
||||
region->SetCallbacks(callback, bulkCallback);
|
||||
region->InitializeRefCountSegment(rcSegmentSize);
|
||||
region->InitializeQueues();
|
||||
region->StartAckSender();
|
||||
region->StartAckReceiver();
|
||||
@@ -401,19 +405,18 @@ class Manager
|
||||
|
||||
try {
|
||||
RegionConfig cfg;
|
||||
const uint64_t rcSegmentSize = cfg.rcSegmentSize;
|
||||
// get region info
|
||||
{
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
||||
RegionInfo regionInfo = fShmRegions->at(id);
|
||||
cfg.id = id;
|
||||
cfg.creationFlags = regionInfo.fCreationFlags;
|
||||
cfg.rcSegmentSize = regionInfo.fRCSegmentSize;
|
||||
cfg.path = regionInfo.fPath.c_str();
|
||||
}
|
||||
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
||||
|
||||
auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, false, std::move(cfg)));
|
||||
r.first->second->InitializeRefCountSegment(rcSegmentSize);
|
||||
r.first->second->InitializeQueues();
|
||||
r.first->second->StartAckSender();
|
||||
return r.first->second.get();
|
||||
@@ -482,6 +485,7 @@ class Manager
|
||||
cfg.id = info.id;
|
||||
cfg.creationFlags = regionInfo.fCreationFlags;
|
||||
cfg.path = regionInfo.fPath.c_str();
|
||||
cfg.rcSegmentSize = regionInfo.fRCSegmentSize;
|
||||
regionCfgs.emplace(info.id, cfg);
|
||||
// fill the ptr+size info after shmLock is released, to avoid constructing local region under it
|
||||
} else {
|
||||
@@ -503,10 +507,8 @@ class Manager
|
||||
if (it != fRegions.end()) {
|
||||
region = it->second.get();
|
||||
} else {
|
||||
const uint64_t rcSegmentSize = cfgIt->second.rcSegmentSize;
|
||||
auto r = fRegions.emplace(cfgIt->first, std::make_unique<UnmanagedRegion>(fShmId, 0, false, cfgIt->second));
|
||||
region = r.first->second.get();
|
||||
region->InitializeRefCountSegment(rcSegmentSize);
|
||||
region->InitializeQueues();
|
||||
region->StartAckSender();
|
||||
}
|
||||
|
@@ -251,7 +251,12 @@ class Message final : public fair::mq::Message
|
||||
if (!fRegionPtr) {
|
||||
throw TransportError(tools::ToString("Cannot get unmanaged region with id ", fRegionId));
|
||||
}
|
||||
return fRegionPtr->GetRefCountAddressFromHandle(fShared)->Get();
|
||||
if (fRegionPtr->fRcSegmentSize > 0) {
|
||||
return fRegionPtr->GetRefCountAddressFromHandle(fShared)->Get();
|
||||
} else {
|
||||
fManager.GetSegment(fSegmentId);
|
||||
return ShmHeader::RefCount(fManager.GetAddressFromHandle(fShared, fSegmentId));
|
||||
}
|
||||
}
|
||||
|
||||
void Copy(const fair::mq::Message& other) override
|
||||
@@ -277,11 +282,29 @@ class Message final : public fair::mq::Message
|
||||
if (!fRegionPtr) {
|
||||
throw TransportError(tools::ToString("Cannot get unmanaged region with id ", otherMsg.fRegionId));
|
||||
}
|
||||
if (otherMsg.fShared < 0) {
|
||||
// UR msg not yet shared, create the reference counting object with count 2
|
||||
otherMsg.fShared = fRegionPtr->HandleFromAddress(&(fRegionPtr->MakeRefCount(2)));
|
||||
} else {
|
||||
fRegionPtr->GetRefCountAddressFromHandle(otherMsg.fShared)->Increment();
|
||||
if (fRegionPtr->fRcSegmentSize > 0) {
|
||||
if (otherMsg.fShared < 0) {
|
||||
// UR msg not yet shared, create the reference counting object with count 2
|
||||
try {
|
||||
otherMsg.fShared = fRegionPtr->HandleFromAddress(&(fRegionPtr->MakeRefCount(2)));
|
||||
} catch (boost::interprocess::bad_alloc& ba) {
|
||||
throw RefCountBadAlloc(tools::ToString("Insufficient space in the reference count segment ", otherMsg.fRegionId, ", original exception: bad_alloc: ", ba.what()));
|
||||
}
|
||||
} else {
|
||||
fRegionPtr->GetRefCountAddressFromHandle(otherMsg.fShared)->Increment();
|
||||
}
|
||||
} else { // if RefCount segment size is 0, store the ref count in the managed segment
|
||||
if (otherMsg.fShared < 0) { // if UR msg is not yet shared
|
||||
char* ptr = fManager.Allocate(2, 0);
|
||||
// point the fShared in the unmanaged region message to the refCount holder
|
||||
otherMsg.fShared = fManager.GetHandleFromAddress(ptr, fSegmentId);
|
||||
// the message needs to be able to locate in which segment the refCount is stored
|
||||
otherMsg.fSegmentId = fSegmentId;
|
||||
ShmHeader::IncrementRefCount(ptr);
|
||||
} else { // if the UR msg is already shared
|
||||
fManager.GetSegment(otherMsg.fSegmentId);
|
||||
ShmHeader::IncrementRefCount(fManager.GetAddressFromHandle(otherMsg.fShared, otherMsg.fSegmentId));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -349,10 +372,21 @@ class Message final : public fair::mq::Message
|
||||
if (!fRegionPtr) {
|
||||
throw TransportError(tools::ToString("Cannot get unmanaged region with id ", fRegionId));
|
||||
}
|
||||
uint16_t refCount = fRegionPtr->GetRefCountAddressFromHandle(fShared)->Decrement();
|
||||
if (refCount == 1) {
|
||||
fRegionPtr->RemoveRefCount(*(fRegionPtr->GetRefCountAddressFromHandle(fShared)));
|
||||
ReleaseUnmanagedRegionBlock();
|
||||
if (fRegionPtr->fRcSegmentSize > 0) {
|
||||
uint16_t refCount = fRegionPtr->GetRefCountAddressFromHandle(fShared)->Decrement();
|
||||
if (refCount == 1) {
|
||||
fRegionPtr->RemoveRefCount(*(fRegionPtr->GetRefCountAddressFromHandle(fShared)));
|
||||
ReleaseUnmanagedRegionBlock();
|
||||
}
|
||||
} else { // if RefCount segment size is 0, get the ref count from the managed segment
|
||||
// make sure segment is initialized in this transport
|
||||
fManager.GetSegment(fSegmentId);
|
||||
// release unmanaged region block if ref count is one
|
||||
uint16_t refCount = ShmHeader::DecrementRefCount(fManager.GetAddressFromHandle(fShared, fSegmentId));
|
||||
if (refCount == 1) {
|
||||
fManager.Deallocate(fShared, fSegmentId);
|
||||
ReleaseUnmanagedRegionBlock();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ReleaseUnmanagedRegionBlock();
|
||||
|
@@ -274,7 +274,7 @@ bool Monitor::PrintShm(const ShmId& shmId)
|
||||
|
||||
try {
|
||||
managed_shared_memory rcCountSegment(open_read_only, MakeShmName(shmId.shmId, "rrc", id).c_str());
|
||||
ss << ", rcCountSegment size: " << rcCountSegment.get_size();
|
||||
ss << ", rcCountSegment size: " << rcCountSegment.get_size() << ", free: " << rcCountSegment.get_free_memory();
|
||||
} catch (bie&) {
|
||||
ss << ", rcCountSegment: not found";
|
||||
}
|
||||
|
@@ -65,6 +65,7 @@ struct UnmanagedRegion
|
||||
, fShmemObject()
|
||||
, fFile(nullptr)
|
||||
, fFileMapping()
|
||||
, fRcSegmentSize(cfg.rcSegmentSize)
|
||||
, fQueue(nullptr)
|
||||
, fCallback(nullptr)
|
||||
, fBulkCallback(nullptr)
|
||||
@@ -146,11 +147,13 @@ struct UnmanagedRegion
|
||||
LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
|
||||
}
|
||||
|
||||
InitializeRefCountSegment(fRcSegmentSize);
|
||||
|
||||
if (fControlling && created) {
|
||||
Register(shmId, cfg);
|
||||
}
|
||||
|
||||
LOG(debug) << (created ? "Created" : "Opened") << " unmanaged shared memory region: " << fName << " (" << (fControlling ? "controller" : "viewer") << ")";
|
||||
LOG(debug) << (created ? "Created" : "Opened") << " unmanaged shared memory region: " << fName << " (" << (fControlling ? "controller" : "viewer") << "), refCount segment size: " << fRcSegmentSize;
|
||||
}
|
||||
|
||||
UnmanagedRegion() = delete;
|
||||
@@ -264,6 +267,7 @@ struct UnmanagedRegion
|
||||
std::condition_variable fBlockSendCV;
|
||||
std::vector<RegionBlock> fBlocksToFree;
|
||||
const std::size_t fAckBunchSize = 256;
|
||||
uint64_t fRcSegmentSize;
|
||||
std::unique_ptr<boost::interprocess::message_queue> fQueue;
|
||||
std::unique_ptr<boost::interprocess::managed_shared_memory> fRefCountSegment;
|
||||
std::unique_ptr<RefCountPool> fRefCountPool;
|
||||
@@ -319,7 +323,7 @@ struct UnmanagedRegion
|
||||
void InitializeRefCountSegment(uint64_t size)
|
||||
{
|
||||
using namespace boost::interprocess;
|
||||
if (!fRefCountSegment) {
|
||||
if (!fRefCountSegment && size > 0) {
|
||||
fRefCountSegment = std::make_unique<managed_shared_memory>(open_or_create, fRefCountSegmentName.c_str(), size);
|
||||
LOG(trace) << "shmem: initialized ref count segment: " << fRefCountSegmentName;
|
||||
fRefCountPool = std::make_unique<RefCountPool>(fRefCountSegment->get_segment_manager());
|
||||
|
@@ -287,8 +287,10 @@ auto ZeroCopy(bool expandedShmMetadata = false) -> void
|
||||
|
||||
// The "zero copy" property of the Copy() method is an implementation detail and is not guaranteed.
|
||||
// Currently it holds true for the shmem (across devices) and for zeromq (within same device) transports.
|
||||
auto ZeroCopyFromUnmanaged(string const& address, bool expandedShmMetadata = false) -> void
|
||||
auto ZeroCopyFromUnmanaged(string const& address, bool expandedShmMetadata, uint64_t rcSegmentSize) -> void
|
||||
{
|
||||
fair::Logger::SetConsoleSeverity(fair::Severity::debug);
|
||||
|
||||
ProgOptions config1;
|
||||
ProgOptions config2;
|
||||
string session(tools::Uuid());
|
||||
@@ -311,11 +313,13 @@ auto ZeroCopyFromUnmanaged(string const& address, bool expandedShmMetadata = fal
|
||||
|
||||
const size_t msgSize{100};
|
||||
const size_t regionSize{1000000};
|
||||
RegionConfig cfg;
|
||||
cfg.rcSegmentSize = rcSegmentSize;
|
||||
tools::Semaphore blocker;
|
||||
|
||||
auto region = factory1->CreateUnmanagedRegion(regionSize, [&blocker](void*, size_t, void*) {
|
||||
blocker.Signal();
|
||||
});
|
||||
}, cfg);
|
||||
|
||||
{
|
||||
Channel push("Push", "push", factory1);
|
||||
@@ -461,12 +465,22 @@ TEST(ZeroCopy, shmem_expanded_metadata) // NOLINT
|
||||
|
||||
TEST(ZeroCopyFromUnmanaged, shmem) // NOLINT
|
||||
{
|
||||
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged");
|
||||
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged", false, 10000000);
|
||||
}
|
||||
|
||||
TEST(ZeroCopyFromUnmanaged, shmem_expanded_metadata) // NOLINT
|
||||
{
|
||||
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged", true);
|
||||
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged_expanded", true, 10000000);
|
||||
}
|
||||
|
||||
TEST(ZeroCopyFromUnmanaged, shmem_no_rc_segment) // NOLINT
|
||||
{
|
||||
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged_no_rc_segment", false, 0);
|
||||
}
|
||||
|
||||
TEST(ZeroCopyFromUnmanaged, shmem_expanded_metadata_no_rc_segment) // NOLINT
|
||||
{
|
||||
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged_expanded_no_rc_segment", true, 0);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
Reference in New Issue
Block a user