mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 09:31:45 +00:00
FairMQ: Add test for example/advanced/Region.
Also fix a regression in nanomsg transport.
This commit is contained in:
@@ -23,6 +23,8 @@ using namespace std;
|
||||
|
||||
FairMQExampleRegionSampler::FairMQExampleRegionSampler()
|
||||
: fMsgSize(10000)
|
||||
, fMaxIterations(0)
|
||||
, fNumIterations(0)
|
||||
, fRegion(nullptr)
|
||||
, fNumUnackedMsgs(0)
|
||||
{
|
||||
@@ -31,11 +33,18 @@ FairMQExampleRegionSampler::FairMQExampleRegionSampler()
|
||||
void FairMQExampleRegionSampler::InitTask()
|
||||
{
|
||||
fMsgSize = fConfig->GetValue<int>("msg-size");
|
||||
fMaxIterations = fConfig->GetValue<uint64_t>("max-iterations");
|
||||
|
||||
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data",
|
||||
0,
|
||||
10000000,
|
||||
[this](void* data, size_t size) { --fNumUnackedMsgs; } // callback to be called when message buffers no longer needed by transport
|
||||
[this](void* data, size_t size) { // callback to be called when message buffers no longer needed by transport
|
||||
--fNumUnackedMsgs;
|
||||
if (fMaxIterations > 0)
|
||||
{
|
||||
LOG(DEBUG) << "Received ack";
|
||||
}
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
@@ -50,6 +59,12 @@ bool FairMQExampleRegionSampler::ConditionalRun()
|
||||
if (Send(msg, "data", 0) > 0)
|
||||
{
|
||||
++fNumUnackedMsgs;
|
||||
|
||||
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
|
||||
{
|
||||
LOG(INFO) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
|
Reference in New Issue
Block a user