FairMQ: Introduce callbacks for the FairMQUnmanagedRegion.

Callbacks are called when the data buffer of the message assiciated
with the corresponding region is no longer needed by the transport.
Example in examples/advanced/Region/
This commit is contained in:
Alexey Rybalchenko
2017-11-14 17:00:37 +01:00
committed by Mohammad Al-Turany
parent 378c47c5e5
commit 58a312b730
39 changed files with 747 additions and 548 deletions

View File

@@ -8,28 +8,31 @@
#include <string>
#include <cstdlib>
#include <boost/date_time/posix_time/posix_time.hpp>
#include "FairMQMessageSHM.h"
#include "FairMQUnmanagedRegionSHM.h"
#include "FairMQLogger.h"
#include "FairMQShmCommon.h"
#include "Common.h"
using namespace std;
using namespace fair::mq::shmem;
namespace bipc = boost::interprocess;
namespace bpt = boost::posix_time;
atomic<bool> FairMQMessageSHM::fInterrupted(false);
FairMQ::Transport FairMQMessageSHM::fTransportType = FairMQ::Transport::SHM;
FairMQMessageSHM::FairMQMessageSHM()
: fMessage()
FairMQMessageSHM::FairMQMessageSHM(Manager& manager)
: fManager(manager)
, fMessage()
, fQueued(false)
, fMetaCreated(false)
, fRegionId(0)
, fHandle()
, fHandle(-1)
, fSize(0)
, fLocalPtr(nullptr)
, fRemoteRegion(nullptr)
{
if (zmq_msg_init(&fMessage) != 0)
{
@@ -38,28 +41,28 @@ FairMQMessageSHM::FairMQMessageSHM()
fMetaCreated = true;
}
FairMQMessageSHM::FairMQMessageSHM(const size_t size)
: fMessage()
FairMQMessageSHM::FairMQMessageSHM(Manager& manager, const size_t size)
: fManager(manager)
, fMessage()
, fQueued(false)
, fMetaCreated(false)
, fRegionId(0)
, fHandle()
, fHandle(-1)
, fSize(0)
, fLocalPtr(nullptr)
, fRemoteRegion(nullptr)
{
InitializeChunk(size);
}
FairMQMessageSHM::FairMQMessageSHM(void* data, const size_t size, fairmq_free_fn* ffn, void* hint)
: fMessage()
FairMQMessageSHM::FairMQMessageSHM(Manager& manager, void* data, const size_t size, fairmq_free_fn* ffn, void* hint)
: fManager(manager)
, fMessage()
, fQueued(false)
, fMetaCreated(false)
, fRegionId(0)
, fHandle()
, fHandle(-1)
, fSize(0)
, fLocalPtr(nullptr)
, fRemoteRegion(nullptr)
{
if (InitializeChunk(size))
{
@@ -75,15 +78,15 @@ FairMQMessageSHM::FairMQMessageSHM(void* data, const size_t size, fairmq_free_fn
}
}
FairMQMessageSHM::FairMQMessageSHM(FairMQUnmanagedRegionPtr& region, void* data, const size_t size)
: fMessage()
FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQUnmanagedRegionPtr& region, void* data, const size_t size)
: fManager(manager)
, fMessage()
, fQueued(false)
, fMetaCreated(false)
, fRegionId(static_cast<FairMQUnmanagedRegionSHM*>(region.get())->fRegionId)
, fHandle()
, fHandle(-1)
, fSize(size)
, fLocalPtr(data)
, fRemoteRegion(nullptr)
{
fHandle = (bipc::managed_shared_memory::handle_t)(reinterpret_cast<const char*>(data) - reinterpret_cast<const char*>(region->GetData()));
@@ -105,11 +108,11 @@ FairMQMessageSHM::FairMQMessageSHM(FairMQUnmanagedRegionPtr& region, void* data,
bool FairMQMessageSHM::InitializeChunk(const size_t size)
{
while (!fHandle)
while (fHandle < 0)
{
try
{
fLocalPtr = Manager::Instance().Segment()->allocate(size);
fLocalPtr = fManager.Segment().allocate(size);
}
catch (bipc::bad_alloc& ba)
{
@@ -124,7 +127,7 @@ bool FairMQMessageSHM::InitializeChunk(const size_t size)
continue;
}
}
fHandle = Manager::Instance().Segment()->get_handle_from_address(fLocalPtr);
fHandle = fManager.Segment().get_handle_from_address(fLocalPtr);
}
fSize = size;
@@ -202,15 +205,20 @@ void* FairMQMessageSHM::GetData()
{
if (fRegionId == 0)
{
return Manager::Instance().Segment()->get_address_from_handle(fHandle);
return fManager.Segment().get_address_from_handle(fHandle);
}
else
{
if (!fRemoteRegion)
boost::interprocess::mapped_region* region = fManager.GetRemoteRegion(fRegionId);
if (region)
{
fRemoteRegion = FairMQUnmanagedRegionSHM::GetRemoteRegion(fRegionId);
fLocalPtr = reinterpret_cast<char*>(region->get_address()) + fHandle;
}
else
{
// LOG(WARN) << "could not get pointer from a region message";
fLocalPtr = nullptr;
}
fLocalPtr = reinterpret_cast<char*>(fRemoteRegion->get_address()) + fHandle;
return fLocalPtr;
}
}
@@ -226,11 +234,6 @@ void FairMQMessageSHM::SetMessage(void*, const size_t)
// dummy method to comply with the interface. functionality not allowed in zeromq.
}
void FairMQMessageSHM::SetDeviceId(const string& /*deviceId*/)
{
// fDeviceID = deviceId;
}
FairMQ::Transport FairMQMessageSHM::GetType() const
{
return fTransportType;
@@ -238,7 +241,7 @@ FairMQ::Transport FairMQMessageSHM::GetType() const
void FairMQMessageSHM::Copy(const unique_ptr<FairMQMessage>& msg)
{
if (!fHandle)
if (fHandle < 0)
{
bipc::managed_shared_memory::handle_t otherHandle = static_cast<FairMQMessageSHM*>(msg.get())->fHandle;
if (otherHandle)
@@ -261,10 +264,56 @@ void FairMQMessageSHM::Copy(const unique_ptr<FairMQMessage>& msg)
void FairMQMessageSHM::CloseMessage()
{
if (fHandle && !fQueued && fRegionId == 0)
if (fHandle >= 0 && !fQueued)
{
Manager::Instance().Segment()->deallocate(Manager::Instance().Segment()->get_address_from_handle(fHandle));
fHandle = 0;
if (fRegionId == 0)
{
fManager.Segment().deallocate(fManager.Segment().get_address_from_handle(fHandle));
fHandle = 0;
}
else
{
// send notification back to the receiver
// RegionBlock block(fHandle, fSize);
// if (fManager.GetRegionQueue(fRegionId).try_send(static_cast<void*>(&block), sizeof(RegionBlock), 0))
// {
// // LOG(INFO) << "true";
// }
// // else
// // {
// // LOG(DEBUG) << "could not send ack";
// // }
// timed version
RegionBlock block(fHandle, fSize);
bool success = false;
do
{
auto sndTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(200);
bipc::message_queue* q = fManager.GetRegionQueue(fRegionId);
if (q)
{
if (q->timed_send(&block, sizeof(RegionBlock), 0, sndTill))
{
success = true;
}
else
{
if (fInterrupted)
{
break;
}
LOG(DEBUG) << "region ack queue is full, retrying...";
}
}
else
{
// LOG(WARN) << "region ack queue for id " << fRegionId << " no longer exist. Not sending ack";
success = true;
}
}
while (!success);
}
}
if (fMetaCreated)