From 7df278818caab5c392d6a7b40da69de0274ae8c5 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Thu, 28 Feb 2019 01:41:04 +0100 Subject: [PATCH] Enhance region example with Builder device --- examples/region/Builder.h | 40 ++++++++++++++++++++ examples/region/CMakeLists.txt | 3 ++ examples/region/Sampler.cxx | 6 +-- examples/region/fairmq-start-ex-region.sh.in | 15 +++++--- examples/region/runBuilder.cxx | 20 ++++++++++ fairmq/run/startMQBenchmark.sh.in | 4 +- fairmq/shmem/FairMQMessageSHM.cxx | 1 + 7 files changed, 79 insertions(+), 10 deletions(-) create mode 100644 examples/region/Builder.h create mode 100644 examples/region/runBuilder.cxx diff --git a/examples/region/Builder.h b/examples/region/Builder.h new file mode 100644 index 00000000..2420bbde --- /dev/null +++ b/examples/region/Builder.h @@ -0,0 +1,40 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIRMQEXAMPLEREGIONBUILDER_H +#define FAIRMQEXAMPLEREGIONBUILDER_H + +#include + +#include "FairMQDevice.h" + +namespace example_region +{ + +class Builder : public FairMQDevice +{ + public: + Builder() { + OnData("data1", &Builder::HandleData); + } + virtual ~Builder() {} + + protected: + bool HandleData(FairMQMessagePtr& msg, int /*index*/) + { + if (Send(msg, "data2") < 0) { + return false; + } + + return true; + } +}; + +} // namespace example_region + +#endif /* FAIRMQEXAMPLEREGIONBUILDER_H */ diff --git a/examples/region/CMakeLists.txt b/examples/region/CMakeLists.txt index e092b8ab..4723c409 100644 --- a/examples/region/CMakeLists.txt +++ b/examples/region/CMakeLists.txt @@ -9,6 +9,7 @@ add_library(ExampleRegionLib STATIC "Sampler.cxx" "Sampler.h" + "Builder.h" "Sink.cxx" "Sink.h" ) @@ -18,6 +19,8 @@ target_link_libraries(ExampleRegionLib PUBLIC FairMQ) add_executable(fairmq-ex-region-sampler runSampler.cxx) target_link_libraries(fairmq-ex-region-sampler PRIVATE ExampleRegionLib) +add_executable(fairmq-ex-region-builder runBuilder.cxx) +target_link_libraries(fairmq-ex-region-builder PRIVATE ExampleRegionLib) add_executable(fairmq-ex-region-sink runSink.cxx) target_link_libraries(fairmq-ex-region-sink PRIVATE ExampleRegionLib) diff --git a/examples/region/Sampler.cxx b/examples/region/Sampler.cxx index 678c321e..ba986d3d 100644 --- a/examples/region/Sampler.cxx +++ b/examples/region/Sampler.cxx @@ -35,7 +35,7 @@ void Sampler::InitTask() fMsgSize = fConfig->GetValue("msg-size"); fMaxIterations = fConfig->GetValue("max-iterations"); - fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", + fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data1", 0, 10000000, [this](void* /*data*/, size_t /*size*/, void* /*hint*/) { // callback to be called when message buffers no longer needed by transport @@ -50,7 +50,7 @@ void Sampler::InitTask() bool Sampler::ConditionalRun() { - FairMQMessagePtr msg(NewMessageFor("data", // channel + FairMQMessagePtr msg(NewMessageFor("data1", // channel 0, // sub-channel fRegion, // region fRegion->GetData(), // ptr within region @@ -58,7 +58,7 @@ bool Sampler::ConditionalRun() nullptr // hint )); - if (Send(msg, "data", 0) > 0) + if (Send(msg, "data1", 0) > 0) { ++fNumUnackedMsgs; diff --git a/examples/region/fairmq-start-ex-region.sh.in b/examples/region/fairmq-start-ex-region.sh.in index 56a3fd66..e0e39a81 100755 --- a/examples/region/fairmq-start-ex-region.sh.in +++ b/examples/region/fairmq-start-ex-region.sh.in @@ -13,13 +13,18 @@ SAMPLER+=" --id sampler1" SAMPLER+=" --severity debug" SAMPLER+=" --msg-size $msgSize" # SAMPLER+=" --rate 10" -SAMPLER+=" --transport shmem" -SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992" +SAMPLER+=" --channel-config name=data1,type=pair,method=bind,address=tcp://127.0.0.1:7777,transport=shmem" xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER & +BUILDER="fairmq-ex-region-builder" +BUILDER+=" --id builder1" +BUILDER+=" --severity debug" +BUILDER+=" --channel-config name=data1,type=pair,method=connect,address=tcp://127.0.0.1:7777,transport=shmem" +BUILDER+=" name=data2,type=pair,method=connect,address=tcp://127.0.0.1:7778,transport=ofi" +xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$BUILDER & + SINK="fairmq-ex-region-sink" SINK+=" --id sink1" SINK+=" --severity debug" -SINK+=" --transport shmem" -SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992" -xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$SINK & +SINK+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:7778,transport=ofi" +xterm -geometry 80x23+1000+0 -hold -e @EX_BIN_DIR@/$SINK & diff --git a/examples/region/runBuilder.cxx b/examples/region/runBuilder.cxx new file mode 100644 index 00000000..4d11eaab --- /dev/null +++ b/examples/region/runBuilder.cxx @@ -0,0 +1,20 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "runFairMQDevice.h" +#include "Builder.h" + +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& /* options */) +{} + +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) +{ + return new example_region::Builder(); +} diff --git a/fairmq/run/startMQBenchmark.sh.in b/fairmq/run/startMQBenchmark.sh.in index a3ca853b..86f44e33 100755 --- a/fairmq/run/startMQBenchmark.sh.in +++ b/fairmq/run/startMQBenchmark.sh.in @@ -61,7 +61,7 @@ SAMPLER+=" --msg-size $msgSize" SAMPLER+=" --num-parts 1" # SAMPLER+=" --msg-rate 1000" SAMPLER+=" --max-iterations $maxIterations" -SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:5555" +SAMPLER+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:5555" xterm -geometry 90x50+0+0 -hold -e $affinitySamp @CMAKE_CURRENT_BINARY_DIR@/$SAMPLER & echo "" echo "started: xterm -geometry 90x50+0+0 -hold -e $affinitySamp @CMAKE_CURRENT_BINARY_DIR@/$SAMPLER" @@ -75,7 +75,7 @@ SINK+=" --transport $transport" SINK+=" --severity debug" SINK+=" --multipart false" SINK+=" --max-iterations $maxIterations" -SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:5555" +SINK+=" --channel-config name=data,type=pair,method=connect,address=tcp://127.0.0.1:5555" xterm -geometry 90x50+550+0 -hold -e $affinitySink @CMAKE_CURRENT_BINARY_DIR@/$SINK & echo "" echo "started: xterm -geometry 90x50+550+0 -hold -e $affinitySink @CMAKE_CURRENT_BINARY_DIR@/$SINK" diff --git a/fairmq/shmem/FairMQMessageSHM.cxx b/fairmq/shmem/FairMQMessageSHM.cxx index ee0a8f12..f71da88b 100644 --- a/fairmq/shmem/FairMQMessageSHM.cxx +++ b/fairmq/shmem/FairMQMessageSHM.cxx @@ -350,6 +350,7 @@ void FairMQMessageSHM::CloseMessage() } if (fRegionPtr) { + // LOG(debug) << "sending ack"; if (fRegionPtr->fQueue->timed_send(&block, sizeof(RegionBlock), 0, sndTill)) { success = true;