From c9fc46e2c9f419f232a8564a361d965d8739de0c Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 28 Nov 2017 15:37:41 +0100 Subject: [PATCH] FairMQ: Add test for example/advanced/Region. Also fix a regression in nanomsg transport. --- examples/MQ/8-multipart/startMQEx8.sh.in | 4 ++- examples/advanced/Region/CMakeLists.txt | 12 ++++--- .../Region/FairMQExampleRegionSampler.cxx | 17 ++++++++- .../Region/FairMQExampleRegionSampler.h | 2 ++ .../Region/FairMQExampleRegionSink.cxx | 15 ++++++++ .../advanced/Region/FairMQExampleRegionSink.h | 5 +++ .../Region/runExampleRegionSampler.cxx | 3 +- .../advanced/Region/runExampleRegionSink.cxx | 4 ++- examples/advanced/Region/testMQExRegion.sh.in | 36 +++++++++++++++++++ 9 files changed, 90 insertions(+), 8 deletions(-) create mode 100755 examples/advanced/Region/testMQExRegion.sh.in diff --git a/examples/MQ/8-multipart/startMQEx8.sh.in b/examples/MQ/8-multipart/startMQEx8.sh.in index 002c44b2..a66a91fe 100755 --- a/examples/MQ/8-multipart/startMQEx8.sh.in +++ b/examples/MQ/8-multipart/startMQEx8.sh.in @@ -3,10 +3,12 @@ ex8config="@CMAKE_BINARY_DIR@/bin/config/ex8-multipart.json" SAMPLER="ex8-sampler" SAMPLER+=" --id sampler1" +SAMPLER+=" --transport nanomsg" SAMPLER+=" --mq-config $ex8config" -xterm -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/examples/MQ/8-multipart/$SAMPLER & +xterm -geometry 80x23+0+0 -hold -e gdb --args @CMAKE_BINARY_DIR@/bin/examples/MQ/8-multipart/$SAMPLER & SINK="ex8-sink" SINK+=" --id sink1" +SINK+=" --transport nanomsg" SINK+=" --mq-config $ex8config" xterm -geometry 80x23+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/examples/MQ/8-multipart/$SINK & diff --git a/examples/advanced/Region/CMakeLists.txt b/examples/advanced/Region/CMakeLists.txt index 249f6b2e..8ebf7544 100644 --- a/examples/advanced/Region/CMakeLists.txt +++ b/examples/advanced/Region/CMakeLists.txt @@ -6,10 +6,9 @@ # copied verbatim in the file "LICENSE" # ################################################################################ -configure_file(${CMAKE_SOURCE_DIR}/examples/advanced/Region/ex-region.json - ${CMAKE_BINARY_DIR}/bin/config/ex-region.json) -configure_file(${CMAKE_SOURCE_DIR}/examples/advanced/Region/startMQExRegion.sh.in - ${CMAKE_BINARY_DIR}/bin/examples/advanced/Region/startMQExRegion.sh) +configure_file(${CMAKE_SOURCE_DIR}/examples/advanced/Region/ex-region.json ${CMAKE_BINARY_DIR}/bin/config/ex-region.json) +configure_file(${CMAKE_SOURCE_DIR}/examples/advanced/Region/startMQExRegion.sh.in ${CMAKE_BINARY_DIR}/bin/examples/advanced/Region/startMQExRegion.sh) +configure_file(${CMAKE_SOURCE_DIR}/examples/advanced/Region/testMQExRegion.sh.in ${CMAKE_BINARY_DIR}/bin/examples/advanced/Region/testMQExRegion.sh) Set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/fairmq @@ -73,6 +72,11 @@ ForEach(_file RANGE 0 ${_length}) GENERATE_EXECUTABLE() EndForEach(_file RANGE 0 ${_length}) +add_test(NAME MQ.ex-advanced-region COMMAND ${CMAKE_BINARY_DIR}/bin/examples/advanced/Region/testMQExRegion.sh) +set_tests_properties(MQ.ex-advanced-region PROPERTIES TIMEOUT "30") +set_tests_properties(MQ.ex-advanced-region PROPERTIES RUN_SERIAL true) +set_tests_properties(MQ.ex-advanced-region PROPERTIES PASS_REGULAR_EXPRESSION "Received ack") + Install( FILES ex-region.json DESTINATION share/fairbase/examples/advanced/Region/config/ diff --git a/examples/advanced/Region/FairMQExampleRegionSampler.cxx b/examples/advanced/Region/FairMQExampleRegionSampler.cxx index 52b16063..4557512c 100644 --- a/examples/advanced/Region/FairMQExampleRegionSampler.cxx +++ b/examples/advanced/Region/FairMQExampleRegionSampler.cxx @@ -23,6 +23,8 @@ using namespace std; FairMQExampleRegionSampler::FairMQExampleRegionSampler() : fMsgSize(10000) + , fMaxIterations(0) + , fNumIterations(0) , fRegion(nullptr) , fNumUnackedMsgs(0) { @@ -31,11 +33,18 @@ FairMQExampleRegionSampler::FairMQExampleRegionSampler() void FairMQExampleRegionSampler::InitTask() { fMsgSize = fConfig->GetValue("msg-size"); + fMaxIterations = fConfig->GetValue("max-iterations"); fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", 0, 10000000, - [this](void* data, size_t size) { --fNumUnackedMsgs; } // callback to be called when message buffers no longer needed by transport + [this](void* data, size_t size) { // callback to be called when message buffers no longer needed by transport + --fNumUnackedMsgs; + if (fMaxIterations > 0) + { + LOG(DEBUG) << "Received ack"; + } + } )); } @@ -50,6 +59,12 @@ bool FairMQExampleRegionSampler::ConditionalRun() if (Send(msg, "data", 0) > 0) { ++fNumUnackedMsgs; + + if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) + { + LOG(INFO) << "Configured maximum number of iterations reached. Leaving RUNNING state."; + return false; + } } return true; diff --git a/examples/advanced/Region/FairMQExampleRegionSampler.h b/examples/advanced/Region/FairMQExampleRegionSampler.h index 08456288..d09a3ec8 100644 --- a/examples/advanced/Region/FairMQExampleRegionSampler.h +++ b/examples/advanced/Region/FairMQExampleRegionSampler.h @@ -33,6 +33,8 @@ class FairMQExampleRegionSampler : public FairMQDevice private: int fMsgSize; + uint64_t fMaxIterations; + uint64_t fNumIterations; FairMQUnmanagedRegionPtr fRegion; std::atomic fNumUnackedMsgs; }; diff --git a/examples/advanced/Region/FairMQExampleRegionSink.cxx b/examples/advanced/Region/FairMQExampleRegionSink.cxx index 098ddfe7..dd1a7ae9 100644 --- a/examples/advanced/Region/FairMQExampleRegionSink.cxx +++ b/examples/advanced/Region/FairMQExampleRegionSink.cxx @@ -14,13 +14,22 @@ #include "FairMQExampleRegionSink.h" #include "FairMQLogger.h" +#include "FairMQProgOptions.h" // device->fConfig using namespace std; FairMQExampleRegionSink::FairMQExampleRegionSink() + : fMaxIterations(0) + , fNumIterations(0) { } +void FairMQExampleRegionSink::InitTask() +{ + // Get the fMaxIterations value from the command line options (via fConfig) + fMaxIterations = fConfig->GetValue("max-iterations"); +} + void FairMQExampleRegionSink::Run() { FairMQChannel& dataInChannel = fChannels.at("data").at(0); @@ -30,6 +39,12 @@ void FairMQExampleRegionSink::Run() FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage()); dataInChannel.Receive(msg); void* ptr = msg->GetData(); + + if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) + { + LOG(INFO) << "Configured maximum number of iterations reached. Leaving RUNNING state."; + break; + } } } diff --git a/examples/advanced/Region/FairMQExampleRegionSink.h b/examples/advanced/Region/FairMQExampleRegionSink.h index 7cf057f2..5e00ce56 100644 --- a/examples/advanced/Region/FairMQExampleRegionSink.h +++ b/examples/advanced/Region/FairMQExampleRegionSink.h @@ -27,6 +27,11 @@ class FairMQExampleRegionSink : public FairMQDevice protected: virtual void Run(); + virtual void InitTask(); + + private: + uint64_t fMaxIterations; + uint64_t fNumIterations; }; #endif /* FAIRMQEXAMPLEREGIONSINK_H_ */ diff --git a/examples/advanced/Region/runExampleRegionSampler.cxx b/examples/advanced/Region/runExampleRegionSampler.cxx index ea279142..58fea22c 100644 --- a/examples/advanced/Region/runExampleRegionSampler.cxx +++ b/examples/advanced/Region/runExampleRegionSampler.cxx @@ -14,7 +14,8 @@ namespace bpo = boost::program_options; void addCustomOptions(bpo::options_description& options) { options.add_options() - ("msg-size", bpo::value()->default_value(1000), "Message size in bytes"); + ("msg-size", bpo::value()->default_value(1000), "Message size in bytes") + ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) diff --git a/examples/advanced/Region/runExampleRegionSink.cxx b/examples/advanced/Region/runExampleRegionSink.cxx index 8bef7018..4fa1eb68 100644 --- a/examples/advanced/Region/runExampleRegionSink.cxx +++ b/examples/advanced/Region/runExampleRegionSink.cxx @@ -11,8 +11,10 @@ namespace bpo = boost::program_options; -void addCustomOptions(bpo::options_description& /*options*/) +void addCustomOptions(bpo::options_description& options) { + options.add_options() + ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) diff --git a/examples/advanced/Region/testMQExRegion.sh.in b/examples/advanced/Region/testMQExRegion.sh.in new file mode 100755 index 00000000..8f7fbb08 --- /dev/null +++ b/examples/advanced/Region/testMQExRegion.sh.in @@ -0,0 +1,36 @@ +#!/bin/bash + +exRegionConfig="@CMAKE_BINARY_DIR@/bin/config/ex-region.json" + +msgSize="1000000" + +# setup a trap to kill everything if the test fails/timeouts +trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID; @CMAKE_BINARY_DIR@/bin/shmmonitor --cleanup --session region_test' TERM + +@CMAKE_BINARY_DIR@/bin/shmmonitor --cleanup --session region_test + +SAMPLER="ex-region-sampler" +SAMPLER+=" --id sampler1" +SAMPLER+=" --session region_test" +SAMPLER+=" --control static --log-color false" +SAMPLER+=" --max-iterations 1" +SAMPLER+=" --msg-size $msgSize" +SAMPLER+=" --transport shmem" +SAMPLER+=" --mq-config $exRegionConfig" +@CMAKE_BINARY_DIR@/bin/examples/advanced/Region/$SAMPLER & +SAMPLER_PID=$! + +SINK="ex-region-sink" +SINK+=" --id sink1" +SINK+=" --session region_test" +SINK+=" --verbosity INFO" +SINK+=" --control static --log-color false" +SINK+=" --max-iterations 1" +SINK+=" --transport shmem" +SINK+=" --mq-config $exRegionConfig" +@CMAKE_BINARY_DIR@/bin/examples/advanced/Region/$SINK & +SINK_PID=$! + +# wait for sampler and sink to finish +wait $SAMPLER_PID +wait $SINK_PID