mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 09:31:45 +00:00
Tests for MQ examples
This commit is contained in:
committed by
Mohammad Al-Turany
parent
984eed1a89
commit
319bdc91a1
@@ -6,10 +6,9 @@
|
||||
# copied verbatim in the file "LICENSE" #
|
||||
################################################################################
|
||||
|
||||
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/8-multipart/ex8-multipart.json
|
||||
${CMAKE_BINARY_DIR}/bin/config/ex8-multipart.json)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/8-multipart/startMQEx8.sh.in
|
||||
${CMAKE_BINARY_DIR}/bin/examples/MQ/8-multipart/startMQEx8.sh)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/8-multipart/ex8-multipart.json ${CMAKE_BINARY_DIR}/bin/config/ex8-multipart.json)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/8-multipart/startMQEx8.sh.in ${CMAKE_BINARY_DIR}/bin/examples/MQ/8-multipart/startMQEx8.sh)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/8-multipart/testMQEx8.sh.in ${CMAKE_BINARY_DIR}/bin/examples/MQ/8-multipart/testMQEx8.sh)
|
||||
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${CMAKE_SOURCE_DIR}/fairmq
|
||||
@@ -73,6 +72,11 @@ ForEach(_file RANGE 0 ${_length})
|
||||
GENERATE_EXECUTABLE()
|
||||
EndForEach(_file RANGE 0 ${_length})
|
||||
|
||||
add_test(NAME MQ.ex8-multipart COMMAND ${CMAKE_BINARY_DIR}/bin/examples/MQ/8-multipart/testMQEx8.sh)
|
||||
set_tests_properties(MQ.ex8-multipart PROPERTIES TIMEOUT "30")
|
||||
set_tests_properties(MQ.ex8-multipart PROPERTIES RUN_SERIAL true)
|
||||
set_tests_properties(MQ.ex8-multipart PROPERTIES PASS_REGULAR_EXPRESSION "Received message with 2 parts")
|
||||
|
||||
Install(
|
||||
FILES ex8-multipart.json
|
||||
DESTINATION share/fairbase/examples/MQ/8-multipart/config/
|
||||
|
@@ -18,22 +18,31 @@
|
||||
#include "FairMQExample8Sampler.h"
|
||||
#include "FairMQEx8Header.h"
|
||||
#include "FairMQLogger.h"
|
||||
#include "FairMQProgOptions.h"
|
||||
|
||||
using namespace std;
|
||||
|
||||
FairMQExample8Sampler::FairMQExample8Sampler()
|
||||
: fCounter(0)
|
||||
: fMaxIterations(5)
|
||||
, fNumIterations(0)
|
||||
{
|
||||
}
|
||||
|
||||
void FairMQExample8Sampler::InitTask()
|
||||
{
|
||||
fMaxIterations = fConfig->GetValue<uint64_t>("max-iterations");
|
||||
}
|
||||
|
||||
bool FairMQExample8Sampler::ConditionalRun()
|
||||
{
|
||||
// Wait a second to keep the output readable.
|
||||
this_thread::sleep_for(chrono::seconds(1));
|
||||
|
||||
Ex8Header header;
|
||||
// Set stopFlag to 1 for the first 4 messages, and to 0 for the 5th.
|
||||
fCounter < 5 ? header.stopFlag = 0 : header.stopFlag = 1;
|
||||
header.stopFlag = 0;
|
||||
|
||||
// Set stopFlag to 1 for last message.
|
||||
if (fMaxIterations > 0 && fNumIterations == fMaxIterations - 1)
|
||||
{
|
||||
header.stopFlag = 1;
|
||||
}
|
||||
LOG(INFO) << "Sending header with stopFlag: " << header.stopFlag;
|
||||
|
||||
FairMQParts parts;
|
||||
@@ -48,11 +57,15 @@ bool FairMQExample8Sampler::ConditionalRun()
|
||||
Send(parts, "data-out");
|
||||
|
||||
// Go out of the sending loop if the stopFlag was sent.
|
||||
if (fCounter++ == 5)
|
||||
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
|
||||
{
|
||||
LOG(INFO) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
|
||||
return false;
|
||||
}
|
||||
|
||||
// Wait a second to keep the output readable.
|
||||
this_thread::sleep_for(chrono::seconds(1));
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@@ -24,10 +24,12 @@ class FairMQExample8Sampler : public FairMQDevice
|
||||
virtual ~FairMQExample8Sampler();
|
||||
|
||||
protected:
|
||||
virtual void InitTask();
|
||||
virtual bool ConditionalRun();
|
||||
|
||||
private:
|
||||
int fCounter;
|
||||
uint64_t fMaxIterations;
|
||||
uint64_t fNumIterations;
|
||||
};
|
||||
|
||||
#endif /* FAIRMQEXAMPLE8SAMPLER_H_ */
|
||||
|
@@ -27,11 +27,15 @@ bool FairMQExample8Sink::HandleData(FairMQParts& parts, int /*index*/)
|
||||
{
|
||||
Ex8Header header;
|
||||
header.stopFlag = (static_cast<Ex8Header*>(parts.At(0)->GetData()))->stopFlag;
|
||||
|
||||
LOG(INFO) << "Received message with " << parts.Size() << " parts";
|
||||
|
||||
LOG(INFO) << "Received header with stopFlag: " << header.stopFlag;
|
||||
LOG(INFO) << "Received body of size: " << parts.At(1)->GetSize();
|
||||
|
||||
if (header.stopFlag == 1)
|
||||
{
|
||||
LOG(INFO) << "stopFlag is 0, going IDLE";
|
||||
LOG(INFO) << "stopFlag is 1, going IDLE";
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@@ -11,8 +11,10 @@
|
||||
|
||||
namespace bpo = boost::program_options;
|
||||
|
||||
void addCustomOptions(bpo::options_description& /*options*/)
|
||||
void addCustomOptions(bpo::options_description& options)
|
||||
{
|
||||
options.add_options()
|
||||
("max-iterations", bpo::value<uint64_t>()->default_value(5), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
|
||||
}
|
||||
|
||||
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
|
||||
|
23
examples/MQ/8-multipart/testMQEx8.sh.in
Executable file
23
examples/MQ/8-multipart/testMQEx8.sh.in
Executable file
@@ -0,0 +1,23 @@
|
||||
#!/bin/bash
|
||||
ex8config="@CMAKE_BINARY_DIR@/bin/config/ex8-multipart.json"
|
||||
|
||||
# setup a trap to kill everything if the test fails/timeouts
|
||||
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID;' TERM
|
||||
|
||||
SAMPLER="ex8-sampler"
|
||||
SAMPLER+=" --id sampler1"
|
||||
SAMPLER+=" --max-iterations 1"
|
||||
SAMPLER+=" --control static --log-color false"
|
||||
SAMPLER+=" --mq-config $ex8config"
|
||||
@CMAKE_BINARY_DIR@/bin/examples/MQ/8-multipart/$SAMPLER &
|
||||
SAMPLER_PID=$!
|
||||
|
||||
SINK="ex8-sink"
|
||||
SINK+=" --id sink1"
|
||||
SINK+=" --control static --log-color false"
|
||||
SINK+=" --mq-config $ex8config"
|
||||
@CMAKE_BINARY_DIR@/bin/examples/MQ/8-multipart/$SINK &
|
||||
SINK_PID=$!
|
||||
|
||||
wait $SAMPLER_PID
|
||||
wait $SINK_PID
|
Reference in New Issue
Block a user