From d1c51e0f1faa78e2831fb199e725bac0a2bde7e2 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 16 Jun 2020 14:39:49 +0200 Subject: [PATCH] Use region linger setting in region example --- examples/region/Sampler.cxx | 15 +++++++-------- examples/region/Sampler.h | 1 + examples/region/runSampler.cxx | 1 + examples/region/test-ex-region.sh.in | 1 + 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/examples/region/Sampler.cxx b/examples/region/Sampler.cxx index aaa7534a..9083d0ef 100644 --- a/examples/region/Sampler.cxx +++ b/examples/region/Sampler.cxx @@ -23,6 +23,7 @@ namespace example_region Sampler::Sampler() : fMsgSize(10000) + , fLinger(100) , fMaxIterations(0) , fNumIterations(0) , fRegion(nullptr) @@ -32,6 +33,7 @@ Sampler::Sampler() void Sampler::InitTask() { fMsgSize = fConfig->GetProperty("msg-size"); + fLinger = fConfig->GetProperty("region-linger"); fMaxIterations = fConfig->GetProperty("max-iterations"); fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) { @@ -54,6 +56,7 @@ void Sampler::InitTask() } } )); + fRegion->SetLinger(fLinger); } bool Sampler::ConditionalRun() @@ -84,20 +87,16 @@ bool Sampler::ConditionalRun() void Sampler::ResetTask() { - // if not all messages acknowledged, wait for a bit. But only once, since receiver could be already dead. + // On destruction UnmanagedRegion will try to TODO + fRegion.reset(); { - unique_lock lock(fMtx); + lock_guard lock(fMtx); if (fNumUnackedMsgs != 0) { - LOG(info) << "Waiting for all acknowledgements... (" << fNumUnackedMsgs << ")"; - lock.unlock(); - this_thread::sleep_for(chrono::milliseconds(500)); - lock.lock(); LOG(info) << "Done, still not acknowledged: " << fNumUnackedMsgs; } else { - LOG(info) << "All acknowledgements received"; + LOG(info) << "All acknowledgements received."; } } - fRegion.reset(); fChannels.at("data").at(0).Transport()->UnsubscribeFromRegionEvents(); } diff --git a/examples/region/Sampler.h b/examples/region/Sampler.h index 505713ac..45de28f4 100644 --- a/examples/region/Sampler.h +++ b/examples/region/Sampler.h @@ -36,6 +36,7 @@ class Sampler : public FairMQDevice private: int fMsgSize; + uint32_t fLinger; uint64_t fMaxIterations; uint64_t fNumIterations; FairMQUnmanagedRegionPtr fRegion; diff --git a/examples/region/runSampler.cxx b/examples/region/runSampler.cxx index 5806b48c..31f6ad9a 100644 --- a/examples/region/runSampler.cxx +++ b/examples/region/runSampler.cxx @@ -15,6 +15,7 @@ void addCustomOptions(bpo::options_description& options) { options.add_options() ("msg-size", bpo::value()->default_value(1000), "Message size in bytes") + ("region-linger", bpo::value()->default_value(100), "Linger period for regions") ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); } diff --git a/examples/region/test-ex-region.sh.in b/examples/region/test-ex-region.sh.in index 783d0a79..e2f3da56 100755 --- a/examples/region/test-ex-region.sh.in +++ b/examples/region/test-ex-region.sh.in @@ -23,6 +23,7 @@ SAMPLER+=" --verbosity veryhigh" SAMPLER+=" --control static --color false" SAMPLER+=" --max-iterations 1" SAMPLER+=" --msg-size $msgSize" +SAMPLER+=" --region-linger 500" SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777" @CMAKE_CURRENT_BINARY_DIR@/$SAMPLER & SAMPLER_PID=$!