Enhance region example with Builder device

This commit is contained in:
Alexey Rybalchenko 2019-02-28 01:41:04 +01:00 committed by Dennis Klein
parent f85663bfe8
commit 7df278818c
7 changed files with 79 additions and 10 deletions

40
examples/region/Builder.h Normal file
View File

@ -0,0 +1,40 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQEXAMPLEREGIONBUILDER_H
#define FAIRMQEXAMPLEREGIONBUILDER_H
#include <atomic>
#include "FairMQDevice.h"
namespace example_region
{
class Builder : public FairMQDevice
{
public:
Builder() {
OnData("data1", &Builder::HandleData);
}
virtual ~Builder() {}
protected:
bool HandleData(FairMQMessagePtr& msg, int /*index*/)
{
if (Send(msg, "data2") < 0) {
return false;
}
return true;
}
};
} // namespace example_region
#endif /* FAIRMQEXAMPLEREGIONBUILDER_H */

View File

@ -9,6 +9,7 @@
add_library(ExampleRegionLib STATIC
"Sampler.cxx"
"Sampler.h"
"Builder.h"
"Sink.cxx"
"Sink.h"
)
@ -18,6 +19,8 @@ target_link_libraries(ExampleRegionLib PUBLIC FairMQ)
add_executable(fairmq-ex-region-sampler runSampler.cxx)
target_link_libraries(fairmq-ex-region-sampler PRIVATE ExampleRegionLib)
add_executable(fairmq-ex-region-builder runBuilder.cxx)
target_link_libraries(fairmq-ex-region-builder PRIVATE ExampleRegionLib)
add_executable(fairmq-ex-region-sink runSink.cxx)
target_link_libraries(fairmq-ex-region-sink PRIVATE ExampleRegionLib)

View File

@ -35,7 +35,7 @@ void Sampler::InitTask()
fMsgSize = fConfig->GetValue<int>("msg-size");
fMaxIterations = fConfig->GetValue<uint64_t>("max-iterations");
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data",
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data1",
0,
10000000,
[this](void* /*data*/, size_t /*size*/, void* /*hint*/) { // callback to be called when message buffers no longer needed by transport
@ -50,7 +50,7 @@ void Sampler::InitTask()
bool Sampler::ConditionalRun()
{
FairMQMessagePtr msg(NewMessageFor("data", // channel
FairMQMessagePtr msg(NewMessageFor("data1", // channel
0, // sub-channel
fRegion, // region
fRegion->GetData(), // ptr within region
@ -58,7 +58,7 @@ bool Sampler::ConditionalRun()
nullptr // hint
));
if (Send(msg, "data", 0) > 0)
if (Send(msg, "data1", 0) > 0)
{
++fNumUnackedMsgs;

View File

@ -13,13 +13,18 @@ SAMPLER+=" --id sampler1"
SAMPLER+=" --severity debug"
SAMPLER+=" --msg-size $msgSize"
# SAMPLER+=" --rate 10"
SAMPLER+=" --transport shmem"
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992"
SAMPLER+=" --channel-config name=data1,type=pair,method=bind,address=tcp://127.0.0.1:7777,transport=shmem"
xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
BUILDER="fairmq-ex-region-builder"
BUILDER+=" --id builder1"
BUILDER+=" --severity debug"
BUILDER+=" --channel-config name=data1,type=pair,method=connect,address=tcp://127.0.0.1:7777,transport=shmem"
BUILDER+=" name=data2,type=pair,method=connect,address=tcp://127.0.0.1:7778,transport=ofi"
xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$BUILDER &
SINK="fairmq-ex-region-sink"
SINK+=" --id sink1"
SINK+=" --severity debug"
SINK+=" --transport shmem"
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992"
xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$SINK &
SINK+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:7778,transport=ofi"
xterm -geometry 80x23+1000+0 -hold -e @EX_BIN_DIR@/$SINK &

View File

@ -0,0 +1,20 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Builder.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& /* options */)
{}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{
return new example_region::Builder();
}

View File

@ -61,7 +61,7 @@ SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --num-parts 1"
# SAMPLER+=" --msg-rate 1000"
SAMPLER+=" --max-iterations $maxIterations"
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:5555"
SAMPLER+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:5555"
xterm -geometry 90x50+0+0 -hold -e $affinitySamp @CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
echo ""
echo "started: xterm -geometry 90x50+0+0 -hold -e $affinitySamp @CMAKE_CURRENT_BINARY_DIR@/$SAMPLER"
@ -75,7 +75,7 @@ SINK+=" --transport $transport"
SINK+=" --severity debug"
SINK+=" --multipart false"
SINK+=" --max-iterations $maxIterations"
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:5555"
SINK+=" --channel-config name=data,type=pair,method=connect,address=tcp://127.0.0.1:5555"
xterm -geometry 90x50+550+0 -hold -e $affinitySink @CMAKE_CURRENT_BINARY_DIR@/$SINK &
echo ""
echo "started: xterm -geometry 90x50+550+0 -hold -e $affinitySink @CMAKE_CURRENT_BINARY_DIR@/$SINK"

View File

@ -350,6 +350,7 @@ void FairMQMessageSHM::CloseMessage()
}
if (fRegionPtr)
{
// LOG(debug) << "sending ack";
if (fRegionPtr->fQueue->timed_send(&block, sizeof(RegionBlock), 0, sndTill))
{
success = true;