From 8e430c476467bd9d4bbee9612f543590bd9445a9 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 12 Sep 2023 13:00:17 +0200 Subject: [PATCH] Add example with ref-counted copy from unmanaged region --- examples/region/CMakeLists.txt | 3 +- .../fairmq-start-ex-region-advanced.sh.in | 53 +++++++++++++ examples/region/processor.cxx | 74 +++++++++++++++++++ examples/region/sampler.cxx | 23 ++++-- examples/region/sink.cxx | 19 ++--- 5 files changed, 157 insertions(+), 15 deletions(-) create mode 100755 examples/region/fairmq-start-ex-region-advanced.sh.in create mode 100644 examples/region/processor.cxx diff --git a/examples/region/CMakeLists.txt b/examples/region/CMakeLists.txt index 729d953d..e51806b0 100644 --- a/examples/region/CMakeLists.txt +++ b/examples/region/CMakeLists.txt @@ -7,5 +7,6 @@ ################################################################################ add_example(NAME region - DEVICE sampler sink keep-alive + DEVICE sampler processor sink keep-alive + SCRIPT region region-advanced ) diff --git a/examples/region/fairmq-start-ex-region-advanced.sh.in b/examples/region/fairmq-start-ex-region-advanced.sh.in new file mode 100755 index 00000000..3bf4e0f6 --- /dev/null +++ b/examples/region/fairmq-start-ex-region-advanced.sh.in @@ -0,0 +1,53 @@ +#!/bin/bash + +export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@ + +transport="shmem" +msgSize="1000000" + +if [[ $1 =~ ^[a-z]+$ ]]; then + transport=$1 +fi + +if [[ $2 =~ ^[0-9]+$ ]]; then + msgSize=$1 +fi + +SAMPLER="fairmq-ex-region-sampler" +SAMPLER+=" --id sampler1" +# SAMPLER+=" --sampling-rate 10" +SAMPLER+=" --severity debug" +SAMPLER+=" --msg-size $msgSize" +SAMPLER+=" --transport $transport" +SAMPLER+=" --shm-monitor true" +SAMPLER+=" --chan-name data1" +SAMPLER+=" --channel-config name=data1,type=push,method=bind,address=tcp://127.0.0.1:7777" +xterm -geometry 120x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER & + +PROCESSOR="fairmq-ex-region-processor" +PROCESSOR+=" --id processor1" +PROCESSOR+=" --severity debug" +PROCESSOR+=" --transport $transport" +PROCESSOR+=" --shm-monitor true" +PROCESSOR+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777" +PROCESSOR+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7778" +PROCESSOR+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7779" +xterm -geometry 120x60+750+0 -hold -e @EX_BIN_DIR@/$PROCESSOR & + +SINK1="fairmq-ex-region-sink" +SINK1+=" --id sink1" +SINK1+=" --severity debug" +SINK1+=" --chan-name data2" +SINK1+=" --transport $transport" +SINK1+=" --shm-monitor true" +SINK1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7778" +xterm -geometry 120x32+1500+0 -hold -e @EX_BIN_DIR@/$SINK1 & + +SINK2="fairmq-ex-region-sink" +SINK2+=" --id sink2" +SINK2+=" --severity debug" +SINK2+=" --chan-name data3" +SINK2+=" --transport $transport" +SINK2+=" --shm-monitor true" +SINK2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7779" +xterm -geometry 120x32+1500+500 -hold -e @EX_BIN_DIR@/$SINK2 & diff --git a/examples/region/processor.cxx b/examples/region/processor.cxx new file mode 100644 index 00000000..4a8d4c6c --- /dev/null +++ b/examples/region/processor.cxx @@ -0,0 +1,74 @@ +/******************************************************************************** + * Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include +#include +#include + +namespace bpo = boost::program_options; +using namespace std; +using namespace fair::mq; + +namespace { + +struct Processor : Device +{ + void InitTask() override + { + fMaxIterations = fConfig->GetProperty("max-iterations"); + GetChannel("data1", 0).Transport()->SubscribeToRegionEvents([](RegionInfo info) { + LOG(info) << "Region event: " << info.event << ": " + << (info.managed ? "managed" : "unmanaged") << ", id: " << info.id + << ", ptr: " << info.ptr << ", size: " << info.size + << ", flags: " << info.flags; + }); + } + + void Run() override + { + Channel& dataIn = GetChannel("data1", 0); + Channel& dataOut1 = GetChannel("data2", 0); + Channel& dataOut2 = GetChannel("data3", 0); + + while (!NewStatePending()) { + auto msg(dataIn.Transport()->CreateMessage()); + dataIn.Receive(msg); + + fair::mq::MessagePtr msgCopy1(NewMessage()); + msgCopy1->Copy(*msg); + fair::mq::MessagePtr msgCopy2(NewMessage()); + msgCopy2->Copy(*msg); + + dataOut1.Send(msgCopy1); + dataOut2.Send(msgCopy2); + + if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) { + LOG(info) << "Configured max number of iterations reached. Leaving RUNNING state."; + break; + } + } + } + + void ResetTask() override + { + GetChannel("data1", 0).Transport()->UnsubscribeFromRegionEvents(); + } + + private: + uint64_t fMaxIterations = 0; + uint64_t fNumIterations = 0; +}; + +} // namespace + +void addCustomOptions(bpo::options_description& options) +{ + options.add_options()("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); +} + +unique_ptr getDevice(ProgOptions& /*config*/) { return make_unique(); } diff --git a/examples/region/sampler.cxx b/examples/region/sampler.cxx index 71e1d043..33d204c5 100644 --- a/examples/region/sampler.cxx +++ b/examples/region/sampler.cxx @@ -8,6 +8,7 @@ #include #include +#include #include #include @@ -23,8 +24,10 @@ struct Sampler : fair::mq::Device fMsgSize = fConfig->GetProperty("msg-size"); fLinger = fConfig->GetProperty("region-linger"); fMaxIterations = fConfig->GetProperty("max-iterations"); + fChanName = fConfig->GetProperty("chan-name"); + fSamplingRate = fConfig->GetProperty("sampling-rate"); - GetChannel("data", 0).Transport()->SubscribeToRegionEvents([](fair::mq::RegionInfo info) { + GetChannel(fChanName, 0).Transport()->SubscribeToRegionEvents([](fair::mq::RegionInfo info) { LOG(info) << "Region event: " << info.event << ": " << (info.managed ? "managed" : "unmanaged") << ", id: " << info.id @@ -43,7 +46,7 @@ struct Sampler : fair::mq::Device regionCfg.lock = !fExternalRegion; // mlock region after creation regionCfg.zero = !fExternalRegion; // zero region content after creation fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor( - "data", // region is created using the transport of this channel... + fChanName, // region is created using the transport of this channel... 0, // ... and this sub-channel 10000000, // region size [this](const std::vector& blocks) { // callback to be called when message buffers no longer needed by transport @@ -59,8 +62,11 @@ struct Sampler : fair::mq::Device void Run() override { + + fair::mq::tools::RateLimiter rateLimiter(fSamplingRate); + while (!NewStatePending()) { - fair::mq::MessagePtr msg(NewMessageFor("data", // channel + fair::mq::MessagePtr msg(NewMessageFor(fChanName, // channel 0, // sub-channel fRegion, // region fRegion->GetData(), // ptr within region @@ -70,11 +76,14 @@ struct Sampler : fair::mq::Device std::lock_guard lock(fMtx); ++fNumUnackedMsgs; - if (Send(msg, "data", 0) > 0) { + if (Send(msg, fChanName, 0) > 0) { if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) { LOG(info) << "Configured maximum number of iterations reached. Stopping sending."; break; } + if (fSamplingRate > 0.001) { + rateLimiter.maybe_sleep(); + } } } @@ -99,7 +108,7 @@ struct Sampler : fair::mq::Device void ResetTask() override { fRegion.reset(); - GetChannel("data", 0).Transport()->UnsubscribeFromRegionEvents(); + GetChannel(fChanName, 0).Transport()->UnsubscribeFromRegionEvents(); } private: @@ -111,12 +120,16 @@ struct Sampler : fair::mq::Device fair::mq::UnmanagedRegionPtr fRegion = nullptr; std::mutex fMtx; uint64_t fNumUnackedMsgs = 0; + std::string fChanName; + float fSamplingRate = 0.; }; void addCustomOptions(bpo::options_description& options) { options.add_options() + ("chan-name", bpo::value()->default_value("data"), "name of the output channel") ("msg-size", bpo::value()->default_value(1000), "Message size in bytes") + ("sampling-rate", bpo::value()->default_value(0.), "Sampling rate (Hz).") ("region-linger", bpo::value()->default_value(100), "Linger period for regions") ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)") ("external-region", bpo::value()->default_value(false), "Use region created by another process"); diff --git a/examples/region/sink.cxx b/examples/region/sink.cxx index a8af6ed0..3dd3837e 100644 --- a/examples/region/sink.cxx +++ b/examples/region/sink.cxx @@ -22,7 +22,8 @@ struct Sink : Device { // Get the fMaxIterations value from the command line options (via fConfig) fMaxIterations = fConfig->GetProperty("max-iterations"); - GetChannel("data", 0).Transport()->SubscribeToRegionEvents([](RegionInfo info) { + fChanName = fConfig->GetProperty("chan-name"); + GetChannel(fChanName, 0).Transport()->SubscribeToRegionEvents([](RegionInfo info) { LOG(info) << "Region event: " << info.event << ": " << (info.managed ? "managed" : "unmanaged") << ", id: " << info.id << ", ptr: " << info.ptr << ", size: " << info.size @@ -32,11 +33,11 @@ struct Sink : Device void Run() override { - Channel& dataInChannel = GetChannel("data", 0); + Channel& dataIn = GetChannel(fChanName, 0); while (!NewStatePending()) { - auto msg(dataInChannel.Transport()->CreateMessage()); - dataInChannel.Receive(msg); + auto msg(dataIn.Transport()->CreateMessage()); + dataIn.Receive(msg); // void* ptr = msg->GetData(); // char* cptr = static_cast(ptr); @@ -51,22 +52,22 @@ struct Sink : Device void ResetTask() override { - GetChannel("data", 0).Transport()->UnsubscribeFromRegionEvents(); + GetChannel(fChanName, 0).Transport()->UnsubscribeFromRegionEvents(); } private: uint64_t fMaxIterations = 0; uint64_t fNumIterations = 0; + std::string fChanName; }; } // namespace void addCustomOptions(bpo::options_description& options) { - options.add_options()( - "max-iterations", - bpo::value()->default_value(0), - "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); + options.add_options() + ("chan-name", bpo::value()->default_value("data"), "name of the input channel") + ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); } unique_ptr getDevice(ProgOptions& /*config*/) { return make_unique(); }