diff --git a/examples/multipart/CMakeLists.txt b/examples/multipart/CMakeLists.txt index fe96af0a..b74b6e66 100644 --- a/examples/multipart/CMakeLists.txt +++ b/examples/multipart/CMakeLists.txt @@ -32,19 +32,19 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multipart.sh.in ${CMA configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-multipart.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh) add_test(NAME Example.Multipart.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh zeromq) -set_tests_properties(Example.Multipart.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 5 parts") +set_tests_properties(Example.Multipart.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 7 parts") if(BUILD_NANOMSG_TRANSPORT) add_test(NAME Example.Multipart.nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh nanomsg) - set_tests_properties(Example.Multipart.nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 5 parts") + set_tests_properties(Example.Multipart.nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 7 parts") endif() add_test(NAME Example.Multipart.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh shmem) -set_tests_properties(Example.Multipart.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 5 parts") +set_tests_properties(Example.Multipart.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 7 parts") if(BUILD_OFI_TRANSPORT) add_test(NAME Example.Multipart.ofi COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh ofi) - set_tests_properties(Example.Multipart.ofi PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 5 parts") + set_tests_properties(Example.Multipart.ofi PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 7 parts") endif() # install diff --git a/examples/multipart/Sampler.cxx b/examples/multipart/Sampler.cxx index d354dfc8..2939144d 100644 --- a/examples/multipart/Sampler.cxx +++ b/examples/multipart/Sampler.cxx @@ -63,6 +63,10 @@ bool Sampler::ConditionalRun() assert(parts.Size() == 6); + parts.AddPart(NewMessage(100)); + + assert(parts.Size() == 7); + LOG(info) << "Sending body of size: " << parts.At(1)->GetSize(); Send(parts, "data"); diff --git a/examples/multipart/Sink.cxx b/examples/multipart/Sink.cxx index 47a5053a..0cd018cf 100644 --- a/examples/multipart/Sink.cxx +++ b/examples/multipart/Sink.cxx @@ -22,13 +22,24 @@ namespace example_multipart bool Sink::HandleData(FairMQParts& parts, int /*index*/) { + LOG(info) << "Received message with " << parts.Size() << " parts"; + Header header; header.stopFlag = (static_cast(parts.At(0)->GetData()))->stopFlag; - LOG(info) << "Received message with " << parts.Size() << " parts"; - - LOG(info) << "Received header with stopFlag: " << header.stopFlag; - LOG(info) << "Received body of size: " << parts.At(1)->GetSize(); + LOG(info) << "Received part 1 (header) with stopFlag: " << header.stopFlag; + LOG(info) << "Received part 2 of size: " << parts.At(1)->GetSize(); + assert(parts.At(1)->GetSize() == 1000); + LOG(info) << "Received part 3 of size: " << parts.At(2)->GetSize(); + assert(parts.At(2)->GetSize() == 500); + LOG(info) << "Received part 4 of size: " << parts.At(3)->GetSize(); + assert(parts.At(3)->GetSize() == 600); + LOG(info) << "Received part 5 of size: " << parts.At(4)->GetSize(); + assert(parts.At(4)->GetSize() == 700); + LOG(info) << "Received part 6 of size: " << parts.At(5)->GetSize(); + assert(parts.At(5)->GetSize() == 0); + LOG(info) << "Received part 7 of size: " << parts.At(6)->GetSize(); + assert(parts.At(6)->GetSize() == 100); if (header.stopFlag == 1) { LOG(info) << "stopFlag is 1, going IDLE"; diff --git a/examples/multipart/fairmq-start-ex-multipart.sh.in b/examples/multipart/fairmq-start-ex-multipart.sh.in index 59611e58..a6c21799 100755 --- a/examples/multipart/fairmq-start-ex-multipart.sh.in +++ b/examples/multipart/fairmq-start-ex-multipart.sh.in @@ -2,12 +2,24 @@ export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@ +transport="zeromq" + +if [[ $1 =~ ^[a-z]+$ ]]; then + transport=$1 +fi + +SESSION="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)" + SAMPLER="fairmq-ex-multipart-sampler" SAMPLER+=" --id sampler1" +SAMPLER+=" --transport $transport" +SAMPLER+=" --session $SESSION" SAMPLER+=" --channel-config name=data,type=push,method=connect,rateLogging=0,address=tcp://127.0.0.1:5555" xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER & SINK="fairmq-ex-multipart-sink" SINK+=" --id sink1" +SINK+=" --transport $transport" +SINK+=" --session $SESSION" SINK+=" --channel-config name=data,type=pull,method=bind,rateLogging=0,address=tcp://127.0.0.1:5555" xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$SINK &