From ce162364fa3443e592cb8f3c163d9dc8676247c1 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 14 Nov 2017 17:00:37 +0100 Subject: [PATCH] FairMQ: Introduce callbacks for the FairMQUnmanagedRegion. Callbacks are called when the data buffer of the message assiciated with the corresponding region is no longer needed by the transport. Example in examples/advanced/Region/ --- .../Region/FairMQExampleRegionSampler.cxx | 41 +++++++++++++++---- .../Region/FairMQExampleRegionSampler.h | 11 +++-- .../Region/FairMQExampleRegionSink.cxx | 5 ++- .../advanced/Region/startMQExRegion.sh.in | 1 + 4 files changed, 45 insertions(+), 13 deletions(-) diff --git a/examples/advanced/Region/FairMQExampleRegionSampler.cxx b/examples/advanced/Region/FairMQExampleRegionSampler.cxx index 010517b7..52b16063 100644 --- a/examples/advanced/Region/FairMQExampleRegionSampler.cxx +++ b/examples/advanced/Region/FairMQExampleRegionSampler.cxx @@ -16,29 +16,54 @@ #include "FairMQLogger.h" #include "FairMQProgOptions.h" // device->fConfig +#include +#include + using namespace std; FairMQExampleRegionSampler::FairMQExampleRegionSampler() : fMsgSize(10000) + , fRegion(nullptr) + , fNumUnackedMsgs(0) { } void FairMQExampleRegionSampler::InitTask() { fMsgSize = fConfig->GetValue("msg-size"); + + fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", + 0, + 10000000, + [this](void* data, size_t size) { --fNumUnackedMsgs; } // callback to be called when message buffers no longer needed by transport + )); } -void FairMQExampleRegionSampler::Run() +bool FairMQExampleRegionSampler::ConditionalRun() { - FairMQChannel& dataOutChannel = fChannels.at("data").at(0); - - FairMQUnmanagedRegionPtr region(NewUnmanagedRegionFor("data", 0, 10000000)); - - while (CheckCurrentState(RUNNING)) + FairMQMessagePtr msg(NewMessageFor("data", // channel + 0, // sub-channel + fRegion, // region + fRegion->GetData(), // ptr within region + fMsgSize // offset from ptr + )); + if (Send(msg, "data", 0) > 0) { - FairMQMessagePtr msg(NewMessageFor("data", 0, region, region->GetData(), fMsgSize)); - dataOutChannel.Send(msg); + ++fNumUnackedMsgs; } + + return true; +} + +void FairMQExampleRegionSampler::ResetTask() +{ + // if not all messages acknowledged, wait for a bit. But only once, since receiver could be already dead. + if (fNumUnackedMsgs != 0) + { + LOG(DEBUG) << "waiting for all acknowledgements... (" << fNumUnackedMsgs << ")"; + this_thread::sleep_for(chrono::milliseconds(500)); + } + fRegion.reset(); } FairMQExampleRegionSampler::~FairMQExampleRegionSampler() diff --git a/examples/advanced/Region/FairMQExampleRegionSampler.h b/examples/advanced/Region/FairMQExampleRegionSampler.h index d2441472..08456288 100644 --- a/examples/advanced/Region/FairMQExampleRegionSampler.h +++ b/examples/advanced/Region/FairMQExampleRegionSampler.h @@ -16,6 +16,7 @@ #define FAIRMQEXAMPLEREGIONSAMPLER_H_ #include +#include #include "FairMQDevice.h" @@ -26,10 +27,14 @@ class FairMQExampleRegionSampler : public FairMQDevice virtual ~FairMQExampleRegionSampler(); protected: - int fMsgSize; - virtual void InitTask(); - virtual void Run(); + virtual bool ConditionalRun(); + virtual void ResetTask(); + + private: + int fMsgSize; + FairMQUnmanagedRegionPtr fRegion; + std::atomic fNumUnackedMsgs; }; #endif /* FAIRMQEXAMPLEREGIONSAMPLER_H_ */ diff --git a/examples/advanced/Region/FairMQExampleRegionSink.cxx b/examples/advanced/Region/FairMQExampleRegionSink.cxx index fb254011..098ddfe7 100644 --- a/examples/advanced/Region/FairMQExampleRegionSink.cxx +++ b/examples/advanced/Region/FairMQExampleRegionSink.cxx @@ -1,8 +1,8 @@ /******************************************************************************** * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ /** @@ -29,6 +29,7 @@ void FairMQExampleRegionSink::Run() { FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage()); dataInChannel.Receive(msg); + void* ptr = msg->GetData(); } } diff --git a/examples/advanced/Region/startMQExRegion.sh.in b/examples/advanced/Region/startMQExRegion.sh.in index 2a4cfca6..1cc7e6a4 100755 --- a/examples/advanced/Region/startMQExRegion.sh.in +++ b/examples/advanced/Region/startMQExRegion.sh.in @@ -10,6 +10,7 @@ fi SAMPLER="ex-region-sampler" SAMPLER+=" --id sampler1" SAMPLER+=" --msg-size $msgSize" +# SAMPLER+=" --rate 10" SAMPLER+=" --transport shmem" SAMPLER+=" --mq-config $exRegionConfig" xterm -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/examples/advanced/Region/$SAMPLER &