shmem: introduce FairMQShmMonitor.

This commit is contained in:
Alexey Rybalchenko
2017-04-24 14:53:33 +02:00
committed by Mohammad Al-Turany
parent 7b4a2ae932
commit 2a526b8625
10 changed files with 446 additions and 14 deletions

View File

@@ -23,8 +23,9 @@
#include "../options/FairMQProgOptions.h"
using namespace std;
using namespace FairMQ::shmem;
using namespace fair::mq::shmem;
namespace bipc = boost::interprocess;
namespace bpt = boost::posix_time;
FairMQ::Transport FairMQTransportFactorySHM::fTransportType = FairMQ::Transport::SHM;
@@ -85,7 +86,7 @@ void FairMQTransportFactorySHM::Initialize(const FairMQProgOptions* config)
{ // mutex scope
bipc::scoped_lock<bipc::named_mutex> lock(fShMutex);
pair<FairMQShmDeviceCounter*, size_t> result = Manager::Instance().Segment()->find<FairMQShmDeviceCounter>(bipc::unique_instance);
pair<DeviceCounter*, size_t> result = Manager::Instance().Segment()->find<DeviceCounter>(bipc::unique_instance);
if (result.first != nullptr)
{
fDeviceCounter = result.first;
@@ -96,7 +97,7 @@ void FairMQTransportFactorySHM::Initialize(const FairMQProgOptions* config)
else
{
LOG(DEBUG) << "shmem: no device counter found, creating one and initializing with 1";
fDeviceCounter = Manager::Instance().Segment()->construct<FairMQShmDeviceCounter>(bipc::unique_instance)(1);
fDeviceCounter = Manager::Instance().Segment()->construct<DeviceCounter>(bipc::unique_instance)(1);
LOG(DEBUG) << "shmem: initialized device counter with: " << fDeviceCounter->count;
}
}
@@ -110,7 +111,7 @@ void FairMQTransportFactorySHM::SendHeartbeats()
{
bipc::message_queue mq(bipc::open_only, "fairmq_shmem_control_queue");
bool heartbeat = true;
boost::posix_time::ptime sndTill = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(100);
bpt::ptime sndTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(100);
if (mq.timed_send(&heartbeat, sizeof(heartbeat), 0, sndTill))
{
this_thread::sleep_for(chrono::milliseconds(100));