mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
FairMQ: Add test for example/advanced/Region.
Also fix a regression in nanomsg transport.
This commit is contained in:
parent
ce162364fa
commit
c9fc46e2c9
|
@ -3,10 +3,12 @@ ex8config="@CMAKE_BINARY_DIR@/bin/config/ex8-multipart.json"
|
||||||
|
|
||||||
SAMPLER="ex8-sampler"
|
SAMPLER="ex8-sampler"
|
||||||
SAMPLER+=" --id sampler1"
|
SAMPLER+=" --id sampler1"
|
||||||
|
SAMPLER+=" --transport nanomsg"
|
||||||
SAMPLER+=" --mq-config $ex8config"
|
SAMPLER+=" --mq-config $ex8config"
|
||||||
xterm -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/examples/MQ/8-multipart/$SAMPLER &
|
xterm -geometry 80x23+0+0 -hold -e gdb --args @CMAKE_BINARY_DIR@/bin/examples/MQ/8-multipart/$SAMPLER &
|
||||||
|
|
||||||
SINK="ex8-sink"
|
SINK="ex8-sink"
|
||||||
SINK+=" --id sink1"
|
SINK+=" --id sink1"
|
||||||
|
SINK+=" --transport nanomsg"
|
||||||
SINK+=" --mq-config $ex8config"
|
SINK+=" --mq-config $ex8config"
|
||||||
xterm -geometry 80x23+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/examples/MQ/8-multipart/$SINK &
|
xterm -geometry 80x23+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/examples/MQ/8-multipart/$SINK &
|
||||||
|
|
|
@ -6,10 +6,9 @@
|
||||||
# copied verbatim in the file "LICENSE" #
|
# copied verbatim in the file "LICENSE" #
|
||||||
################################################################################
|
################################################################################
|
||||||
|
|
||||||
configure_file(${CMAKE_SOURCE_DIR}/examples/advanced/Region/ex-region.json
|
configure_file(${CMAKE_SOURCE_DIR}/examples/advanced/Region/ex-region.json ${CMAKE_BINARY_DIR}/bin/config/ex-region.json)
|
||||||
${CMAKE_BINARY_DIR}/bin/config/ex-region.json)
|
configure_file(${CMAKE_SOURCE_DIR}/examples/advanced/Region/startMQExRegion.sh.in ${CMAKE_BINARY_DIR}/bin/examples/advanced/Region/startMQExRegion.sh)
|
||||||
configure_file(${CMAKE_SOURCE_DIR}/examples/advanced/Region/startMQExRegion.sh.in
|
configure_file(${CMAKE_SOURCE_DIR}/examples/advanced/Region/testMQExRegion.sh.in ${CMAKE_BINARY_DIR}/bin/examples/advanced/Region/testMQExRegion.sh)
|
||||||
${CMAKE_BINARY_DIR}/bin/examples/advanced/Region/startMQExRegion.sh)
|
|
||||||
|
|
||||||
Set(INCLUDE_DIRECTORIES
|
Set(INCLUDE_DIRECTORIES
|
||||||
${CMAKE_SOURCE_DIR}/fairmq
|
${CMAKE_SOURCE_DIR}/fairmq
|
||||||
|
@ -73,6 +72,11 @@ ForEach(_file RANGE 0 ${_length})
|
||||||
GENERATE_EXECUTABLE()
|
GENERATE_EXECUTABLE()
|
||||||
EndForEach(_file RANGE 0 ${_length})
|
EndForEach(_file RANGE 0 ${_length})
|
||||||
|
|
||||||
|
add_test(NAME MQ.ex-advanced-region COMMAND ${CMAKE_BINARY_DIR}/bin/examples/advanced/Region/testMQExRegion.sh)
|
||||||
|
set_tests_properties(MQ.ex-advanced-region PROPERTIES TIMEOUT "30")
|
||||||
|
set_tests_properties(MQ.ex-advanced-region PROPERTIES RUN_SERIAL true)
|
||||||
|
set_tests_properties(MQ.ex-advanced-region PROPERTIES PASS_REGULAR_EXPRESSION "Received ack")
|
||||||
|
|
||||||
Install(
|
Install(
|
||||||
FILES ex-region.json
|
FILES ex-region.json
|
||||||
DESTINATION share/fairbase/examples/advanced/Region/config/
|
DESTINATION share/fairbase/examples/advanced/Region/config/
|
||||||
|
|
|
@ -23,6 +23,8 @@ using namespace std;
|
||||||
|
|
||||||
FairMQExampleRegionSampler::FairMQExampleRegionSampler()
|
FairMQExampleRegionSampler::FairMQExampleRegionSampler()
|
||||||
: fMsgSize(10000)
|
: fMsgSize(10000)
|
||||||
|
, fMaxIterations(0)
|
||||||
|
, fNumIterations(0)
|
||||||
, fRegion(nullptr)
|
, fRegion(nullptr)
|
||||||
, fNumUnackedMsgs(0)
|
, fNumUnackedMsgs(0)
|
||||||
{
|
{
|
||||||
|
@ -31,11 +33,18 @@ FairMQExampleRegionSampler::FairMQExampleRegionSampler()
|
||||||
void FairMQExampleRegionSampler::InitTask()
|
void FairMQExampleRegionSampler::InitTask()
|
||||||
{
|
{
|
||||||
fMsgSize = fConfig->GetValue<int>("msg-size");
|
fMsgSize = fConfig->GetValue<int>("msg-size");
|
||||||
|
fMaxIterations = fConfig->GetValue<uint64_t>("max-iterations");
|
||||||
|
|
||||||
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data",
|
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data",
|
||||||
0,
|
0,
|
||||||
10000000,
|
10000000,
|
||||||
[this](void* data, size_t size) { --fNumUnackedMsgs; } // callback to be called when message buffers no longer needed by transport
|
[this](void* data, size_t size) { // callback to be called when message buffers no longer needed by transport
|
||||||
|
--fNumUnackedMsgs;
|
||||||
|
if (fMaxIterations > 0)
|
||||||
|
{
|
||||||
|
LOG(DEBUG) << "Received ack";
|
||||||
|
}
|
||||||
|
}
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -50,6 +59,12 @@ bool FairMQExampleRegionSampler::ConditionalRun()
|
||||||
if (Send(msg, "data", 0) > 0)
|
if (Send(msg, "data", 0) > 0)
|
||||||
{
|
{
|
||||||
++fNumUnackedMsgs;
|
++fNumUnackedMsgs;
|
||||||
|
|
||||||
|
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
|
||||||
|
{
|
||||||
|
LOG(INFO) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -33,6 +33,8 @@ class FairMQExampleRegionSampler : public FairMQDevice
|
||||||
|
|
||||||
private:
|
private:
|
||||||
int fMsgSize;
|
int fMsgSize;
|
||||||
|
uint64_t fMaxIterations;
|
||||||
|
uint64_t fNumIterations;
|
||||||
FairMQUnmanagedRegionPtr fRegion;
|
FairMQUnmanagedRegionPtr fRegion;
|
||||||
std::atomic<uint64_t> fNumUnackedMsgs;
|
std::atomic<uint64_t> fNumUnackedMsgs;
|
||||||
};
|
};
|
||||||
|
|
|
@ -14,13 +14,22 @@
|
||||||
|
|
||||||
#include "FairMQExampleRegionSink.h"
|
#include "FairMQExampleRegionSink.h"
|
||||||
#include "FairMQLogger.h"
|
#include "FairMQLogger.h"
|
||||||
|
#include "FairMQProgOptions.h" // device->fConfig
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
FairMQExampleRegionSink::FairMQExampleRegionSink()
|
FairMQExampleRegionSink::FairMQExampleRegionSink()
|
||||||
|
: fMaxIterations(0)
|
||||||
|
, fNumIterations(0)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void FairMQExampleRegionSink::InitTask()
|
||||||
|
{
|
||||||
|
// Get the fMaxIterations value from the command line options (via fConfig)
|
||||||
|
fMaxIterations = fConfig->GetValue<uint64_t>("max-iterations");
|
||||||
|
}
|
||||||
|
|
||||||
void FairMQExampleRegionSink::Run()
|
void FairMQExampleRegionSink::Run()
|
||||||
{
|
{
|
||||||
FairMQChannel& dataInChannel = fChannels.at("data").at(0);
|
FairMQChannel& dataInChannel = fChannels.at("data").at(0);
|
||||||
|
@ -30,6 +39,12 @@ void FairMQExampleRegionSink::Run()
|
||||||
FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage());
|
FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage());
|
||||||
dataInChannel.Receive(msg);
|
dataInChannel.Receive(msg);
|
||||||
void* ptr = msg->GetData();
|
void* ptr = msg->GetData();
|
||||||
|
|
||||||
|
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
|
||||||
|
{
|
||||||
|
LOG(INFO) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,11 @@ class FairMQExampleRegionSink : public FairMQDevice
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
virtual void Run();
|
virtual void Run();
|
||||||
|
virtual void InitTask();
|
||||||
|
|
||||||
|
private:
|
||||||
|
uint64_t fMaxIterations;
|
||||||
|
uint64_t fNumIterations;
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif /* FAIRMQEXAMPLEREGIONSINK_H_ */
|
#endif /* FAIRMQEXAMPLEREGIONSINK_H_ */
|
||||||
|
|
|
@ -14,7 +14,8 @@ namespace bpo = boost::program_options;
|
||||||
void addCustomOptions(bpo::options_description& options)
|
void addCustomOptions(bpo::options_description& options)
|
||||||
{
|
{
|
||||||
options.add_options()
|
options.add_options()
|
||||||
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes");
|
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
|
||||||
|
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
|
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
|
||||||
|
|
|
@ -11,8 +11,10 @@
|
||||||
|
|
||||||
namespace bpo = boost::program_options;
|
namespace bpo = boost::program_options;
|
||||||
|
|
||||||
void addCustomOptions(bpo::options_description& /*options*/)
|
void addCustomOptions(bpo::options_description& options)
|
||||||
{
|
{
|
||||||
|
options.add_options()
|
||||||
|
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
|
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
|
||||||
|
|
36
examples/advanced/Region/testMQExRegion.sh.in
Executable file
36
examples/advanced/Region/testMQExRegion.sh.in
Executable file
|
@ -0,0 +1,36 @@
|
||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
exRegionConfig="@CMAKE_BINARY_DIR@/bin/config/ex-region.json"
|
||||||
|
|
||||||
|
msgSize="1000000"
|
||||||
|
|
||||||
|
# setup a trap to kill everything if the test fails/timeouts
|
||||||
|
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID; @CMAKE_BINARY_DIR@/bin/shmmonitor --cleanup --session region_test' TERM
|
||||||
|
|
||||||
|
@CMAKE_BINARY_DIR@/bin/shmmonitor --cleanup --session region_test
|
||||||
|
|
||||||
|
SAMPLER="ex-region-sampler"
|
||||||
|
SAMPLER+=" --id sampler1"
|
||||||
|
SAMPLER+=" --session region_test"
|
||||||
|
SAMPLER+=" --control static --log-color false"
|
||||||
|
SAMPLER+=" --max-iterations 1"
|
||||||
|
SAMPLER+=" --msg-size $msgSize"
|
||||||
|
SAMPLER+=" --transport shmem"
|
||||||
|
SAMPLER+=" --mq-config $exRegionConfig"
|
||||||
|
@CMAKE_BINARY_DIR@/bin/examples/advanced/Region/$SAMPLER &
|
||||||
|
SAMPLER_PID=$!
|
||||||
|
|
||||||
|
SINK="ex-region-sink"
|
||||||
|
SINK+=" --id sink1"
|
||||||
|
SINK+=" --session region_test"
|
||||||
|
SINK+=" --verbosity INFO"
|
||||||
|
SINK+=" --control static --log-color false"
|
||||||
|
SINK+=" --max-iterations 1"
|
||||||
|
SINK+=" --transport shmem"
|
||||||
|
SINK+=" --mq-config $exRegionConfig"
|
||||||
|
@CMAKE_BINARY_DIR@/bin/examples/advanced/Region/$SINK &
|
||||||
|
SINK_PID=$!
|
||||||
|
|
||||||
|
# wait for sampler and sink to finish
|
||||||
|
wait $SAMPLER_PID
|
||||||
|
wait $SINK_PID
|
Loading…
Reference in New Issue
Block a user