mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 00:31:14 +00:00
Avoid fixed ports in the test suites
This commit is contained in:
parent
bfd08bb33f
commit
f4d39d224b
|
@ -8,20 +8,23 @@ if [[ $1 =~ ^[a-z]+$ ]]; then
|
||||||
transport=$1
|
transport=$1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
SESSION="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
|
session="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
|
||||||
|
chan="data"
|
||||||
|
chanAddr="/tmp/fmq_$session""_""$chan""_""$transport"
|
||||||
|
|
||||||
# setup a trap to kill everything if the test fails/timeouts
|
# setup a trap to kill everything if the test fails/timeouts
|
||||||
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID;' TERM
|
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID; rm $chanAddr' TERM
|
||||||
|
|
||||||
SAMPLER="fairmq-ex-1-1-sampler"
|
SAMPLER="fairmq-ex-1-1-sampler"
|
||||||
SAMPLER+=" --id sampler1"
|
SAMPLER+=" --id sampler1"
|
||||||
SAMPLER+=" --rate 1"
|
SAMPLER+=" --rate 1"
|
||||||
SAMPLER+=" --transport $transport"
|
SAMPLER+=" --transport $transport"
|
||||||
SAMPLER+=" --verbosity veryhigh"
|
SAMPLER+=" --verbosity veryhigh"
|
||||||
SAMPLER+=" --session $SESSION"
|
SAMPLER+=" --session $session"
|
||||||
|
SAMPLER+=" --shm-segment-size 100000000"
|
||||||
SAMPLER+=" --control static --color false"
|
SAMPLER+=" --control static --color false"
|
||||||
SAMPLER+=" --max-iterations 1"
|
SAMPLER+=" --max-iterations 1"
|
||||||
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://*:5555,rateLogging=0"
|
SAMPLER+=" --channel-config name=$chan,type=push,method=bind,address=ipc://$chanAddr,rateLogging=0"
|
||||||
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
|
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
|
||||||
SAMPLER_PID=$!
|
SAMPLER_PID=$!
|
||||||
|
|
||||||
|
@ -29,13 +32,16 @@ SINK="fairmq-ex-1-1-sink"
|
||||||
SINK+=" --id sink1"
|
SINK+=" --id sink1"
|
||||||
SINK+=" --transport $transport"
|
SINK+=" --transport $transport"
|
||||||
SINK+=" --verbosity veryhigh"
|
SINK+=" --verbosity veryhigh"
|
||||||
SINK+=" --session $SESSION"
|
SINK+=" --session $session"
|
||||||
|
SINK+=" --shm-segment-size 100000000"
|
||||||
SINK+=" --control static --color false"
|
SINK+=" --control static --color false"
|
||||||
SINK+=" --max-iterations 1"
|
SINK+=" --max-iterations 1"
|
||||||
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://localhost:5555,rateLogging=0"
|
SINK+=" --channel-config name=$chan,type=pull,method=connect,address=ipc://$chanAddr,rateLogging=0"
|
||||||
@CMAKE_CURRENT_BINARY_DIR@/$SINK &
|
@CMAKE_CURRENT_BINARY_DIR@/$SINK &
|
||||||
SINK_PID=$!
|
SINK_PID=$!
|
||||||
|
|
||||||
# wait for sampler and sink to finish
|
# wait for sampler and sink to finish
|
||||||
wait $SAMPLER_PID
|
wait $SAMPLER_PID
|
||||||
wait $SINK_PID
|
wait $SINK_PID
|
||||||
|
|
||||||
|
rm $chanAddr
|
||||||
|
|
|
@ -8,20 +8,25 @@ if [[ $1 =~ ^[a-z]+$ ]]; then
|
||||||
transport=$1
|
transport=$1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
ex2config="@CMAKE_CURRENT_BINARY_DIR@/ex-1-n-1.json"
|
session="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
|
||||||
SESSION="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
|
chan1="data1"
|
||||||
|
chan2="data2"
|
||||||
|
chan1Addr="/tmp/fmq_$session""_""$chan1""_""$transport"
|
||||||
|
chan2Addr="/tmp/fmq_$session""_""$chan2""_""$transport"
|
||||||
|
|
||||||
# setup a trap to kill everything if the test fails/timeouts
|
# setup a trap to kill everything if the test fails/timeouts
|
||||||
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; kill -TERM $PROCESSOR1_PID; kill -TERM $PROCESSOR2_PID; wait $SAMPLER_PID; wait $SINK_PID; wait $PROCESSOR1_PID; wait $PROCESSOR2_PID;' TERM
|
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; kill -TERM $PROCESSOR1_PID; kill -TERM $PROCESSOR2_PID; wait $SAMPLER_PID; wait $SINK_PID; wait $PROCESSOR1_PID; wait $PROCESSOR2_PID; rm $chan1Addr; rm $chan2Addr' TERM
|
||||||
|
|
||||||
SAMPLER="fairmq-ex-1-n-1-sampler"
|
SAMPLER="fairmq-ex-1-n-1-sampler"
|
||||||
SAMPLER+=" --id sampler1"
|
SAMPLER+=" --id sampler1"
|
||||||
SAMPLER+=" --transport $transport"
|
SAMPLER+=" --transport $transport"
|
||||||
SAMPLER+=" --verbosity veryhigh"
|
SAMPLER+=" --verbosity veryhigh"
|
||||||
SAMPLER+=" --session $SESSION"
|
SAMPLER+=" --session $session"
|
||||||
|
SAMPLER+=" --severity debug"
|
||||||
|
SAMPLER+=" --shm-segment-size 100000000"
|
||||||
SAMPLER+=" --control static --color false"
|
SAMPLER+=" --control static --color false"
|
||||||
SAMPLER+=" --max-iterations 2"
|
SAMPLER+=" --max-iterations 2"
|
||||||
SAMPLER+=" --mq-config $ex2config"
|
SAMPLER+=" --channel-config name=$chan1,type=push,method=bind,address=ipc://$chan1Addr,rateLogging=0"
|
||||||
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
|
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
|
||||||
SAMPLER_PID=$!
|
SAMPLER_PID=$!
|
||||||
|
|
||||||
|
@ -29,10 +34,12 @@ PROCESSOR1="fairmq-ex-1-n-1-processor"
|
||||||
PROCESSOR1+=" --id processor1"
|
PROCESSOR1+=" --id processor1"
|
||||||
PROCESSOR1+=" --transport $transport"
|
PROCESSOR1+=" --transport $transport"
|
||||||
PROCESSOR1+=" --verbosity veryhigh"
|
PROCESSOR1+=" --verbosity veryhigh"
|
||||||
PROCESSOR1+=" --session $SESSION"
|
PROCESSOR1+=" --session $session"
|
||||||
|
PROCESSOR1+=" --severity debug"
|
||||||
|
PROCESSOR1+=" --shm-segment-size 100000000"
|
||||||
PROCESSOR1+=" --control static --color false"
|
PROCESSOR1+=" --control static --color false"
|
||||||
PROCESSOR1+=" --mq-config $ex2config"
|
PROCESSOR1+=" --channel-config name=$chan1,type=pull,method=connect,address=ipc://$chan1Addr,rateLogging=0"
|
||||||
PROCESSOR1+=" --config-key processor"
|
PROCESSOR1+=" name=$chan2,type=push,method=connect,address=ipc://$chan2Addr,rateLogging=0"
|
||||||
@CMAKE_CURRENT_BINARY_DIR@/$PROCESSOR1 &
|
@CMAKE_CURRENT_BINARY_DIR@/$PROCESSOR1 &
|
||||||
PROCESSOR1_PID=$!
|
PROCESSOR1_PID=$!
|
||||||
|
|
||||||
|
@ -40,10 +47,12 @@ PROCESSOR2="fairmq-ex-1-n-1-processor"
|
||||||
PROCESSOR2+=" --id processor2"
|
PROCESSOR2+=" --id processor2"
|
||||||
PROCESSOR2+=" --transport $transport"
|
PROCESSOR2+=" --transport $transport"
|
||||||
PROCESSOR2+=" --verbosity veryhigh"
|
PROCESSOR2+=" --verbosity veryhigh"
|
||||||
PROCESSOR2+=" --session $SESSION"
|
PROCESSOR2+=" --session $session"
|
||||||
|
PROCESSOR2+=" --severity debug"
|
||||||
|
PROCESSOR2+=" --shm-segment-size 100000000"
|
||||||
PROCESSOR2+=" --control static --color false"
|
PROCESSOR2+=" --control static --color false"
|
||||||
PROCESSOR2+=" --mq-config $ex2config"
|
PROCESSOR2+=" --channel-config name=$chan1,type=pull,method=connect,address=ipc://$chan1Addr,rateLogging=0"
|
||||||
PROCESSOR2+=" --config-key processor"
|
PROCESSOR2+=" name=$chan2,type=push,method=connect,address=ipc://$chan2Addr,rateLogging=0"
|
||||||
@CMAKE_CURRENT_BINARY_DIR@/$PROCESSOR2 &
|
@CMAKE_CURRENT_BINARY_DIR@/$PROCESSOR2 &
|
||||||
PROCESSOR2_PID=$!
|
PROCESSOR2_PID=$!
|
||||||
|
|
||||||
|
@ -51,10 +60,12 @@ SINK="fairmq-ex-1-n-1-sink"
|
||||||
SINK+=" --id sink1"
|
SINK+=" --id sink1"
|
||||||
SINK+=" --transport $transport"
|
SINK+=" --transport $transport"
|
||||||
SINK+=" --verbosity veryhigh"
|
SINK+=" --verbosity veryhigh"
|
||||||
SINK+=" --session $SESSION"
|
SINK+=" --session $session"
|
||||||
|
SINK+=" --severity debug"
|
||||||
|
SINK+=" --shm-segment-size 100000000"
|
||||||
SINK+=" --control static --color false"
|
SINK+=" --control static --color false"
|
||||||
SINK+=" --max-iterations 2"
|
SINK+=" --max-iterations 2"
|
||||||
SINK+=" --mq-config $ex2config"
|
SINK+=" --channel-config name=$chan2,type=pull,method=bind,address=ipc://$chan2Addr,rateLogging=0"
|
||||||
@CMAKE_CURRENT_BINARY_DIR@/$SINK &
|
@CMAKE_CURRENT_BINARY_DIR@/$SINK &
|
||||||
SINK_PID=$!
|
SINK_PID=$!
|
||||||
|
|
||||||
|
@ -69,3 +80,5 @@ kill -SIGINT $PROCESSOR2_PID
|
||||||
# wait for everything to finish
|
# wait for everything to finish
|
||||||
wait $PROCESSOR1_PID
|
wait $PROCESSOR1_PID
|
||||||
wait $PROCESSOR2_PID
|
wait $PROCESSOR2_PID
|
||||||
|
|
||||||
|
rm $chan1Addr; rm $chan2Addr
|
||||||
|
|
|
@ -2,8 +2,6 @@
|
||||||
|
|
||||||
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
|
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
|
||||||
|
|
||||||
SESSION="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
|
|
||||||
|
|
||||||
transport="zeromq"
|
transport="zeromq"
|
||||||
multipart="false"
|
multipart="false"
|
||||||
numParts="1"
|
numParts="1"
|
||||||
|
@ -20,8 +18,22 @@ if [[ $3 =~ ^[0-9]+$ ]]; then
|
||||||
numParts=$3
|
numParts=$3
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
session="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
|
||||||
|
chan1="data1"
|
||||||
|
chan2="data2"
|
||||||
|
chan3="data3"
|
||||||
|
chan4="data4"
|
||||||
|
chan5="data5"
|
||||||
|
chan1Addr="/tmp/fmq_$session""_""$chan1""_""$transport"
|
||||||
|
chan2Addr1="/tmp/fmq_$session""_""$chan2""_1""_""$transport"
|
||||||
|
chan2Addr2="/tmp/fmq_$session""_""$chan2""_2""_""$transport"
|
||||||
|
chan3Addr1="/tmp/fmq_$session""_""$chan3""_1""_""$transport"
|
||||||
|
chan3Addr2="/tmp/fmq_$session""_""$chan3""_2""_""$transport"
|
||||||
|
chan4Addr="/tmp/fmq_$session""_""$chan4""_""$transport"
|
||||||
|
chan5Addr="/tmp/fmq_$session""_""$chan5""_""$transport"
|
||||||
|
|
||||||
# setup a trap to kill everything if the test fails/timeouts
|
# setup a trap to kill everything if the test fails/timeouts
|
||||||
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SPLITTER_PID; kill -TERM $PROXY1_PID; kill -TERM $PROXY2_PID; kill -TERM $MERGER_PID; kill -TERM $MULTIPLIER_PID; kill -TERM $SINK_PID;' TERM
|
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SPLITTER_PID; kill -TERM $PROXY1_PID; kill -TERM $PROXY2_PID; kill -TERM $MERGER_PID; kill -TERM $MULTIPLIER_PID; kill -TERM $SINK_PID; rm $chan1Addr; rm $chan2Addr1; rm $chan2Addr2; rm $chan3Addr1; rm $chan3Addr2; rm $chan4Addr; rm $chan5Addr' TERM
|
||||||
|
|
||||||
SAMPLER="fairmq-bsampler"
|
SAMPLER="fairmq-bsampler"
|
||||||
SAMPLER+=" --id bsampler1"
|
SAMPLER+=" --id bsampler1"
|
||||||
|
@ -30,14 +42,15 @@ SAMPLER+=" --transport $transport"
|
||||||
SAMPLER+=" --color false"
|
SAMPLER+=" --color false"
|
||||||
SAMPLER+=" --control static"
|
SAMPLER+=" --control static"
|
||||||
SAMPLER+=" --verbosity veryhigh"
|
SAMPLER+=" --verbosity veryhigh"
|
||||||
|
SAMPLER+=" --shm-segment-size 100000000"
|
||||||
SAMPLER+=" --severity debug"
|
SAMPLER+=" --severity debug"
|
||||||
SAMPLER+=" --msg-size 100000"
|
SAMPLER+=" --msg-size 100000"
|
||||||
SAMPLER+=" --multipart $multipart"
|
SAMPLER+=" --multipart $multipart"
|
||||||
SAMPLER+=" --num-parts $numParts"
|
SAMPLER+=" --num-parts $numParts"
|
||||||
SAMPLER+=" --msg-rate 1"
|
SAMPLER+=" --msg-rate 1"
|
||||||
SAMPLER+=" --max-iterations 0"
|
SAMPLER+=" --max-iterations 0"
|
||||||
SAMPLER+=" --out-channel data1"
|
SAMPLER+=" --out-channel $chan1"
|
||||||
SAMPLER+=" --channel-config name=data1,type=push,method=bind,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5555"
|
SAMPLER+=" --channel-config name=$chan1,type=push,method=bind,sndBufSize=50,rcvBufSize=50,address=ipc://$chan1Addr"
|
||||||
@FAIRMQ_BIN_DIR@/$SAMPLER &
|
@FAIRMQ_BIN_DIR@/$SAMPLER &
|
||||||
SAMPLER_PID=$!
|
SAMPLER_PID=$!
|
||||||
|
|
||||||
|
@ -48,11 +61,12 @@ SPLITTER+=" --transport $transport"
|
||||||
SPLITTER+=" --color false"
|
SPLITTER+=" --color false"
|
||||||
SPLITTER+=" --control static"
|
SPLITTER+=" --control static"
|
||||||
SPLITTER+=" --verbosity veryhigh"
|
SPLITTER+=" --verbosity veryhigh"
|
||||||
|
SPLITTER+=" --shm-segment-size 100000000"
|
||||||
SPLITTER+=" --multipart $multipart"
|
SPLITTER+=" --multipart $multipart"
|
||||||
SPLITTER+=" --in-channel data1"
|
SPLITTER+=" --in-channel $chan1"
|
||||||
SPLITTER+=" --out-channel data2"
|
SPLITTER+=" --out-channel $chan2"
|
||||||
SPLITTER+=" --channel-config name=data1,type=pull,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5555"
|
SPLITTER+=" --channel-config name=$chan1,type=pull,method=connect,sndBufSize=50,rcvBufSize=50,address=ipc://$chan1Addr"
|
||||||
SPLITTER+=" name=data2,type=push,method=bind,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5556,address=tcp://localhost:5557"
|
SPLITTER+=" name=$chan2,type=push,method=bind,sndBufSize=50,rcvBufSize=50,address=ipc://$chan2Addr1,address=ipc://$chan2Addr2"
|
||||||
@FAIRMQ_BIN_DIR@/$SPLITTER &
|
@FAIRMQ_BIN_DIR@/$SPLITTER &
|
||||||
SPLITTER_PID=$!
|
SPLITTER_PID=$!
|
||||||
|
|
||||||
|
@ -63,11 +77,12 @@ PROXY1+=" --transport $transport"
|
||||||
PROXY1+=" --color false"
|
PROXY1+=" --color false"
|
||||||
PROXY1+=" --control static"
|
PROXY1+=" --control static"
|
||||||
PROXY1+=" --verbosity veryhigh"
|
PROXY1+=" --verbosity veryhigh"
|
||||||
|
PROXY1+=" --shm-segment-size 100000000"
|
||||||
PROXY1+=" --multipart $multipart"
|
PROXY1+=" --multipart $multipart"
|
||||||
PROXY1+=" --in-channel data2"
|
PROXY1+=" --in-channel $chan2"
|
||||||
PROXY1+=" --out-channel data3"
|
PROXY1+=" --out-channel $chan3"
|
||||||
PROXY1+=" --channel-config name=data2,type=pull,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5556"
|
PROXY1+=" --channel-config name=$chan2,type=pull,method=connect,sndBufSize=50,rcvBufSize=50,address=ipc://$chan2Addr1"
|
||||||
PROXY1+=" name=data3,type=push,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5558"
|
PROXY1+=" name=$chan3,type=push,method=connect,sndBufSize=50,rcvBufSize=50,address=ipc://$chan3Addr1"
|
||||||
@FAIRMQ_BIN_DIR@/$PROXY1 &
|
@FAIRMQ_BIN_DIR@/$PROXY1 &
|
||||||
PROXY1_PID=$!
|
PROXY1_PID=$!
|
||||||
|
|
||||||
|
@ -78,11 +93,12 @@ PROXY2+=" --transport $transport"
|
||||||
PROXY2+=" --color false"
|
PROXY2+=" --color false"
|
||||||
PROXY2+=" --control static"
|
PROXY2+=" --control static"
|
||||||
PROXY2+=" --verbosity veryhigh"
|
PROXY2+=" --verbosity veryhigh"
|
||||||
|
PROXY2+=" --shm-segment-size 100000000"
|
||||||
PROXY2+=" --multipart $multipart"
|
PROXY2+=" --multipart $multipart"
|
||||||
PROXY2+=" --in-channel data2"
|
PROXY2+=" --in-channel $chan2"
|
||||||
PROXY2+=" --out-channel data3"
|
PROXY2+=" --out-channel $chan3"
|
||||||
PROXY2+=" --channel-config name=data2,type=pull,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5557"
|
PROXY2+=" --channel-config name=$chan2,type=pull,method=connect,sndBufSize=50,rcvBufSize=50,address=ipc://$chan2Addr2"
|
||||||
PROXY2+=" name=data3,type=push,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5559"
|
PROXY2+=" name=$chan3,type=push,method=connect,sndBufSize=50,rcvBufSize=50,address=ipc://$chan3Addr2"
|
||||||
@FAIRMQ_BIN_DIR@/$PROXY2 &
|
@FAIRMQ_BIN_DIR@/$PROXY2 &
|
||||||
PROXY2_PID=$!
|
PROXY2_PID=$!
|
||||||
|
|
||||||
|
@ -93,11 +109,12 @@ MERGER+=" --transport $transport"
|
||||||
MERGER+=" --color false"
|
MERGER+=" --color false"
|
||||||
MERGER+=" --control static"
|
MERGER+=" --control static"
|
||||||
MERGER+=" --verbosity veryhigh"
|
MERGER+=" --verbosity veryhigh"
|
||||||
|
MERGER+=" --shm-segment-size 100000000"
|
||||||
MERGER+=" --multipart $multipart"
|
MERGER+=" --multipart $multipart"
|
||||||
MERGER+=" --in-channel data3"
|
MERGER+=" --in-channel $chan3"
|
||||||
MERGER+=" --out-channel data4"
|
MERGER+=" --out-channel $chan4"
|
||||||
MERGER+=" --channel-config name=data3,type=pull,method=bind,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5558,address=tcp://localhost:5559"
|
MERGER+=" --channel-config name=$chan3,type=pull,method=bind,sndBufSize=50,rcvBufSize=50,address=ipc://$chan3Addr1,address=ipc://$chan3Addr2"
|
||||||
MERGER+=" name=data4,type=push,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5560"
|
MERGER+=" name=$chan4,type=push,method=connect,sndBufSize=50,rcvBufSize=50,address=ipc://$chan4Addr"
|
||||||
@FAIRMQ_BIN_DIR@/$MERGER &
|
@FAIRMQ_BIN_DIR@/$MERGER &
|
||||||
MERGER_PID=$!
|
MERGER_PID=$!
|
||||||
|
|
||||||
|
@ -108,11 +125,12 @@ MULTIPLIER+=" --transport $transport"
|
||||||
MULTIPLIER+=" --color false"
|
MULTIPLIER+=" --color false"
|
||||||
MULTIPLIER+=" --control static"
|
MULTIPLIER+=" --control static"
|
||||||
MULTIPLIER+=" --verbosity veryhigh"
|
MULTIPLIER+=" --verbosity veryhigh"
|
||||||
|
MULTIPLIER+=" --shm-segment-size 100000000"
|
||||||
MULTIPLIER+=" --multipart $multipart"
|
MULTIPLIER+=" --multipart $multipart"
|
||||||
MULTIPLIER+=" --in-channel data4"
|
MULTIPLIER+=" --in-channel $chan4"
|
||||||
MULTIPLIER+=" --out-channel data5"
|
MULTIPLIER+=" --out-channel $chan5"
|
||||||
MULTIPLIER+=" --channel-config name=data4,type=pull,method=bind,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5560"
|
MULTIPLIER+=" --channel-config name=$chan4,type=pull,method=bind,sndBufSize=50,rcvBufSize=50,address=ipc://$chan4Addr"
|
||||||
MULTIPLIER+=" name=data5,type=push,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5561,address=tcp://localhost:5561"
|
MULTIPLIER+=" name=$chan5,type=push,method=connect,sndBufSize=50,rcvBufSize=50,address=ipc://$chan5Addr,address=ipc://$chan5Addr"
|
||||||
@FAIRMQ_BIN_DIR@/$MULTIPLIER &
|
@FAIRMQ_BIN_DIR@/$MULTIPLIER &
|
||||||
MULTIPLIER_PID=$!
|
MULTIPLIER_PID=$!
|
||||||
|
|
||||||
|
@ -126,8 +144,8 @@ SINK+=" --verbosity veryhigh"
|
||||||
SINK+=" --severity debug"
|
SINK+=" --severity debug"
|
||||||
SINK+=" --multipart $multipart"
|
SINK+=" --multipart $multipart"
|
||||||
SINK+=" --max-iterations 2"
|
SINK+=" --max-iterations 2"
|
||||||
SINK+=" --in-channel data5"
|
SINK+=" --in-channel $chan5"
|
||||||
SINK+=" --channel-config name=data5,type=pull,method=bind,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5561"
|
SINK+=" --channel-config name=$chan5,type=pull,method=bind,sndBufSize=50,rcvBufSize=50,address=ipc://$chan5Addr"
|
||||||
@FAIRMQ_BIN_DIR@/$SINK &
|
@FAIRMQ_BIN_DIR@/$SINK &
|
||||||
SINK_PID=$!
|
SINK_PID=$!
|
||||||
|
|
||||||
|
@ -146,3 +164,5 @@ wait $PROXY1_PID
|
||||||
wait $PROXY2_PID
|
wait $PROXY2_PID
|
||||||
wait $MERGER_PID
|
wait $MERGER_PID
|
||||||
wait $MULTIPLIER_PID
|
wait $MULTIPLIER_PID
|
||||||
|
|
||||||
|
rm $chan1Addr; rm $chan2Addr1; rm $chan2Addr2; rm $chan3Addr1; rm $chan3Addr2; rm $chan4Addr; rm $chan5Addr
|
||||||
|
|
|
@ -8,19 +8,24 @@ if [[ $1 =~ ^[a-z]+$ ]]; then
|
||||||
transport=$1
|
transport=$1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
SESSION="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
|
session="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
|
||||||
|
chan="data"
|
||||||
|
chanAddr1="/tmp/fmq_$session""_""$chan""_1""_""$transport"
|
||||||
|
chanAddr2="/tmp/fmq_$session""_""$chan""_2""_""$transport"
|
||||||
|
|
||||||
# setup a trap to kill everything if the test fails/timeouts
|
# setup a trap to kill everything if the test fails/timeouts
|
||||||
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK1_PID; kill -TERM $SINK2_PID; wait $SAMPLER_PID; wait $SINK1_PID; wait $SINK2_PID;' TERM
|
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK1_PID; kill -TERM $SINK2_PID; wait $SAMPLER_PID; wait $SINK1_PID; wait $SINK2_PID; rm $chanAddr1; rm $chanAddr2' TERM
|
||||||
|
|
||||||
SAMPLER="fairmq-ex-copypush-sampler"
|
SAMPLER="fairmq-ex-copypush-sampler"
|
||||||
SAMPLER+=" --id sampler1"
|
SAMPLER+=" --id sampler1"
|
||||||
SAMPLER+=" --transport $transport"
|
SAMPLER+=" --transport $transport"
|
||||||
SAMPLER+=" --verbosity veryhigh"
|
SAMPLER+=" --verbosity veryhigh"
|
||||||
SAMPLER+=" --session $SESSION"
|
SAMPLER+=" --severity debug"
|
||||||
|
SAMPLER+=" --shm-segment-size 100000000"
|
||||||
|
SAMPLER+=" --session $session"
|
||||||
SAMPLER+=" --control static --color false"
|
SAMPLER+=" --control static --color false"
|
||||||
SAMPLER+=" --max-iterations 1"
|
SAMPLER+=" --max-iterations 1"
|
||||||
SAMPLER+=" --channel-config name=data,type=push,method=bind,rateLogging=0,address=tcp://*:5555,address=tcp://*:5556"
|
SAMPLER+=" --channel-config name=$chan,type=push,method=bind,rateLogging=0,address=ipc://$chanAddr1,address=ipc://$chanAddr2"
|
||||||
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
|
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
|
||||||
SAMPLER_PID=$!
|
SAMPLER_PID=$!
|
||||||
|
|
||||||
|
@ -28,10 +33,12 @@ SINK1="fairmq-ex-copypush-sink"
|
||||||
SINK1+=" --id sink1"
|
SINK1+=" --id sink1"
|
||||||
SINK1+=" --transport $transport"
|
SINK1+=" --transport $transport"
|
||||||
SINK1+=" --verbosity veryhigh"
|
SINK1+=" --verbosity veryhigh"
|
||||||
SINK1+=" --session $SESSION"
|
SINK1+=" --severity debug"
|
||||||
|
SINK1+=" --shm-segment-size 100000000"
|
||||||
|
SINK1+=" --session $session"
|
||||||
SINK1+=" --control static --color false"
|
SINK1+=" --control static --color false"
|
||||||
SINK1+=" --max-iterations 1"
|
SINK1+=" --max-iterations 1"
|
||||||
SINK1+=" --channel-config name=data,type=pull,method=connect,rateLogging=0,address=tcp://localhost:5555"
|
SINK1+=" --channel-config name=$chan,type=pull,method=connect,rateLogging=0,address=ipc://$chanAddr1"
|
||||||
@CMAKE_CURRENT_BINARY_DIR@/$SINK1 &
|
@CMAKE_CURRENT_BINARY_DIR@/$SINK1 &
|
||||||
SINK1_PID=$!
|
SINK1_PID=$!
|
||||||
|
|
||||||
|
@ -39,10 +46,12 @@ SINK2="fairmq-ex-copypush-sink"
|
||||||
SINK2+=" --id sink2"
|
SINK2+=" --id sink2"
|
||||||
SINK2+=" --transport $transport"
|
SINK2+=" --transport $transport"
|
||||||
SINK2+=" --verbosity veryhigh"
|
SINK2+=" --verbosity veryhigh"
|
||||||
SINK2+=" --session $SESSION"
|
SINK2+=" --severity debug"
|
||||||
|
SINK2+=" --shm-segment-size 100000000"
|
||||||
|
SINK2+=" --session $session"
|
||||||
SINK2+=" --control static --color false"
|
SINK2+=" --control static --color false"
|
||||||
SINK2+=" --max-iterations 1"
|
SINK2+=" --max-iterations 1"
|
||||||
SINK2+=" --channel-config name=data,type=pull,method=connect,rateLogging=0,address=tcp://localhost:5556"
|
SINK2+=" --channel-config name=$chan,type=pull,method=connect,rateLogging=0,address=ipc://$chanAddr2"
|
||||||
@CMAKE_CURRENT_BINARY_DIR@/$SINK2 &
|
@CMAKE_CURRENT_BINARY_DIR@/$SINK2 &
|
||||||
SINK2_PID=$!
|
SINK2_PID=$!
|
||||||
|
|
||||||
|
@ -50,3 +59,5 @@ SINK2_PID=$!
|
||||||
wait $SAMPLER_PID
|
wait $SAMPLER_PID
|
||||||
wait $SINK1_PID
|
wait $SINK1_PID
|
||||||
wait $SINK2_PID
|
wait $SINK2_PID
|
||||||
|
|
||||||
|
rm $chanAddr1; rm $chanAddr2
|
||||||
|
|
|
@ -8,19 +8,28 @@ if [[ $1 =~ ^[a-z]+$ ]]; then
|
||||||
transport=$1
|
transport=$1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
SESSION="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
|
session="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
|
||||||
|
chan="data"
|
||||||
|
chanAddr=""
|
||||||
|
chanIpcFile="/tmp/fmq_$session""_""$chan""_""$transport"
|
||||||
|
if [ $transport = "ofi" ]; then
|
||||||
|
chanAddr="tcp://127.0.0.1:5656"
|
||||||
|
else
|
||||||
|
chanAddr="ipc://""$chanIpcFile"
|
||||||
|
fi
|
||||||
|
|
||||||
# setup a trap to kill everything if the test fails/timeouts
|
# setup a trap to kill everything if the test fails/timeouts
|
||||||
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID;' TERM
|
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID; rm $chanIpcFile' TERM
|
||||||
|
|
||||||
SAMPLER="fairmq-ex-multipart-sampler"
|
SAMPLER="fairmq-ex-multipart-sampler"
|
||||||
SAMPLER+=" --id sampler1"
|
SAMPLER+=" --id sampler1"
|
||||||
SAMPLER+=" --transport $transport"
|
SAMPLER+=" --transport $transport"
|
||||||
SAMPLER+=" --verbosity veryhigh"
|
SAMPLER+=" --verbosity veryhigh"
|
||||||
SAMPLER+=" --session $SESSION"
|
SAMPLER+=" --session $session"
|
||||||
|
SAMPLER+=" --shm-segment-size 100000000"
|
||||||
SAMPLER+=" --max-iterations 1"
|
SAMPLER+=" --max-iterations 1"
|
||||||
SAMPLER+=" --control static --color false"
|
SAMPLER+=" --control static --color false"
|
||||||
SAMPLER+=" --channel-config name=data,type=pair,method=connect,rateLogging=0,address=tcp://127.0.0.1:5555,linger=1000"
|
SAMPLER+=" --channel-config name=$chan,type=pair,method=connect,rateLogging=0,address=$chanAddr,linger=1000"
|
||||||
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
|
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
|
||||||
SAMPLER_PID=$!
|
SAMPLER_PID=$!
|
||||||
|
|
||||||
|
@ -28,11 +37,14 @@ SINK="fairmq-ex-multipart-sink"
|
||||||
SINK+=" --id sink1"
|
SINK+=" --id sink1"
|
||||||
SINK+=" --transport $transport"
|
SINK+=" --transport $transport"
|
||||||
SINK+=" --verbosity veryhigh"
|
SINK+=" --verbosity veryhigh"
|
||||||
SINK+=" --session $SESSION"
|
SINK+=" --session $session"
|
||||||
|
SINK+=" --shm-segment-size 100000000"
|
||||||
SINK+=" --control static --color false"
|
SINK+=" --control static --color false"
|
||||||
SINK+=" --channel-config name=data,type=pair,method=bind,rateLogging=0,address=tcp://127.0.0.1:5555"
|
SINK+=" --channel-config name=$chan,type=pair,method=bind,rateLogging=0,address=$chanAddr"
|
||||||
@CMAKE_CURRENT_BINARY_DIR@/$SINK &
|
@CMAKE_CURRENT_BINARY_DIR@/$SINK &
|
||||||
SINK_PID=$!
|
SINK_PID=$!
|
||||||
|
|
||||||
wait $SAMPLER_PID
|
wait $SAMPLER_PID
|
||||||
wait $SINK_PID
|
wait $SINK_PID
|
||||||
|
|
||||||
|
rm $chanIpcFile
|
||||||
|
|
|
@ -8,18 +8,25 @@ if [[ $1 =~ ^[a-z]+$ ]]; then
|
||||||
transport=$1
|
transport=$1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
# setup a trap to kill everything if the test fails/timeouts
|
session="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
|
||||||
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; kill -TERM $BROADCASTER_PID; wait $SAMPLER_PID; wait $SINK_PID; wait $BROADCASTER_PID;' TERM
|
dataChan="data"
|
||||||
|
broadcastChan="broadcast"
|
||||||
|
dataChanAddr="/tmp/fmq_$session""_""$dataChan""_""$transport"
|
||||||
|
broadcastChanAddr="/tmp/fmq_$session""_""$broadcastChan""_""$transport"
|
||||||
|
|
||||||
|
# setup a trap to kill everything if the test fails/timeouts
|
||||||
|
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; kill -TERM $BROADCASTER_PID; wait $SAMPLER_PID; wait $SINK_PID; wait $BROADCASTER_PID; rm $dataChanAddr; rm $broadcastChanAddr' TERM
|
||||||
|
|
||||||
SINK="fairmq-ex-multiple-channels-sink"
|
SINK="fairmq-ex-multiple-channels-sink"
|
||||||
SINK+=" --id sink1"
|
SINK+=" --id sink1"
|
||||||
|
SINK+=" --session $session"
|
||||||
SINK+=" --transport $transport"
|
SINK+=" --transport $transport"
|
||||||
SINK+=" --verbosity veryhigh"
|
SINK+=" --verbosity veryhigh --severity debug"
|
||||||
|
SINK+=" --shm-segment-size 100000000"
|
||||||
SINK+=" --max-iterations 1"
|
SINK+=" --max-iterations 1"
|
||||||
SINK+=" --control static --color false"
|
SINK+=" --control static --color false"
|
||||||
SINK+=" --channel-config name=data,type=pull,method=connect,rateLogging=0,address=tcp://localhost:5555"
|
SINK+=" --channel-config name=$dataChan,type=pull,method=connect,rateLogging=0,address=ipc://$dataChanAddr"
|
||||||
SINK+=" name=broadcast,type=sub,method=connect,rateLogging=0,address=tcp://localhost:5005"
|
SINK+=" name=$broadcastChan,type=sub,method=connect,rateLogging=0,address=ipc://$broadcastChanAddr"
|
||||||
@CMAKE_CURRENT_BINARY_DIR@/$SINK &
|
@CMAKE_CURRENT_BINARY_DIR@/$SINK &
|
||||||
SINK_PID=$!
|
SINK_PID=$!
|
||||||
|
|
||||||
|
@ -27,21 +34,25 @@ sleep 1
|
||||||
|
|
||||||
SAMPLER="fairmq-ex-multiple-channels-sampler"
|
SAMPLER="fairmq-ex-multiple-channels-sampler"
|
||||||
SAMPLER+=" --id sampler1"
|
SAMPLER+=" --id sampler1"
|
||||||
|
SAMPLER+=" --session $session"
|
||||||
SAMPLER+=" --transport $transport"
|
SAMPLER+=" --transport $transport"
|
||||||
SAMPLER+=" --verbosity veryhigh"
|
SAMPLER+=" --verbosity veryhigh --severity debug"
|
||||||
|
SAMPLER+=" --shm-segment-size 100000000"
|
||||||
SAMPLER+=" --max-iterations 1"
|
SAMPLER+=" --max-iterations 1"
|
||||||
SAMPLER+=" --control static --color false"
|
SAMPLER+=" --control static --color false"
|
||||||
SAMPLER+=" --channel-config name=data,type=push,method=bind,rateLogging=0,address=tcp://*:5555"
|
SAMPLER+=" --channel-config name=$dataChan,type=push,method=bind,rateLogging=0,address=ipc://$dataChanAddr"
|
||||||
SAMPLER+=" name=broadcast,type=sub,method=connect,rateLogging=0,address=tcp://localhost:5005"
|
SAMPLER+=" name=$broadcastChan,type=sub,method=connect,rateLogging=0,address=ipc://$broadcastChanAddr"
|
||||||
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
|
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
|
||||||
SAMPLER_PID=$!
|
SAMPLER_PID=$!
|
||||||
|
|
||||||
BROADCASTER="fairmq-ex-multiple-channels-broadcaster"
|
BROADCASTER="fairmq-ex-multiple-channels-broadcaster"
|
||||||
BROADCASTER+=" --id broadcaster1"
|
BROADCASTER+=" --id broadcaster1"
|
||||||
|
BROADCASTER+=" --session $session"
|
||||||
BROADCASTER+=" --transport $transport"
|
BROADCASTER+=" --transport $transport"
|
||||||
BROADCASTER+=" --verbosity veryhigh"
|
BROADCASTER+=" --verbosity veryhigh --severity debug"
|
||||||
|
BROADCASTER+=" --shm-segment-size 100000000"
|
||||||
BROADCASTER+=" --control static --color false"
|
BROADCASTER+=" --control static --color false"
|
||||||
BROADCASTER+=" --channel-config name=broadcast,type=pub,method=bind,rateLogging=0,address=tcp://*:5005"
|
BROADCASTER+=" --channel-config name=$broadcastChan,type=pub,method=bind,rateLogging=0,address=ipc://$broadcastChanAddr"
|
||||||
@CMAKE_CURRENT_BINARY_DIR@/$BROADCASTER &
|
@CMAKE_CURRENT_BINARY_DIR@/$BROADCASTER &
|
||||||
BROADCASTER_PID=$!
|
BROADCASTER_PID=$!
|
||||||
|
|
||||||
|
@ -53,3 +64,5 @@ kill -SIGINT $BROADCASTER_PID
|
||||||
|
|
||||||
# wait for broadcaster to finish
|
# wait for broadcaster to finish
|
||||||
wait $BROADCASTER_PID
|
wait $BROADCASTER_PID
|
||||||
|
|
||||||
|
rm $dataChanAddr; rm $broadcastChanAddr
|
||||||
|
|
|
@ -2,46 +2,57 @@
|
||||||
|
|
||||||
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
|
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
|
||||||
|
|
||||||
SESSION="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
|
session="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
|
||||||
|
chan1="data1"
|
||||||
|
chan2="data2"
|
||||||
|
ackChan="ack"
|
||||||
|
chan1Addr="/tmp/fmq_$session""_""$chan1"
|
||||||
|
chan2Addr="/tmp/fmq_$session""_""$chan2"
|
||||||
|
ackChanAddr="/tmp/fmq_$session""_""$ackChan"
|
||||||
|
|
||||||
trap 'kill -TERM $SAMPLER1_PID; kill -TERM $SAMPLER2_PID; kill -TERM $SINK_PID; wait $SAMPLER1_PID; wait $SAMPLER2_PID; wait $SINK_PID; @CMAKE_BINARY_DIR@/fairmq/fairmq-shmmonitor --cleanup --session $SESSION;' TERM
|
trap 'kill -TERM $SAMPLER1_PID; kill -TERM $SAMPLER2_PID; kill -TERM $SINK_PID; wait $SAMPLER1_PID; wait $SAMPLER2_PID; wait $SINK_PID; @CMAKE_BINARY_DIR@/fairmq/fairmq-shmmonitor --cleanup --session $SESSION; rm $chan1Addr; rm $chan2Addr; rm $ackChanAddr' TERM
|
||||||
|
|
||||||
SINK="fairmq-ex-multiple-transports-sink"
|
SINK="fairmq-ex-multiple-transports-sink"
|
||||||
SINK+=" --id sink1"
|
SINK+=" --id sink1"
|
||||||
SINK+=" --verbosity veryhigh"
|
SINK+=" --verbosity veryhigh --severity debug"
|
||||||
SINK+=" --session $SESSION"
|
SINK+=" --shm-segment-size 100000000"
|
||||||
|
SINK+=" --session $session"
|
||||||
SINK+=" --max-iterations 1"
|
SINK+=" --max-iterations 1"
|
||||||
SINK+=" --control static --color false"
|
SINK+=" --control static --color false"
|
||||||
SINK+=" --transport shmem"
|
SINK+=" --transport shmem"
|
||||||
SINK+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:5555"
|
SINK+=" --channel-config name=$chan1,type=pull,method=connect,address=ipc://$chan1Addr"
|
||||||
SINK+=" name=data2,type=pull,method=connect,address=tcp://127.0.0.1:5556,transport=zeromq"
|
SINK+=" name=$chan2,type=pull,method=connect,address=ipc://$chan2Addr,transport=zeromq"
|
||||||
SINK+=" name=ack,type=pub,method=connect,address=tcp://127.0.0.1:5557,transport=zeromq"
|
SINK+=" name=$ackChan,type=pub,method=connect,address=ipc://$ackChanAddr,transport=zeromq"
|
||||||
@CMAKE_CURRENT_BINARY_DIR@/$SINK &
|
@CMAKE_CURRENT_BINARY_DIR@/$SINK &
|
||||||
SINK_PID=$!
|
SINK_PID=$!
|
||||||
|
|
||||||
SAMPLER1="fairmq-ex-multiple-transports-sampler1"
|
SAMPLER1="fairmq-ex-multiple-transports-sampler1"
|
||||||
SAMPLER1+=" --id sampler1"
|
SAMPLER1+=" --id sampler1"
|
||||||
SAMPLER1+=" --session $SESSION"
|
SAMPLER1+=" --session $session"
|
||||||
SAMPLER1+=" --verbosity veryhigh"
|
SAMPLER1+=" --verbosity veryhigh --severity debug"
|
||||||
|
SAMPLER1+=" --shm-segment-size 100000000"
|
||||||
SAMPLER1+=" --max-iterations 1"
|
SAMPLER1+=" --max-iterations 1"
|
||||||
SAMPLER1+=" --control static --color false"
|
SAMPLER1+=" --control static --color false"
|
||||||
SAMPLER1+=" --transport shmem"
|
SAMPLER1+=" --transport shmem"
|
||||||
SAMPLER1+=" --channel-config name=data1,type=push,method=bind,address=tcp://127.0.0.1:5555"
|
SAMPLER1+=" --channel-config name=$chan1,type=push,method=bind,address=ipc://$chan1Addr"
|
||||||
SAMPLER1+=" name=ack,type=sub,method=bind,address=tcp://127.0.0.1:5557,transport=zeromq"
|
SAMPLER1+=" name=$ackChan,type=sub,method=bind,address=ipc://$ackChanAddr,transport=zeromq"
|
||||||
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER1 &
|
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER1 &
|
||||||
SAMPLER1_PID=$!
|
SAMPLER1_PID=$!
|
||||||
|
|
||||||
SAMPLER2="fairmq-ex-multiple-transports-sampler2"
|
SAMPLER2="fairmq-ex-multiple-transports-sampler2"
|
||||||
SAMPLER2+=" --id sampler2"
|
SAMPLER2+=" --id sampler2"
|
||||||
SAMPLER2+=" --session $SESSION"
|
SAMPLER2+=" --session $session"
|
||||||
SAMPLER2+=" --verbosity veryhigh"
|
SAMPLER2+=" --verbosity veryhigh --severity debug"
|
||||||
|
SAMPLER2+=" --shm-segment-size 100000000"
|
||||||
SAMPLER2+=" --max-iterations 1"
|
SAMPLER2+=" --max-iterations 1"
|
||||||
SAMPLER2+=" --control static --color false"
|
SAMPLER2+=" --control static --color false"
|
||||||
SAMPLER2+=" --transport zeromq"
|
SAMPLER2+=" --transport zeromq"
|
||||||
SAMPLER2+=" --channel-config name=data2,type=push,method=bind,address=tcp://127.0.0.1:5556"
|
SAMPLER2+=" --channel-config name=$chan2,type=push,method=bind,address=ipc://$chan2Addr"
|
||||||
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER2 &
|
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER2 &
|
||||||
SAMPLER2_PID=$!
|
SAMPLER2_PID=$!
|
||||||
|
|
||||||
wait $SAMPLER1_PID
|
wait $SAMPLER1_PID
|
||||||
wait $SAMPLER2_PID
|
wait $SAMPLER2_PID
|
||||||
wait $SINK_PID
|
wait $SINK_PID
|
||||||
|
|
||||||
|
rm $chan1Addr; rm $chan2Addr; rm $ackChanAddr
|
||||||
|
|
|
@ -9,22 +9,25 @@ if [[ $1 =~ ^[a-z]+$ ]]; then
|
||||||
fi
|
fi
|
||||||
|
|
||||||
msgSize="1000000"
|
msgSize="1000000"
|
||||||
SESSION="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
|
session="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
|
||||||
|
chan="data"
|
||||||
|
chanAddr="/tmp/fmq_$session""_""$chan""_""$transport"
|
||||||
|
|
||||||
# setup a trap to kill everything if the test fails/timeouts
|
# setup a trap to kill everything if the test fails/timeouts
|
||||||
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID; @CMAKE_BINARY_DIR@/fairmq/fairmq-shmmonitor --cleanup --session $SESSION' TERM
|
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID; @CMAKE_BINARY_DIR@/fairmq/fairmq-shmmonitor --cleanup --session $SESSION; rm $chanAddr' TERM
|
||||||
|
|
||||||
SAMPLER="fairmq-ex-region-sampler"
|
SAMPLER="fairmq-ex-region-sampler"
|
||||||
SAMPLER+=" --id sampler1"
|
SAMPLER+=" --id sampler1"
|
||||||
SAMPLER+=" --transport $transport"
|
SAMPLER+=" --transport $transport"
|
||||||
SAMPLER+=" --severity debug"
|
SAMPLER+=" --severity debug"
|
||||||
SAMPLER+=" --session $SESSION"
|
SAMPLER+=" --session $session"
|
||||||
|
SAMPLER+=" --shm-segment-size 100000000"
|
||||||
SAMPLER+=" --verbosity veryhigh"
|
SAMPLER+=" --verbosity veryhigh"
|
||||||
SAMPLER+=" --control static --color false"
|
SAMPLER+=" --control static --color false"
|
||||||
SAMPLER+=" --max-iterations 1"
|
SAMPLER+=" --max-iterations 1"
|
||||||
SAMPLER+=" --msg-size $msgSize"
|
SAMPLER+=" --msg-size $msgSize"
|
||||||
SAMPLER+=" --region-linger 500"
|
SAMPLER+=" --region-linger 500"
|
||||||
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777"
|
SAMPLER+=" --channel-config name=$chan,type=push,method=bind,address=ipc://$chanAddr"
|
||||||
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
|
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
|
||||||
SAMPLER_PID=$!
|
SAMPLER_PID=$!
|
||||||
|
|
||||||
|
@ -32,14 +35,17 @@ SINK="fairmq-ex-region-sink"
|
||||||
SINK+=" --id sink1"
|
SINK+=" --id sink1"
|
||||||
SINK+=" --transport $transport"
|
SINK+=" --transport $transport"
|
||||||
SINK+=" --severity debug"
|
SINK+=" --severity debug"
|
||||||
SINK+=" --session $SESSION"
|
SINK+=" --session $session"
|
||||||
|
SINK+=" --shm-segment-size 100000000"
|
||||||
SINK+=" --verbosity veryhigh"
|
SINK+=" --verbosity veryhigh"
|
||||||
SINK+=" --control static --color false"
|
SINK+=" --control static --color false"
|
||||||
SINK+=" --max-iterations 1"
|
SINK+=" --max-iterations 1"
|
||||||
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777"
|
SINK+=" --channel-config name=$chan,type=pull,method=connect,address=ipc://$chanAddr"
|
||||||
@CMAKE_CURRENT_BINARY_DIR@/$SINK &
|
@CMAKE_CURRENT_BINARY_DIR@/$SINK &
|
||||||
SINK_PID=$!
|
SINK_PID=$!
|
||||||
|
|
||||||
# wait for sampler and sink to finish
|
# wait for sampler and sink to finish
|
||||||
wait $SAMPLER_PID
|
wait $SAMPLER_PID
|
||||||
wait $SINK_PID
|
wait $SINK_PID
|
||||||
|
|
||||||
|
rm $chanAddr
|
||||||
|
|
|
@ -8,19 +8,22 @@ if [[ $1 =~ ^[a-z]+$ ]]; then
|
||||||
transport=$1
|
transport=$1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
SESSION="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
|
session="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
|
||||||
|
chan="data"
|
||||||
|
chanAddr="/tmp/fmq_$session""_""$chan""_""$transport"
|
||||||
|
|
||||||
# setup a trap to kill everything if the test fails/timeouts
|
# setup a trap to kill everything if the test fails/timeouts
|
||||||
trap 'kill -TERM $CLIENT_PID; kill -TERM $SERVER_PID; wait $CLIENT_PID; wait $SERVER_PID;' TERM
|
trap 'kill -TERM $CLIENT_PID; kill -TERM $SERVER_PID; wait $CLIENT_PID; wait $SERVER_PID; rm $chanAddr' TERM
|
||||||
|
|
||||||
CLIENT="fairmq-ex-req-rep-client"
|
CLIENT="fairmq-ex-req-rep-client"
|
||||||
CLIENT+=" --id client"
|
CLIENT+=" --id client"
|
||||||
CLIENT+=" --transport $transport"
|
CLIENT+=" --transport $transport"
|
||||||
CLIENT+=" --verbosity veryhigh"
|
CLIENT+=" --verbosity veryhigh"
|
||||||
CLIENT+=" --session $SESSION"
|
CLIENT+=" --session $session"
|
||||||
|
CLIENT+=" --shm-segment-size 100000000"
|
||||||
CLIENT+=" --control static --color false"
|
CLIENT+=" --control static --color false"
|
||||||
CLIENT+=" --max-iterations 1"
|
CLIENT+=" --max-iterations 1"
|
||||||
CLIENT+=" --channel-config name=data,type=req,method=connect,rateLogging=0,address=tcp://127.0.0.1:5005"
|
CLIENT+=" --channel-config name=$chan,type=req,method=connect,rateLogging=0,address=ipc://$chanAddr"
|
||||||
@CMAKE_CURRENT_BINARY_DIR@/$CLIENT &
|
@CMAKE_CURRENT_BINARY_DIR@/$CLIENT &
|
||||||
CLIENT_PID=$!
|
CLIENT_PID=$!
|
||||||
|
|
||||||
|
@ -28,13 +31,16 @@ SERVER="fairmq-ex-req-rep-server"
|
||||||
SERVER+=" --id server"
|
SERVER+=" --id server"
|
||||||
SERVER+=" --transport $transport"
|
SERVER+=" --transport $transport"
|
||||||
SERVER+=" --verbosity veryhigh"
|
SERVER+=" --verbosity veryhigh"
|
||||||
SERVER+=" --session $SESSION"
|
SERVER+=" --session $session"
|
||||||
|
SERVER+=" --shm-segment-size 100000000"
|
||||||
SERVER+=" --control static --color false"
|
SERVER+=" --control static --color false"
|
||||||
SERVER+=" --max-iterations 1"
|
SERVER+=" --max-iterations 1"
|
||||||
SERVER+=" --channel-config name=data,type=rep,method=bind,rateLogging=0,address=tcp://127.0.0.1:5005"
|
SERVER+=" --channel-config name=$chan,type=rep,method=bind,rateLogging=0,address=ipc://$chanAddr"
|
||||||
@CMAKE_CURRENT_BINARY_DIR@/$SERVER &
|
@CMAKE_CURRENT_BINARY_DIR@/$SERVER &
|
||||||
SERVER_PID=$!
|
SERVER_PID=$!
|
||||||
|
|
||||||
# wait for everything to finish
|
# wait for everything to finish
|
||||||
wait $CLIENT_PID
|
wait $CLIENT_PID
|
||||||
wait $SERVER_PID
|
wait $SERVER_PID
|
||||||
|
|
||||||
|
rm $chanAddr
|
||||||
|
|
|
@ -45,11 +45,9 @@ add_testhelper(runTestDevice
|
||||||
${definitions}
|
${definitions}
|
||||||
)
|
)
|
||||||
|
|
||||||
set(MQ_CONFIG "${CMAKE_BINARY_DIR}/test/testsuite_FairMQ.IOPatterns_config.json")
|
|
||||||
set(RUN_TEST_DEVICE "${CMAKE_BINARY_DIR}/test/testhelper_runTestDevice")
|
set(RUN_TEST_DEVICE "${CMAKE_BINARY_DIR}/test/testhelper_runTestDevice")
|
||||||
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
|
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
|
||||||
set(SDK_TESTSUITE_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/sdk)
|
set(SDK_TESTSUITE_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/sdk)
|
||||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/protocols/config.json.in ${MQ_CONFIG})
|
|
||||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/runner.cxx.in ${CMAKE_CURRENT_BINARY_DIR}/runner.cxx)
|
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/runner.cxx.in ${CMAKE_CURRENT_BINARY_DIR}/runner.cxx)
|
||||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/TestEnvironment.h.in ${CMAKE_CURRENT_BINARY_DIR}/TestEnvironment.h)
|
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/TestEnvironment.h.in ${CMAKE_CURRENT_BINARY_DIR}/TestEnvironment.h)
|
||||||
|
|
||||||
|
|
|
@ -41,6 +41,7 @@ auto RunPushPullWithMsgResize(string const & transport, string const & _address)
|
||||||
{
|
{
|
||||||
ProgOptions config;
|
ProgOptions config;
|
||||||
config.SetProperty<string>("session", tools::Uuid());
|
config.SetProperty<string>("session", tools::Uuid());
|
||||||
|
config.SetProperty<size_t>("shm-segment-size", 100000000);
|
||||||
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));
|
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));
|
||||||
|
|
||||||
Channel push{"Push", "push", factory};
|
Channel push{"Push", "push", factory};
|
||||||
|
@ -102,6 +103,7 @@ auto RunMsgRebuild(const string& transport) -> void
|
||||||
{
|
{
|
||||||
ProgOptions config;
|
ProgOptions config;
|
||||||
config.SetProperty<string>("session", tools::Uuid());
|
config.SetProperty<string>("session", tools::Uuid());
|
||||||
|
config.SetProperty<size_t>("shm-segment-size", 100000000);
|
||||||
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));
|
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));
|
||||||
|
|
||||||
size_t const msgSize{100};
|
size_t const msgSize{100};
|
||||||
|
@ -134,6 +136,7 @@ auto RunPushPullWithAlignment(string const& transport, string const& _address) -
|
||||||
{
|
{
|
||||||
ProgOptions config;
|
ProgOptions config;
|
||||||
config.SetProperty<string>("session", tools::Uuid());
|
config.SetProperty<string>("session", tools::Uuid());
|
||||||
|
config.SetProperty<size_t>("shm-segment-size", 100000000);
|
||||||
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));
|
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));
|
||||||
|
|
||||||
Channel push{"Push", "push", factory};
|
Channel push{"Push", "push", factory};
|
||||||
|
@ -187,6 +190,7 @@ auto EmptyMessage(string const& transport, string const& _address) -> void
|
||||||
{
|
{
|
||||||
ProgOptions config;
|
ProgOptions config;
|
||||||
config.SetProperty<string>("session", tools::Uuid());
|
config.SetProperty<string>("session", tools::Uuid());
|
||||||
|
config.SetProperty<size_t>("shm-segment-size", 100000000);
|
||||||
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));
|
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));
|
||||||
|
|
||||||
Channel push{"Push", "push", factory};
|
Channel push{"Push", "push", factory};
|
||||||
|
@ -237,6 +241,7 @@ auto ZeroCopy() -> void
|
||||||
{
|
{
|
||||||
ProgOptions config;
|
ProgOptions config;
|
||||||
config.SetProperty<string>("session", tools::Uuid());
|
config.SetProperty<string>("session", tools::Uuid());
|
||||||
|
config.SetProperty<size_t>("shm-segment-size", 100000000);
|
||||||
auto factory(TransportFactory::CreateTransportFactory("shmem", tools::Uuid(), &config));
|
auto factory(TransportFactory::CreateTransportFactory("shmem", tools::Uuid(), &config));
|
||||||
|
|
||||||
unique_ptr<string> str(make_unique<string>("asdf"));
|
unique_ptr<string> str(make_unique<string>("asdf"));
|
||||||
|
@ -268,7 +273,9 @@ auto ZeroCopyFromUnmanaged(string const& address) -> void
|
||||||
ProgOptions config2;
|
ProgOptions config2;
|
||||||
string session(tools::Uuid());
|
string session(tools::Uuid());
|
||||||
config1.SetProperty<string>("session", session);
|
config1.SetProperty<string>("session", session);
|
||||||
|
config1.SetProperty<size_t>("shm-segment-size", 100000000);
|
||||||
config2.SetProperty<string>("session", session);
|
config2.SetProperty<string>("session", session);
|
||||||
|
config2.SetProperty<size_t>("shm-segment-size", 100000000);
|
||||||
// ref counts should be accessible accross different segments
|
// ref counts should be accessible accross different segments
|
||||||
config2.SetProperty<uint16_t>("shm-segment-id", 2);
|
config2.SetProperty<uint16_t>("shm-segment-id", 2);
|
||||||
auto factory1(TransportFactory::CreateTransportFactory("shmem", tools::Uuid(), &config1));
|
auto factory1(TransportFactory::CreateTransportFactory("shmem", tools::Uuid(), &config1));
|
||||||
|
|
|
@ -10,7 +10,9 @@
|
||||||
|
|
||||||
#include <fairmq/tools/Unique.h>
|
#include <fairmq/tools/Unique.h>
|
||||||
#include <fairmq/tools/Process.h>
|
#include <fairmq/tools/Process.h>
|
||||||
|
#include <fairmq/tools/Strings.h>
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
|
#include <cstdio> // std::remove
|
||||||
#include <sstream> // std::stringstream
|
#include <sstream> // std::stringstream
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
|
||||||
|
@ -23,15 +25,22 @@ using namespace fair::mq::tools;
|
||||||
|
|
||||||
auto RunPoller(string transport, int pollType) -> void
|
auto RunPoller(string transport, int pollType) -> void
|
||||||
{
|
{
|
||||||
size_t session{fair::mq::tools::UuidHash()};
|
size_t session{UuidHash()};
|
||||||
|
string data1IpcFile("/tmp/fmq_" + to_string(session) + "_data1_" + transport);
|
||||||
|
string data2IpcFile("/tmp/fmq_" + to_string(session) + "_data2_" + transport);
|
||||||
|
string data1Address("ipc://" + data1IpcFile);
|
||||||
|
string data2Address("ipc://" + data2IpcFile);
|
||||||
|
|
||||||
auto pollout = execute_result{"", 0};
|
auto pollout = execute_result{"", 0};
|
||||||
thread poll_out_thread([&]() {
|
thread poll_out_thread([&]() {
|
||||||
stringstream cmd;
|
stringstream cmd;
|
||||||
cmd << runTestDevice
|
cmd << runTestDevice
|
||||||
<< " --id pollout_"<< transport
|
<< " --id pollout_"<< transport
|
||||||
<< " --control static --color false"
|
<< " --control static"
|
||||||
<< " --session " << session << " --mq-config \"" << mqConfig << "\"";
|
<< " --color false"
|
||||||
|
<< " --session " << session
|
||||||
|
<< " --channel-config name=data1,type=push,method=bind,address=" << data1Address
|
||||||
|
<< " name=data2,type=push,method=bind,address=" << data2Address;
|
||||||
pollout = execute(cmd.str(), "[POLLOUT]");
|
pollout = execute(cmd.str(), "[POLLOUT]");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -40,8 +49,12 @@ auto RunPoller(string transport, int pollType) -> void
|
||||||
stringstream cmd;
|
stringstream cmd;
|
||||||
cmd << runTestDevice
|
cmd << runTestDevice
|
||||||
<< " --id pollin_" << transport
|
<< " --id pollin_" << transport
|
||||||
<< " --control static --color false"
|
<< " --control static"
|
||||||
<< " --session " << session << " --mq-config \"" << mqConfig << "\" --poll-type " << pollType;
|
<< " --color false"
|
||||||
|
<< " --session " << session
|
||||||
|
<< " --poll-type " << pollType
|
||||||
|
<< " --channel-config name=data1,type=pull,method=connect,address=" << data1Address
|
||||||
|
<< " name=data2,type=pull,method=connect,address=" << data2Address;
|
||||||
pollin = execute(cmd.str(), "[POLLIN]");
|
pollin = execute(cmd.str(), "[POLLIN]");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -49,6 +62,9 @@ auto RunPoller(string transport, int pollType) -> void
|
||||||
poll_in_thread.join();
|
poll_in_thread.join();
|
||||||
cerr << pollout.console_out << pollin.console_out;
|
cerr << pollout.console_out << pollin.console_out;
|
||||||
|
|
||||||
|
std::remove(data1IpcFile.c_str());
|
||||||
|
std::remove(data2IpcFile.c_str());
|
||||||
|
|
||||||
exit(pollout.exit_code + pollin.exit_code);
|
exit(pollout.exit_code + pollin.exit_code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -9,7 +9,9 @@
|
||||||
#include "runner.h"
|
#include "runner.h"
|
||||||
#include <fairmq/tools/Unique.h>
|
#include <fairmq/tools/Unique.h>
|
||||||
#include <fairmq/tools/Process.h>
|
#include <fairmq/tools/Process.h>
|
||||||
|
#include <fairmq/tools/Strings.h>
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
|
#include <cstdio> // std::remove
|
||||||
#include <sstream> // std::stringstream
|
#include <sstream> // std::stringstream
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
|
||||||
|
@ -23,20 +25,37 @@ using namespace fair::mq::tools;
|
||||||
auto RunPair(string transport) -> void
|
auto RunPair(string transport) -> void
|
||||||
{
|
{
|
||||||
size_t session{fair::mq::tools::UuidHash()};
|
size_t session{fair::mq::tools::UuidHash()};
|
||||||
|
string ipcFile("/tmp/fmq_" + to_string(session) + "_data_" + transport);
|
||||||
|
string address("ipc://" + ipcFile);
|
||||||
|
|
||||||
|
// ofi does not run with ipc://
|
||||||
|
if (transport == "ofi") {
|
||||||
|
address = "tcp://127.0.0.1:5957";
|
||||||
|
}
|
||||||
|
|
||||||
auto pairleft = execute_result{"", 100};
|
auto pairleft = execute_result{"", 100};
|
||||||
thread pairleft_thread([&]() {
|
thread pairleft_thread([&]() {
|
||||||
stringstream cmd;
|
stringstream cmd;
|
||||||
cmd << runTestDevice << " --id pairleft_" << transport << " --control static "
|
cmd << runTestDevice
|
||||||
<< "--session " << session << " --color false --mq-config \"" << mqConfig << "\"";
|
<< " --id pairleft_" << transport
|
||||||
|
<< " --control static"
|
||||||
|
<< " --shm-segment-size 100000000"
|
||||||
|
<< " --session " << session
|
||||||
|
<< " --color false"
|
||||||
|
<< " --channel-config name=data,type=pair,method=bind,address=" << address;
|
||||||
pairleft = execute(cmd.str(), "[PAIR L]");
|
pairleft = execute(cmd.str(), "[PAIR L]");
|
||||||
});
|
});
|
||||||
|
|
||||||
auto pairright = execute_result{"", 100};
|
auto pairright = execute_result{"", 100};
|
||||||
thread pairright_thread([&]() {
|
thread pairright_thread([&]() {
|
||||||
stringstream cmd;
|
stringstream cmd;
|
||||||
cmd << runTestDevice << " --id pairright_" << transport << " --control static "
|
cmd << runTestDevice
|
||||||
<< "--session " << session << " --color false --mq-config \"" << mqConfig << "\"";
|
<< " --id pairright_" << transport
|
||||||
|
<< " --control static"
|
||||||
|
<< " --shm-segment-size 100000000"
|
||||||
|
<< " --session " << session
|
||||||
|
<< " --color false"
|
||||||
|
<< " --channel-config name=data,type=pair,method=connect,address=" << address;
|
||||||
pairright = execute(cmd.str(), "[PAIR R]");
|
pairright = execute(cmd.str(), "[PAIR R]");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -44,6 +63,8 @@ auto RunPair(string transport) -> void
|
||||||
pairright_thread.join();
|
pairright_thread.join();
|
||||||
cerr << pairleft.console_out << pairright.console_out;
|
cerr << pairleft.console_out << pairright.console_out;
|
||||||
|
|
||||||
|
std::remove(ipcFile.c_str());
|
||||||
|
|
||||||
exit(pairleft.exit_code + pairright.exit_code);
|
exit(pairleft.exit_code + pairright.exit_code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,9 +7,14 @@
|
||||||
********************************************************************************/
|
********************************************************************************/
|
||||||
|
|
||||||
#include "runner.h"
|
#include "runner.h"
|
||||||
|
|
||||||
#include <fairmq/tools/Unique.h>
|
#include <fairmq/tools/Unique.h>
|
||||||
#include <fairmq/tools/Process.h>
|
#include <fairmq/tools/Process.h>
|
||||||
|
#include <fairmq/tools/Strings.h>
|
||||||
|
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
|
#include <cstdio> // std::remove
|
||||||
#include <sstream> // std::stringstream
|
#include <sstream> // std::stringstream
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
|
||||||
|
@ -23,28 +28,47 @@ using namespace fair::mq::tools;
|
||||||
auto RunPubSub(string transport) -> void
|
auto RunPubSub(string transport) -> void
|
||||||
{
|
{
|
||||||
size_t session{fair::mq::tools::UuidHash()};
|
size_t session{fair::mq::tools::UuidHash()};
|
||||||
|
string dataIpcFile("/tmp/fmq_" + to_string(session) + "_data_" + transport);
|
||||||
|
string ctrlIpcFile("/tmp/fmq_" + to_string(session) + "_ctrl_" + transport);
|
||||||
|
string dataAddress("ipc://" + dataIpcFile);
|
||||||
|
string ctrlAddress("ipc://" + ctrlIpcFile);
|
||||||
|
|
||||||
auto pub = execute_result{"", 0};
|
auto pub = execute_result{"", 0};
|
||||||
thread pub_thread([&]() {
|
thread pub_thread([&]() {
|
||||||
stringstream cmd;
|
stringstream cmd;
|
||||||
cmd << runTestDevice << " --id pub_" << transport << " --control static "
|
cmd << runTestDevice
|
||||||
<< "--session " << session << " --color false --mq-config \"" << mqConfig << "\"";
|
<< " --id pub_" << transport
|
||||||
|
<< " --control static"
|
||||||
|
<< " --session " << session
|
||||||
|
<< " --color false"
|
||||||
|
<< " --channel-config name=data,type=pub,method=bind,address=" << dataAddress
|
||||||
|
<< " name=control,type=pull,method=bind,address=" << ctrlAddress;
|
||||||
pub = execute(cmd.str(), "[PUB]");
|
pub = execute(cmd.str(), "[PUB]");
|
||||||
});
|
});
|
||||||
|
|
||||||
auto sub1 = execute_result{"", 0};
|
auto sub1 = execute_result{"", 0};
|
||||||
thread sub1_thread([&]() {
|
thread sub1_thread([&]() {
|
||||||
stringstream cmd;
|
stringstream cmd;
|
||||||
cmd << runTestDevice << " --id sub_1" << transport << " --control static "
|
cmd << runTestDevice
|
||||||
<< "--session " << session << " --color false --mq-config \"" << mqConfig << "\"";
|
<< " --id sub_1" << transport
|
||||||
|
<< " --control static"
|
||||||
|
<< " --session " << session
|
||||||
|
<< " --color false"
|
||||||
|
<< " --channel-config name=data,type=sub,method=connect,address=" << dataAddress
|
||||||
|
<< " name=control,type=push,method=connect,address=" << ctrlAddress;
|
||||||
sub1 = execute(cmd.str(), "[SUB1]");
|
sub1 = execute(cmd.str(), "[SUB1]");
|
||||||
});
|
});
|
||||||
|
|
||||||
auto sub2 = execute_result{"", 0};
|
auto sub2 = execute_result{"", 0};
|
||||||
thread sub2_thread([&]() {
|
thread sub2_thread([&]() {
|
||||||
stringstream cmd;
|
stringstream cmd;
|
||||||
cmd << runTestDevice << " --id sub_2" << transport << " --control static "
|
cmd << runTestDevice
|
||||||
<< "--session " << session << " --color false --mq-config \"" << mqConfig << "\"";
|
<< " --id sub_2" << transport
|
||||||
|
<< " --control static"
|
||||||
|
<< " --session " << session
|
||||||
|
<< " --color false"
|
||||||
|
<< " --channel-config name=data,type=sub,method=connect,address=" << dataAddress
|
||||||
|
<< " name=control,type=push,method=connect,address=" << ctrlAddress;
|
||||||
sub2 = execute(cmd.str(), "[SUB2]");
|
sub2 = execute(cmd.str(), "[SUB2]");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -53,6 +77,9 @@ auto RunPubSub(string transport) -> void
|
||||||
sub2_thread.join();
|
sub2_thread.join();
|
||||||
cerr << pub.console_out << sub1.console_out << sub2.console_out << endl;
|
cerr << pub.console_out << sub1.console_out << sub2.console_out << endl;
|
||||||
|
|
||||||
|
std::remove(dataIpcFile.c_str());
|
||||||
|
std::remove(ctrlIpcFile.c_str());
|
||||||
|
|
||||||
exit(pub.exit_code + sub1.exit_code + sub2.exit_code);
|
exit(pub.exit_code + sub1.exit_code + sub2.exit_code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -9,7 +9,9 @@
|
||||||
#include "runner.h"
|
#include "runner.h"
|
||||||
#include <fairmq/tools/Unique.h>
|
#include <fairmq/tools/Unique.h>
|
||||||
#include <fairmq/tools/Process.h>
|
#include <fairmq/tools/Process.h>
|
||||||
|
#include <fairmq/tools/Strings.h>
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
|
#include <cstdio> // std::remove
|
||||||
#include <sstream> // std::stringstream
|
#include <sstream> // std::stringstream
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
|
||||||
|
@ -22,21 +24,33 @@ using namespace fair::mq::tools;
|
||||||
|
|
||||||
auto RunPushPull(string transport) -> void
|
auto RunPushPull(string transport) -> void
|
||||||
{
|
{
|
||||||
size_t session{fair::mq::tools::UuidHash()};
|
size_t session(fair::mq::tools::UuidHash());
|
||||||
|
string ipcFile("/tmp/fmq_" + to_string(session) + "_data_" + transport);
|
||||||
|
string address("ipc://" + ipcFile);
|
||||||
|
|
||||||
auto push = execute_result{"", 100};
|
auto push = execute_result{"", 100};
|
||||||
thread push_thread([&]() {
|
thread push_thread([&]() {
|
||||||
stringstream cmd;
|
stringstream cmd;
|
||||||
cmd << runTestDevice << " --id push_" << transport << " --control static "
|
cmd << runTestDevice
|
||||||
<< "--session " << session << " --color false --mq-config \"" << mqConfig << "\"";
|
<< " --id push_" << transport
|
||||||
|
<< " --control static"
|
||||||
|
<< " --shm-segment-size 100000000"
|
||||||
|
<< " --session " << session
|
||||||
|
<< " --color false"
|
||||||
|
<< " --channel-config name=data,type=push,method=bind,address=" << address;
|
||||||
push = execute(cmd.str(), "[PUSH]");
|
push = execute(cmd.str(), "[PUSH]");
|
||||||
});
|
});
|
||||||
|
|
||||||
auto pull = execute_result{"", 100};
|
auto pull = execute_result{"", 100};
|
||||||
thread pull_thread([&]() {
|
thread pull_thread([&]() {
|
||||||
stringstream cmd;
|
stringstream cmd;
|
||||||
cmd << runTestDevice << " --id pull_" << transport << " --control static "
|
cmd << runTestDevice
|
||||||
<< "--session " << session << " --color false --mq-config \"" << mqConfig << "\"";
|
<< " --id pull_" << transport
|
||||||
|
<< " --control static"
|
||||||
|
<< " --shm-segment-size 100000000"
|
||||||
|
<< " --session " << session
|
||||||
|
<< " --color false"
|
||||||
|
<< " --channel-config name=data,type=pull,method=connect,address=" << address;
|
||||||
pull = execute(cmd.str(), "[PULL]");
|
pull = execute(cmd.str(), "[PULL]");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -44,15 +58,17 @@ auto RunPushPull(string transport) -> void
|
||||||
pull_thread.join();
|
pull_thread.join();
|
||||||
cerr << push.console_out << pull.console_out;
|
cerr << push.console_out << pull.console_out;
|
||||||
|
|
||||||
|
std::remove(ipcFile.c_str());
|
||||||
|
|
||||||
exit(push.exit_code + pull.exit_code);
|
exit(push.exit_code + pull.exit_code);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(PushPull, SingleMsg_MP_tcp_zeromq)
|
TEST(PushPull, SingleMsg_MP_ipc_zeromq)
|
||||||
{
|
{
|
||||||
EXPECT_EXIT(RunPushPull("zeromq"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
|
EXPECT_EXIT(RunPushPull("zeromq"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(PushPull, SingleMsg_MP_tcp_shmem)
|
TEST(PushPull, SingleMsg_MP_ipc_shmem)
|
||||||
{
|
{
|
||||||
EXPECT_EXIT(RunPushPull("shmem"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
|
EXPECT_EXIT(RunPushPull("shmem"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,7 @@ auto RunSingleThreadedMultipart(string transport, string address1, string addres
|
||||||
|
|
||||||
fair::mq::ProgOptions config;
|
fair::mq::ProgOptions config;
|
||||||
config.SetProperty<string>("session", tools::Uuid());
|
config.SetProperty<string>("session", tools::Uuid());
|
||||||
|
config.SetProperty<size_t>("shm-segment-size", 100000000);
|
||||||
|
|
||||||
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
|
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
|
||||||
|
|
||||||
|
|
|
@ -9,7 +9,9 @@
|
||||||
#include "runner.h"
|
#include "runner.h"
|
||||||
#include <fairmq/tools/Unique.h>
|
#include <fairmq/tools/Unique.h>
|
||||||
#include <fairmq/tools/Process.h>
|
#include <fairmq/tools/Process.h>
|
||||||
|
#include <fairmq/tools/Strings.h>
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
|
#include <cstdio> // std::remove
|
||||||
#include <sstream> // std::stringstream
|
#include <sstream> // std::stringstream
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
|
||||||
|
@ -23,37 +25,45 @@ using namespace fair::mq::tools;
|
||||||
auto RunReqRep(string transport) -> void
|
auto RunReqRep(string transport) -> void
|
||||||
{
|
{
|
||||||
size_t session{fair::mq::tools::UuidHash()};
|
size_t session{fair::mq::tools::UuidHash()};
|
||||||
|
string ipcFile("/tmp/fmq_" + to_string(session) + "_data_" + transport);
|
||||||
|
string address("ipc://" + ipcFile);
|
||||||
|
|
||||||
auto rep = execute_result{"", 0};
|
auto rep = execute_result{"", 0};
|
||||||
thread rep_thread([&]() {
|
thread rep_thread([&]() {
|
||||||
stringstream cmd;
|
stringstream cmd;
|
||||||
cmd << runTestDevice << " --id rep_" << transport
|
cmd << runTestDevice
|
||||||
|
<< " --id rep_" << transport
|
||||||
<< " --control static"
|
<< " --control static"
|
||||||
|
<< " --shm-segment-size 100000000"
|
||||||
<< " --session " << session
|
<< " --session " << session
|
||||||
<< " --color false"
|
<< " --color false"
|
||||||
<< " --mq-config \"" << mqConfig << "\"";
|
<< " --channel-config name=data,type=rep,method=bind,address=" << address;
|
||||||
rep = execute(cmd.str(), "[REP]");
|
rep = execute(cmd.str(), "[REP]");
|
||||||
});
|
});
|
||||||
|
|
||||||
auto req1 = execute_result{"", 0};
|
auto req1 = execute_result{"", 0};
|
||||||
thread req1_thread([&]() {
|
thread req1_thread([&]() {
|
||||||
stringstream cmd;
|
stringstream cmd;
|
||||||
cmd << runTestDevice << " --id req_1" << transport
|
cmd << runTestDevice
|
||||||
|
<< " --id req_1" << transport
|
||||||
<< " --control static"
|
<< " --control static"
|
||||||
|
<< " --shm-segment-size 100000000"
|
||||||
<< " --session " << session
|
<< " --session " << session
|
||||||
<< " --color false"
|
<< " --color false"
|
||||||
<< " --mq-config \"" << mqConfig << "\"";
|
<< " --channel-config name=data,type=req,method=connect,address=" << address;
|
||||||
req1 = execute(cmd.str(), "[REQ1]");
|
req1 = execute(cmd.str(), "[REQ1]");
|
||||||
});
|
});
|
||||||
|
|
||||||
auto req2 = execute_result{"", 0};
|
auto req2 = execute_result{"", 0};
|
||||||
thread req2_thread([&]() {
|
thread req2_thread([&]() {
|
||||||
stringstream cmd;
|
stringstream cmd;
|
||||||
cmd << runTestDevice << " --id req_2" << transport
|
cmd << runTestDevice
|
||||||
|
<< " --id req_2" << transport
|
||||||
<< " --control static"
|
<< " --control static"
|
||||||
|
<< " --shm-segment-size 100000000"
|
||||||
<< " --session " << session
|
<< " --session " << session
|
||||||
<< " --color false"
|
<< " --color false"
|
||||||
<< " --mq-config \"" << mqConfig << "\"";
|
<< " --channel-config name=data,type=req,method=connect,address=" << address;
|
||||||
req2 = execute(cmd.str(), "[REQ2]");
|
req2 = execute(cmd.str(), "[REQ2]");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -62,6 +72,8 @@ auto RunReqRep(string transport) -> void
|
||||||
req2_thread.join();
|
req2_thread.join();
|
||||||
cerr << req1.console_out << req2.console_out << rep.console_out;
|
cerr << req1.console_out << req2.console_out << rep.console_out;
|
||||||
|
|
||||||
|
std::remove(ipcFile.c_str());
|
||||||
|
|
||||||
exit(req1.exit_code + req2.exit_code + rep.exit_code);
|
exit(req1.exit_code + req2.exit_code + rep.exit_code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,403 +0,0 @@
|
||||||
{
|
|
||||||
"fairMQOptions": {
|
|
||||||
"devices": [
|
|
||||||
{
|
|
||||||
"id": "pairleft_zeromq",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5557",
|
|
||||||
"method": "bind",
|
|
||||||
"name": "data",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "zeromq",
|
|
||||||
"type": "pair"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "pairright_zeromq",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5557",
|
|
||||||
"method": "connect",
|
|
||||||
"name": "data",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "zeromq",
|
|
||||||
"type": "pair"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "pairleft_shmem",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5857",
|
|
||||||
"method": "bind",
|
|
||||||
"name": "data",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "shmem",
|
|
||||||
"type": "pair"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "pairright_shmem",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5857",
|
|
||||||
"method": "connect",
|
|
||||||
"name": "data",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "shmem",
|
|
||||||
"type": "pair"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "pairleft_ofi",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5957",
|
|
||||||
"method": "bind",
|
|
||||||
"name": "data",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "ofi",
|
|
||||||
"type": "pair"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "pairright_ofi",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5957",
|
|
||||||
"method": "connect",
|
|
||||||
"name": "data",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "ofi",
|
|
||||||
"type": "pair"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "push_zeromq",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5557",
|
|
||||||
"method": "bind",
|
|
||||||
"name": "data",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "zeromq",
|
|
||||||
"type": "push"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "pull_zeromq",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5557",
|
|
||||||
"method": "connect",
|
|
||||||
"name": "data",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "zeromq",
|
|
||||||
"type": "pull"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "push_shmem",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5857",
|
|
||||||
"method": "bind",
|
|
||||||
"name": "data",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "shmem",
|
|
||||||
"type": "push"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "pull_shmem",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5857",
|
|
||||||
"method": "connect",
|
|
||||||
"name": "data",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "shmem",
|
|
||||||
"type": "pull"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "pub_zeromq",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5556",
|
|
||||||
"method": "bind",
|
|
||||||
"name": "data",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "zeromq",
|
|
||||||
"type": "pub"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5555",
|
|
||||||
"method": "bind",
|
|
||||||
"name": "control",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "zeromq",
|
|
||||||
"type": "pull"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "sub_1zeromq",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5556",
|
|
||||||
"method": "connect",
|
|
||||||
"name": "data",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "zeromq",
|
|
||||||
"type": "sub"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5555",
|
|
||||||
"method": "connect",
|
|
||||||
"name": "control",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "zeromq",
|
|
||||||
"type": "push"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "sub_2zeromq",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5556",
|
|
||||||
"method": "connect",
|
|
||||||
"name": "data",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "zeromq",
|
|
||||||
"type": "sub"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5555",
|
|
||||||
"method": "connect",
|
|
||||||
"name": "control",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "zeromq",
|
|
||||||
"type": "push"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "req_1zeromq",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5558",
|
|
||||||
"method": "connect",
|
|
||||||
"name": "data",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "zeromq",
|
|
||||||
"type": "req"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "req_2zeromq",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5558",
|
|
||||||
"method": "connect",
|
|
||||||
"name": "data",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "zeromq",
|
|
||||||
"type": "req"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "req_1shmem",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5758",
|
|
||||||
"method": "connect",
|
|
||||||
"name": "data",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "shmem",
|
|
||||||
"type": "req"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "req_2shmem",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5758",
|
|
||||||
"method": "connect",
|
|
||||||
"name": "data",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "shmem",
|
|
||||||
"type": "req"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "rep_zeromq",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5558",
|
|
||||||
"method": "bind",
|
|
||||||
"name": "data",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "zeromq",
|
|
||||||
"type": "rep"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "rep_shmem",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5758",
|
|
||||||
"method": "bind",
|
|
||||||
"name": "data",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "shmem",
|
|
||||||
"type": "rep"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "transfer_timeout_zeromq",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5559",
|
|
||||||
"method": "bind",
|
|
||||||
"name": "data-in",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "zeromq",
|
|
||||||
"type": "pull"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5560",
|
|
||||||
"method": "bind",
|
|
||||||
"name": "data-out",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "zeromq",
|
|
||||||
"type": "push"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "transfer_timeout_shmem",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5959",
|
|
||||||
"method": "bind",
|
|
||||||
"name": "data-in",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "shmem",
|
|
||||||
"type": "pull"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:5960",
|
|
||||||
"method": "bind",
|
|
||||||
"name": "data-out",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "shmem",
|
|
||||||
"type": "push"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "pollout_zeromq",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:6000",
|
|
||||||
"method": "bind",
|
|
||||||
"name": "data1",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "zeromq",
|
|
||||||
"type": "push"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:6001",
|
|
||||||
"method": "bind",
|
|
||||||
"name": "data2",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "zeromq",
|
|
||||||
"type": "push"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "pollin_zeromq",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:6000",
|
|
||||||
"method": "connect",
|
|
||||||
"name": "data1",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "zeromq",
|
|
||||||
"type": "pull"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:6001",
|
|
||||||
"method": "connect",
|
|
||||||
"name": "data2",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "zeromq",
|
|
||||||
"type": "pull"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "pollout_shmem",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:6004",
|
|
||||||
"method": "bind",
|
|
||||||
"name": "data1",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "shmem",
|
|
||||||
"type": "push"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:6005",
|
|
||||||
"method": "bind",
|
|
||||||
"name": "data2",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "shmem",
|
|
||||||
"type": "push"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "pollin_shmem",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:6004",
|
|
||||||
"method": "connect",
|
|
||||||
"name": "data1",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "shmem",
|
|
||||||
"type": "pull"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"address": "tcp://127.0.0.1:6005",
|
|
||||||
"method": "connect",
|
|
||||||
"name": "data2",
|
|
||||||
"rateLogging": 0,
|
|
||||||
"transport": "shmem",
|
|
||||||
"type": "pull"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -33,7 +33,9 @@ void RegionsCache(const string& transport, const string& address)
|
||||||
ProgOptions config1;
|
ProgOptions config1;
|
||||||
ProgOptions config2;
|
ProgOptions config2;
|
||||||
config1.SetProperty<string>("session", to_string(session1));
|
config1.SetProperty<string>("session", to_string(session1));
|
||||||
|
config1.SetProperty<size_t>("shm-segment-size", 100000000);
|
||||||
config2.SetProperty<string>("session", to_string(session2));
|
config2.SetProperty<string>("session", to_string(session2));
|
||||||
|
config2.SetProperty<size_t>("shm-segment-size", 100000000);
|
||||||
|
|
||||||
auto factory1 = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config1);
|
auto factory1 = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config1);
|
||||||
auto factory2 = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config2);
|
auto factory2 = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config2);
|
||||||
|
@ -92,6 +94,7 @@ void RegionEventSubscriptions(const string& transport)
|
||||||
|
|
||||||
ProgOptions config;
|
ProgOptions config;
|
||||||
config.SetProperty<string>("session", to_string(session));
|
config.SetProperty<string>("session", to_string(session));
|
||||||
|
config.SetProperty<size_t>("shm-segment-size", 100000000);
|
||||||
|
|
||||||
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
|
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
|
||||||
|
|
||||||
|
@ -164,6 +167,7 @@ void RegionCallbacks(const string& transport, const string& _address)
|
||||||
|
|
||||||
ProgOptions config;
|
ProgOptions config;
|
||||||
config.SetProperty<string>("session", to_string(session));
|
config.SetProperty<string>("session", to_string(session));
|
||||||
|
config.SetProperty<size_t>("shm-segment-size", 100000000);
|
||||||
|
|
||||||
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
|
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,6 @@ namespace fair::mq::test
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
string runTestDevice = "@RUN_TEST_DEVICE@";
|
string runTestDevice = "@RUN_TEST_DEVICE@";
|
||||||
string mqConfig = "@MQ_CONFIG@";
|
|
||||||
|
|
||||||
} // namespace fair::mq::test
|
} // namespace fair::mq::test
|
||||||
|
|
||||||
|
|
|
@ -41,6 +41,7 @@ void RunOptionsTest(const string& transport)
|
||||||
{
|
{
|
||||||
ProgOptions config;
|
ProgOptions config;
|
||||||
config.SetProperty<string>("session", tools::Uuid());
|
config.SetProperty<string>("session", tools::Uuid());
|
||||||
|
config.SetProperty<size_t>("shm-segment-size", 100000000);
|
||||||
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
|
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
|
||||||
Channel channel("Push", "push", factory);
|
Channel channel("Push", "push", factory);
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
|
#include <cstdio> // std::remove
|
||||||
#include <sstream> // std::stringstream
|
#include <sstream> // std::stringstream
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
|
||||||
|
@ -40,13 +41,29 @@ void delayedInterruptor(TransportFactory& transport)
|
||||||
auto RunTransferTimeout(string transport) -> void
|
auto RunTransferTimeout(string transport) -> void
|
||||||
{
|
{
|
||||||
size_t session{UuidHash()};
|
size_t session{UuidHash()};
|
||||||
|
string dataInIpcFile("/tmp/fmq_" + to_string(session) + "_datain_" + transport);
|
||||||
|
string dataOutIpcFile("/tmp/fmq_" + to_string(session) + "_dataout_" + transport);
|
||||||
|
string dataInAddress("ipc://" + dataInIpcFile);
|
||||||
|
string dataOutAddress("ipc://" + dataOutIpcFile);
|
||||||
|
|
||||||
stringstream cmd;
|
stringstream cmd;
|
||||||
cmd << runTestDevice << " --id transfer_timeout_" << transport << " --control static --transport " << transport
|
cmd << runTestDevice
|
||||||
<< " --session " << session << " --color false --mq-config \"" << mqConfig << "\"";
|
<< " --id transfer_timeout_" << transport
|
||||||
|
<< " --control static"
|
||||||
|
<< " --shm-segment-size 100000000"
|
||||||
|
<< " --severity debug"
|
||||||
|
<< " --transport " << transport
|
||||||
|
<< " --session " << session
|
||||||
|
<< " --color false"
|
||||||
|
<< " --channel-config name=data-in,type=pull,method=bind,address=" << dataInAddress
|
||||||
|
<< " name=data-out,type=push,method=bind,address=" << dataOutAddress;
|
||||||
auto res = execute(cmd.str());
|
auto res = execute(cmd.str());
|
||||||
|
|
||||||
cerr << res.console_out;
|
cerr << res.console_out;
|
||||||
|
|
||||||
|
std::remove(dataInIpcFile.c_str());
|
||||||
|
std::remove(dataOutIpcFile.c_str());
|
||||||
|
|
||||||
exit(res.exit_code);
|
exit(res.exit_code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -57,6 +74,7 @@ void InterruptTransfer(const string& transport, const string& _address)
|
||||||
|
|
||||||
fair::mq::ProgOptions config;
|
fair::mq::ProgOptions config;
|
||||||
config.SetProperty<string>("session", to_string(session));
|
config.SetProperty<string>("session", to_string(session));
|
||||||
|
config.SetProperty<size_t>("shm-segment-size", 100000000);
|
||||||
|
|
||||||
auto factory = TransportFactory::CreateTransportFactory(transport, Uuid(), &config);
|
auto factory = TransportFactory::CreateTransportFactory(transport, Uuid(), &config);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user