Fix region example by moving our test code to a separate one

This commit is contained in:
Alexey Rybalchenko 2019-03-04 11:39:43 +01:00 committed by Dennis Klein
parent 3bf5f3bf45
commit f191c5099c
15 changed files with 380 additions and 17 deletions

View File

@ -16,5 +16,6 @@ add_subdirectory(multiple-channels)
if(BUILD_NANOMSG_TRANSPORT) if(BUILD_NANOMSG_TRANSPORT)
add_subdirectory(multiple-transports) add_subdirectory(multiple-transports)
endif() endif()
add_subdirectory(readout)
add_subdirectory(region) add_subdirectory(region)
add_subdirectory(req-rep) add_subdirectory(req-rep)

View File

@ -0,0 +1,54 @@
################################################################################
# Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
add_library(ExampleReadoutLib STATIC
"Sampler.cxx"
"Sampler.h"
"Builder.h"
"Sink.cxx"
"Sink.h"
)
target_link_libraries(ExampleReadoutLib PUBLIC FairMQ)
add_executable(fairmq-ex-readout-sampler runSampler.cxx)
target_link_libraries(fairmq-ex-readout-sampler PRIVATE ExampleReadoutLib)
add_executable(fairmq-ex-readout-builder runBuilder.cxx)
target_link_libraries(fairmq-ex-readout-builder PRIVATE ExampleReadoutLib)
add_executable(fairmq-ex-readout-sink runSink.cxx)
target_link_libraries(fairmq-ex-readout-sink PRIVATE ExampleReadoutLib)
add_custom_target(Examplereadout DEPENDS fairmq-ex-readout-sampler fairmq-ex-readout-sink)
set(EX_BIN_DIR ${CMAKE_CURRENT_BINARY_DIR})
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout.sh)
# install
install(
TARGETS
fairmq-ex-readout-sampler
fairmq-ex-readout-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
set(EX_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR})
set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-readout.sh
)

View File

@ -0,0 +1,5 @@
Region example
==============
This example demonstrates the use of a more advanced feature - UnmanagedRegion, that can be used to create a buffer through one of FairMQ transports. The contents of this buffer are managed by the user, who can also create messages out of sub-buffers of the created buffer. Such feature can be interesting in environments that have special requirements by the hardware that writes the data, to keep the transfer efficient (e.g. shared memory).

View File

@ -0,0 +1,91 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.cpp
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include "Sampler.h"
#include <thread>
using namespace std;
namespace example_region
{
Sampler::Sampler()
: fMsgSize(10000)
, fMaxIterations(0)
, fNumIterations(0)
, fRegion(nullptr)
, fNumUnackedMsgs(0)
{
}
void Sampler::InitTask()
{
fMsgSize = fConfig->GetValue<int>("msg-size");
fMaxIterations = fConfig->GetValue<uint64_t>("max-iterations");
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data1",
0,
10000000,
[this](void* /*data*/, size_t /*size*/, void* /*hint*/) { // callback to be called when message buffers no longer needed by transport
--fNumUnackedMsgs;
if (fMaxIterations > 0)
{
LOG(debug) << "Received ack";
}
}
));
}
bool Sampler::ConditionalRun()
{
FairMQMessagePtr msg(NewMessageFor("data1", // channel
0, // sub-channel
fRegion, // region
fRegion->GetData(), // ptr within region
fMsgSize, // offset from ptr
nullptr // hint
));
if (Send(msg, "data1", 0) > 0)
{
++fNumUnackedMsgs;
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
}
return true;
}
void Sampler::ResetTask()
{
// if not all messages acknowledged, wait for a bit. But only once, since receiver could be already dead.
if (fNumUnackedMsgs != 0)
{
LOG(debug) << "waiting for all acknowledgements... (" << fNumUnackedMsgs << ")";
this_thread::sleep_for(chrono::milliseconds(500));
LOG(debug) << "done, still unacked: " << fNumUnackedMsgs;
}
fRegion.reset();
}
Sampler::~Sampler()
{
}
} // namespace example_region

View File

@ -0,0 +1,46 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLEREGIONSAMPLER_H
#define FAIRMQEXAMPLEREGIONSAMPLER_H
#include <atomic>
#include "FairMQDevice.h"
namespace example_region
{
class Sampler : public FairMQDevice
{
public:
Sampler();
virtual ~Sampler();
protected:
virtual void InitTask();
virtual bool ConditionalRun();
virtual void ResetTask();
private:
int fMsgSize;
uint64_t fMaxIterations;
uint64_t fNumIterations;
FairMQUnmanagedRegionPtr fRegion;
std::atomic<uint64_t> fNumUnackedMsgs;
};
} // namespace example_region
#endif /* FAIRMQEXAMPLEREGIONSAMPLER_H */

56
examples/readout/Sink.cxx Normal file
View File

@ -0,0 +1,56 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.cxx
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include "Sink.h"
using namespace std;
namespace example_region
{
Sink::Sink()
: fMaxIterations(0)
, fNumIterations(0)
{
}
void Sink::InitTask()
{
// Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetValue<uint64_t>("max-iterations");
}
void Sink::Run()
{
FairMQChannel& dataInChannel = fChannels.at("data").at(0);
while (!NewStatePending())
{
FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage());
dataInChannel.Receive(msg);
// void* ptr = msg->GetData();
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
break;
}
}
}
Sink::~Sink()
{
}
} // namespace example_region

42
examples/readout/Sink.h Normal file
View File

