mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
Add example with ref-counted copy from unmanaged region
This commit is contained in:
parent
68038c4693
commit
46f50a10ea
|
@ -7,5 +7,6 @@
|
||||||
################################################################################
|
################################################################################
|
||||||
|
|
||||||
add_example(NAME region
|
add_example(NAME region
|
||||||
DEVICE sampler sink keep-alive
|
DEVICE sampler processor sink keep-alive
|
||||||
|
SCRIPT region region-advanced
|
||||||
)
|
)
|
||||||
|
|
53
examples/region/fairmq-start-ex-region-advanced.sh.in
Executable file
53
examples/region/fairmq-start-ex-region-advanced.sh.in
Executable file
|
@ -0,0 +1,53 @@
|
||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
|
||||||
|
|
||||||
|
transport="shmem"
|
||||||
|
msgSize="1000000"
|
||||||
|
|
||||||
|
if [[ $1 =~ ^[a-z]+$ ]]; then
|
||||||
|
transport=$1
|
||||||
|
fi
|
||||||
|
|
||||||
|
if [[ $2 =~ ^[0-9]+$ ]]; then
|
||||||
|
msgSize=$1
|
||||||
|
fi
|
||||||
|
|
||||||
|
SAMPLER="fairmq-ex-region-sampler"
|
||||||
|
SAMPLER+=" --id sampler1"
|
||||||
|
# SAMPLER+=" --sampling-rate 10"
|
||||||
|
SAMPLER+=" --severity debug"
|
||||||
|
SAMPLER+=" --msg-size $msgSize"
|
||||||
|
SAMPLER+=" --transport $transport"
|
||||||
|
SAMPLER+=" --shm-monitor true"
|
||||||
|
SAMPLER+=" --chan-name data1"
|
||||||
|
SAMPLER+=" --channel-config name=data1,type=push,method=bind,address=tcp://127.0.0.1:7777"
|
||||||
|
xterm -geometry 120x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
|
||||||
|
|
||||||
|
PROCESSOR="fairmq-ex-region-processor"
|
||||||
|
PROCESSOR+=" --id processor1"
|
||||||
|
PROCESSOR+=" --severity debug"
|
||||||
|
PROCESSOR+=" --transport $transport"
|
||||||
|
PROCESSOR+=" --shm-monitor true"
|
||||||
|
PROCESSOR+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777"
|
||||||
|
PROCESSOR+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7778"
|
||||||
|
PROCESSOR+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7779"
|
||||||
|
xterm -geometry 120x60+750+0 -hold -e @EX_BIN_DIR@/$PROCESSOR &
|
||||||
|
|
||||||
|
SINK1="fairmq-ex-region-sink"
|
||||||
|
SINK1+=" --id sink1"
|
||||||
|
SINK1+=" --severity debug"
|
||||||
|
SINK1+=" --chan-name data2"
|
||||||
|
SINK1+=" --transport $transport"
|
||||||
|
SINK1+=" --shm-monitor true"
|
||||||
|
SINK1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7778"
|
||||||
|
xterm -geometry 120x32+1500+0 -hold -e @EX_BIN_DIR@/$SINK1 &
|
||||||
|
|
||||||
|
SINK2="fairmq-ex-region-sink"
|
||||||
|
SINK2+=" --id sink2"
|
||||||
|
SINK2+=" --severity debug"
|
||||||
|
SINK2+=" --chan-name data3"
|
||||||
|
SINK2+=" --transport $transport"
|
||||||
|
SINK2+=" --shm-monitor true"
|
||||||
|
SINK2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7779"
|
||||||
|
xterm -geometry 120x32+1500+500 -hold -e @EX_BIN_DIR@/$SINK2 &
|
74
examples/region/processor.cxx
Normal file
74
examples/region/processor.cxx
Normal file
|
@ -0,0 +1,74 @@
|
||||||
|
/********************************************************************************
|
||||||
|
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||||
|
* *
|
||||||
|
* This software is distributed under the terms of the *
|
||||||
|
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||||
|
* copied verbatim in the file "LICENSE" *
|
||||||
|
********************************************************************************/
|
||||||
|
|
||||||
|
#include <fairmq/Device.h>
|
||||||
|
#include <fairmq/runDevice.h>
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
|
namespace bpo = boost::program_options;
|
||||||
|
using namespace std;
|
||||||
|
using namespace fair::mq;
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
struct Processor : Device
|
||||||
|
{
|
||||||
|
void InitTask() override
|
||||||
|
{
|
||||||
|
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
|
||||||
|
GetChannel("data1", 0).Transport()->SubscribeToRegionEvents([](RegionInfo info) {
|
||||||
|
LOG(info) << "Region event: " << info.event << ": "
|
||||||
|
<< (info.managed ? "managed" : "unmanaged") << ", id: " << info.id
|
||||||
|
<< ", ptr: " << info.ptr << ", size: " << info.size
|
||||||
|
<< ", flags: " << info.flags;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void Run() override
|
||||||
|
{
|
||||||
|
Channel& dataIn = GetChannel("data1", 0);
|
||||||
|
Channel& dataOut1 = GetChannel("data2", 0);
|
||||||
|
Channel& dataOut2 = GetChannel("data3", 0);
|
||||||
|
|
||||||
|
while (!NewStatePending()) {
|
||||||
|
auto msg(dataIn.Transport()->CreateMessage());
|
||||||
|
dataIn.Receive(msg);
|
||||||
|
|
||||||
|
fair::mq::MessagePtr msgCopy1(NewMessage());
|
||||||
|
msgCopy1->Copy(*msg);
|
||||||
|
fair::mq::MessagePtr msgCopy2(NewMessage());
|
||||||
|
msgCopy2->Copy(*msg);
|
||||||
|
|
||||||
|
dataOut1.Send(msgCopy1);
|
||||||
|
dataOut2.Send(msgCopy2);
|
||||||
|
|
||||||
|
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
|
||||||
|
LOG(info) << "Configured max number of iterations reached. Leaving RUNNING state.";
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void ResetTask() override
|
||||||
|
{
|
||||||
|
GetChannel("data1", 0).Transport()->UnsubscribeFromRegionEvents();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
uint64_t fMaxIterations = 0;
|
||||||
|
uint64_t fNumIterations = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
void addCustomOptions(bpo::options_description& options)
|
||||||
|
{
|
||||||
|
options.add_options()("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
|
||||||
|
}
|
||||||
|
|
||||||
|
unique_ptr<Device> getDevice(ProgOptions& /*config*/) { return make_unique<Processor>(); }
|
|
@ -8,6 +8,7 @@
|
||||||
|
|
||||||
#include <fairmq/Device.h>
|
#include <fairmq/Device.h>
|
||||||
#include <fairmq/runDevice.h>
|
#include <fairmq/runDevice.h>
|
||||||
|
#include <fairmq/tools/RateLimit.h>
|
||||||
|
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
|
@ -23,8 +24,10 @@ struct Sampler : fair::mq::Device
|
||||||
fMsgSize = fConfig->GetProperty<int>("msg-size");
|
fMsgSize = fConfig->GetProperty<int>("msg-size");
|
||||||
fLinger = fConfig->GetProperty<uint32_t>("region-linger");
|
fLinger = fConfig->GetProperty<uint32_t>("region-linger");
|
||||||
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
|
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
|
||||||
|
fChanName = fConfig->GetProperty<std::string>("chan-name");
|
||||||
|
fSamplingRate = fConfig->GetProperty<float>("sampling-rate");
|
||||||
|
|
||||||
GetChannel("data", 0).Transport()->SubscribeToRegionEvents([](fair::mq::RegionInfo info) {
|
GetChannel(fChanName, 0).Transport()->SubscribeToRegionEvents([](fair::mq::RegionInfo info) {
|
||||||
LOG(info) << "Region event: " << info.event << ": "
|
LOG(info) << "Region event: " << info.event << ": "
|
||||||
<< (info.managed ? "managed" : "unmanaged")
|
<< (info.managed ? "managed" : "unmanaged")
|
||||||
<< ", id: " << info.id
|
<< ", id: " << info.id
|
||||||
|
@ -43,7 +46,7 @@ struct Sampler : fair::mq::Device
|
||||||
regionCfg.lock = !fExternalRegion; // mlock region after creation
|
regionCfg.lock = !fExternalRegion; // mlock region after creation
|
||||||
regionCfg.zero = !fExternalRegion; // zero region content after creation
|
regionCfg.zero = !fExternalRegion; // zero region content after creation
|
||||||
fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor(
|
fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor(
|
||||||
"data", // region is created using the transport of this channel...
|
fChanName, // region is created using the transport of this channel...
|
||||||
0, // ... and this sub-channel
|
0, // ... and this sub-channel
|
||||||
10000000, // region size
|
10000000, // region size
|
||||||
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
|
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
|
||||||
|
@ -59,8 +62,11 @@ struct Sampler : fair::mq::Device
|
||||||
|
|
||||||
void Run() override
|
void Run() override
|
||||||
{
|
{
|
||||||
|
|
||||||
|
fair::mq::tools::RateLimiter rateLimiter(fSamplingRate);
|
||||||
|
|
||||||
while (!NewStatePending()) {
|
while (!NewStatePending()) {
|
||||||
fair::mq::MessagePtr msg(NewMessageFor("data", // channel
|
fair::mq::MessagePtr msg(NewMessageFor(fChanName, // channel
|
||||||
0, // sub-channel
|
0, // sub-channel
|
||||||
fRegion, // region
|
fRegion, // region
|
||||||
fRegion->GetData(), // ptr within region
|
fRegion->GetData(), // ptr within region
|
||||||
|
@ -70,11 +76,14 @@ struct Sampler : fair::mq::Device
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lock(fMtx);
|
std::lock_guard<std::mutex> lock(fMtx);
|
||||||
++fNumUnackedMsgs;
|
++fNumUnackedMsgs;
|
||||||
if (Send(msg, "data", 0) > 0) {
|
if (Send(msg, fChanName, 0) > 0) {
|
||||||
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
|
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
|
||||||
LOG(info) << "Configured maximum number of iterations reached. Stopping sending.";
|
LOG(info) << "Configured maximum number of iterations reached. Stopping sending.";
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
if (fSamplingRate > 0.001) {
|
||||||
|
rateLimiter.maybe_sleep();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -99,7 +108,7 @@ struct Sampler : fair::mq::Device
|
||||||
void ResetTask() override
|
void ResetTask() override
|
||||||
{
|
{
|
||||||
fRegion.reset();
|
fRegion.reset();
|
||||||
GetChannel("data", 0).Transport()->UnsubscribeFromRegionEvents();
|
GetChannel(fChanName, 0).Transport()->UnsubscribeFromRegionEvents();
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -111,12 +120,16 @@ struct Sampler : fair::mq::Device
|
||||||
fair::mq::UnmanagedRegionPtr fRegion = nullptr;
|
fair::mq::UnmanagedRegionPtr fRegion = nullptr;
|
||||||
std::mutex fMtx;
|
std::mutex fMtx;
|
||||||
uint64_t fNumUnackedMsgs = 0;
|
uint64_t fNumUnackedMsgs = 0;
|
||||||
|
std::string fChanName;
|
||||||
|
float fSamplingRate = 0.;
|
||||||
};
|
};
|
||||||
|
|
||||||
void addCustomOptions(bpo::options_description& options)
|
void addCustomOptions(bpo::options_description& options)
|
||||||
{
|
{
|
||||||
options.add_options()
|
options.add_options()
|
||||||
|
("chan-name", bpo::value<std::string>()->default_value("data"), "name of the output channel")
|
||||||
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
|
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
|
||||||
|
("sampling-rate", bpo::value<float>()->default_value(0.), "Sampling rate (Hz).")
|
||||||
("region-linger", bpo::value<uint32_t>()->default_value(100), "Linger period for regions")
|
("region-linger", bpo::value<uint32_t>()->default_value(100), "Linger period for regions")
|
||||||
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)")
|
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)")
|
||||||
("external-region", bpo::value<bool>()->default_value(false), "Use region created by another process");
|
("external-region", bpo::value<bool>()->default_value(false), "Use region created by another process");
|
||||||
|
|
|
@ -22,7 +22,8 @@ struct Sink : Device
|
||||||
{
|
{
|
||||||
// Get the fMaxIterations value from the command line options (via fConfig)
|
// Get the fMaxIterations value from the command line options (via fConfig)
|
||||||
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
|
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
|
||||||
GetChannel("data", 0).Transport()->SubscribeToRegionEvents([](RegionInfo info) {
|
fChanName = fConfig->GetProperty<std::string>("chan-name");
|
||||||
|
GetChannel(fChanName, 0).Transport()->SubscribeToRegionEvents([](RegionInfo info) {
|
||||||
LOG(info) << "Region event: " << info.event << ": "
|
LOG(info) << "Region event: " << info.event << ": "
|
||||||
<< (info.managed ? "managed" : "unmanaged") << ", id: " << info.id
|
<< (info.managed ? "managed" : "unmanaged") << ", id: " << info.id
|
||||||
<< ", ptr: " << info.ptr << ", size: " << info.size
|
<< ", ptr: " << info.ptr << ", size: " << info.size
|
||||||
|
@ -32,11 +33,11 @@ struct Sink : Device
|
||||||
|
|
||||||
void Run() override
|
void Run() override
|
||||||
{
|
{
|
||||||
Channel& dataInChannel = GetChannel("data", 0);
|
Channel& dataIn = GetChannel(fChanName, 0);
|
||||||
|
|
||||||
while (!NewStatePending()) {
|
while (!NewStatePending()) {
|
||||||
auto msg(dataInChannel.Transport()->CreateMessage());
|
auto msg(dataIn.Transport()->CreateMessage());
|
||||||
dataInChannel.Receive(msg);
|
dataIn.Receive(msg);
|
||||||
|
|
||||||
// void* ptr = msg->GetData();
|
// void* ptr = msg->GetData();
|
||||||
// char* cptr = static_cast<char*>(ptr);
|
// char* cptr = static_cast<char*>(ptr);
|
||||||
|
@ -51,22 +52,22 @@ struct Sink : Device
|
||||||
|
|
||||||
void ResetTask() override
|
void ResetTask() override
|
||||||
{
|
{
|
||||||
GetChannel("data", 0).Transport()->UnsubscribeFromRegionEvents();
|
GetChannel(fChanName, 0).Transport()->UnsubscribeFromRegionEvents();
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
uint64_t fMaxIterations = 0;
|
uint64_t fMaxIterations = 0;
|
||||||
uint64_t fNumIterations = 0;
|
uint64_t fNumIterations = 0;
|
||||||
|
std::string fChanName;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
void addCustomOptions(bpo::options_description& options)
|
void addCustomOptions(bpo::options_description& options)
|
||||||
{
|
{
|
||||||
options.add_options()(
|
options.add_options()
|
||||||
"max-iterations",
|
("chan-name", bpo::value<std::string>()->default_value("data"), "name of the input channel")
|
||||||
bpo::value<uint64_t>()->default_value(0),
|
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
|
||||||
"Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
unique_ptr<Device> getDevice(ProgOptions& /*config*/) { return make_unique<Sink>(); }
|
unique_ptr<Device> getDevice(ProgOptions& /*config*/) { return make_unique<Sink>(); }
|
||||||
|
|
Loading…
Reference in New Issue
Block a user