@ -0,0 +1,42 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLEREGIONSINK_H
#define FAIRMQEXAMPLEREGIONSINK_H
#include <string>
#include "FairMQDevice.h"
namespace example_region
{
class Sink : public FairMQDevice
{
public:
Sink();
virtual ~Sink();
protected:
virtual void Run();
virtual void InitTask();
private:
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
} // namespace example_region
#endif /* FAIRMQEXAMPLEREGIONSINK_H */

View File

@ -0,0 +1,30 @@
#!/bin/bash
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
msgSize="1000000"
if [[ $1 =~ ^[0-9]+$ ]]; then
msgSize=$1
fi
SAMPLER="fairmq-ex-readout-sampler"
SAMPLER+=" --id sampler1"
SAMPLER+=" --severity debug"
SAMPLER+=" --msg-size $msgSize"
# SAMPLER+=" --rate 10"
SAMPLER+=" --channel-config name=data1,type=pair,method=bind,address=tcp://127.0.0.1:7777,transport=shmem"
xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
BUILDER="fairmq-ex-readout-builder"
BUILDER+=" --id builder1"
BUILDER+=" --severity debug"
BUILDER+=" --channel-config name=data1,type=pair,method=connect,address=tcp://127.0.0.1:7777,transport=shmem"
BUILDER+=" name=data2,type=pair,method=connect,address=tcp://127.0.0.1:7778,transport=ofi"
xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$BUILDER &
SINK="fairmq-ex-readout-sink"
SINK+=" --id sink1"
SINK+=" --severity debug"
SINK+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:7778,transport=ofi"
xterm -geometry 80x23+1000+0 -hold -e @EX_BIN_DIR@/$SINK &

View File

@ -0,0 +1,24 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sampler.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{
return new example_region::Sampler();
}

View File

@ -0,0 +1,23 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sink.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{
return new example_region::Sink();
}

View File

@ -9,7 +9,6 @@
add_library(ExampleRegionLib STATIC add_library(ExampleRegionLib STATIC
"Sampler.cxx" "Sampler.cxx"
"Sampler.h" "Sampler.h"
"Builder.h"
"Sink.cxx" "Sink.cxx"
"Sink.h" "Sink.h"
) )
@ -19,9 +18,6 @@ target_link_libraries(ExampleRegionLib PUBLIC FairMQ)
add_executable(fairmq-ex-region-sampler runSampler.cxx) add_executable(fairmq-ex-region-sampler runSampler.cxx)
target_link_libraries(fairmq-ex-region-sampler PRIVATE ExampleRegionLib) target_link_libraries(fairmq-ex-region-sampler PRIVATE ExampleRegionLib)
add_executable(fairmq-ex-region-builder runBuilder.cxx)
target_link_libraries(fairmq-ex-region-builder PRIVATE ExampleRegionLib)
add_executable(fairmq-ex-region-sink runSink.cxx) add_executable(fairmq-ex-region-sink runSink.cxx)
target_link_libraries(fairmq-ex-region-sink PRIVATE ExampleRegionLib) target_link_libraries(fairmq-ex-region-sink PRIVATE ExampleRegionLib)

View File

@ -35,7 +35,7 @@ void Sampler::InitTask()
fMsgSize = fConfig->GetValue<int>("msg-size"); fMsgSize = fConfig->GetValue<int>("msg-size");
fMaxIterations = fConfig->GetValue<uint64_t>("max-iterations"); fMaxIterations = fConfig->GetValue<uint64_t>("max-iterations");
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data1", fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data",
0, 0,
10000000, 10000000,
[this](void* /*data*/, size_t /*size*/, void* /*hint*/) { // callback to be called when message buffers no longer needed by transport [this](void* /*data*/, size_t /*size*/, void* /*hint*/) { // callback to be called when message buffers no longer needed by transport
@ -50,7 +50,7 @@ void Sampler::InitTask()
bool Sampler::ConditionalRun() bool Sampler::ConditionalRun()
{ {
FairMQMessagePtr msg(NewMessageFor("data1", // channel FairMQMessagePtr msg(NewMessageFor("data", // channel
0, // sub-channel 0, // sub-channel
fRegion, // region fRegion, // region
fRegion->GetData(), // ptr within region fRegion->GetData(), // ptr within region
@ -58,7 +58,7 @@ bool Sampler::ConditionalRun()
nullptr // hint nullptr // hint
)); ));
if (Send(msg, "data1", 0) > 0) if (Send(msg, "data", 0) > 0)
{ {
++fNumUnackedMsgs; ++fNumUnackedMsgs;

View File

@ -13,18 +13,13 @@ SAMPLER+=" --id sampler1"
SAMPLER+=" --severity debug" SAMPLER+=" --severity debug"
SAMPLER+=" --msg-size $msgSize" SAMPLER+=" --msg-size $msgSize"
# SAMPLER+=" --rate 10" # SAMPLER+=" --rate 10"
SAMPLER+=" --channel-config name=data1,type=pair,method=bind,address=tcp://127.0.0.1:7777,transport=shmem" SAMPLER+=" --transport shmem"
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992"
xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER & xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
BUILDER="fairmq-ex-region-builder"
BUILDER+=" --id builder1"
BUILDER+=" --severity debug"
BUILDER+=" --channel-config name=data1,type=pair,method=connect,address=tcp://127.0.0.1:7777,transport=shmem"
BUILDER+=" name=data2,type=pair,method=connect,address=tcp://127.0.0.1:7778,transport=ofi"
xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$BUILDER &
SINK="fairmq-ex-region-sink" SINK="fairmq-ex-region-sink"
SINK+=" --id sink1" SINK+=" --id sink1"
SINK+=" --severity debug" SINK+=" --severity debug"
SINK+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:7778,transport=ofi" SINK+=" --transport shmem"
xterm -geometry 80x23+1000+0 -hold -e @EX_BIN_DIR@/$SINK & SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992"
xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$SINK &