mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-14 17:16:47 +00:00
Shmem: simplify message/socket and refactor to use namespaces
This commit is contained in:
parent
b2e027478e
commit
a2cff5b7bb
|
@ -192,11 +192,11 @@ if(BUILD_FAIRMQ)
|
|||
plugins/Builtin.h
|
||||
plugins/config/Config.h
|
||||
plugins/Control.h
|
||||
shmem/FairMQMessageSHM.h
|
||||
shmem/FairMQPollerSHM.h
|
||||
shmem/FairMQUnmanagedRegionSHM.h
|
||||
shmem/FairMQSocketSHM.h
|
||||
shmem/FairMQTransportFactorySHM.h
|
||||
shmem/Message.h
|
||||
shmem/Poller.h
|
||||
shmem/UnmanagedRegion.h
|
||||
shmem/Socket.h
|
||||
shmem/TransportFactory.h
|
||||
shmem/Common.h
|
||||
shmem/Manager.h
|
||||
shmem/Region.h
|
||||
|
@ -253,11 +253,11 @@ if(BUILD_FAIRMQ)
|
|||
SuboptParser.cxx
|
||||
plugins/config/Config.cxx
|
||||
plugins/Control.cxx
|
||||
shmem/FairMQMessageSHM.cxx
|
||||
shmem/FairMQPollerSHM.cxx
|
||||
shmem/FairMQUnmanagedRegionSHM.cxx
|
||||
shmem/FairMQSocketSHM.cxx
|
||||
shmem/FairMQTransportFactorySHM.cxx
|
||||
shmem/Message.cxx
|
||||
shmem/Poller.cxx
|
||||
shmem/UnmanagedRegion.cxx
|
||||
shmem/Socket.cxx
|
||||
shmem/TransportFactory.cxx
|
||||
shmem/Manager.cxx
|
||||
shmem/Region.cxx
|
||||
zeromq/FairMQMessageZMQ.cxx
|
||||
|
|
|
@ -31,8 +31,8 @@ namespace fair
|
|||
namespace mq
|
||||
{
|
||||
|
||||
using PollerPtr = std::unique_ptr<FairMQPoller>;
|
||||
|
||||
using Poller = FairMQPoller;
|
||||
using PollerPtr = FairMQPollerPtr;
|
||||
struct PollerError : std::runtime_error { using std::runtime_error::runtime_error; };
|
||||
|
||||
} /* namespace mq */
|
||||
|
|
|
@ -22,7 +22,7 @@ class FairMQSocket
|
|||
FairMQSocket() {}
|
||||
FairMQSocket(FairMQTransportFactory* fac): fTransport(fac) {}
|
||||
|
||||
virtual std::string GetId() = 0;
|
||||
virtual std::string GetId() const = 0;
|
||||
|
||||
virtual bool Bind(const std::string& address) = 0;
|
||||
virtual bool Connect(const std::string& address) = 0;
|
||||
|
|
|
@ -8,7 +8,7 @@
|
|||
|
||||
#include <FairMQTransportFactory.h>
|
||||
#include <zeromq/FairMQTransportFactoryZMQ.h>
|
||||
#include <shmem/FairMQTransportFactorySHM.h>
|
||||
#include <fairmq/shmem/TransportFactory.h>
|
||||
#ifdef BUILD_NANOMSG_TRANSPORT
|
||||
#include <nanomsg/FairMQTransportFactoryNN.h>
|
||||
#endif /* BUILD_NANOMSG_TRANSPORT */
|
||||
|
@ -44,7 +44,7 @@ auto FairMQTransportFactory::CreateTransportFactory(const std::string& type, con
|
|||
}
|
||||
else if (type == "shmem")
|
||||
{
|
||||
return make_shared<FairMQTransportFactorySHM>(finalId, config);
|
||||
return make_shared<fair::mq::shmem::TransportFactory>(finalId, config);
|
||||
}
|
||||
#ifdef BUILD_NANOMSG_TRANSPORT
|
||||
else if (type == "nanomsg")
|
||||
|
|
|
@ -137,6 +137,7 @@ namespace fair
|
|||
namespace mq
|
||||
{
|
||||
|
||||
using TransportFactory = FairMQTransportFactory;
|
||||
struct TransportFactoryError : std::runtime_error { using std::runtime_error::runtime_error; };
|
||||
|
||||
} /* namespace mq */
|
||||
|
|
|
@ -31,7 +31,9 @@ namespace fair
|
|||
namespace mq
|
||||
{
|
||||
|
||||
using UnmanagedRegionPtr = std::unique_ptr<FairMQUnmanagedRegion>;
|
||||
using RegionCallback = FairMQRegionCallback;
|
||||
using UnmanagedRegion = FairMQUnmanagedRegion;
|
||||
using UnmanagedRegionPtr = FairMQUnmanagedRegionPtr;
|
||||
|
||||
} /* namespace mq */
|
||||
} /* namespace fair */
|
||||
|
|
|
@ -91,11 +91,6 @@ FairMQSocketNN::FairMQSocketNN(const string& type, const string& name, const str
|
|||
LOG(debug) << "Created socket " << GetId();
|
||||
}
|
||||
|
||||
string FairMQSocketNN::GetId()
|
||||
{
|
||||
return fId;
|
||||
}
|
||||
|
||||
bool FairMQSocketNN::Bind(const string& address)
|
||||
{
|
||||
// LOG(info) << "bind socket " << fId << " on " << address;
|
||||
|
|
|
@ -23,7 +23,7 @@ class FairMQSocketNN final : public FairMQSocket
|
|||
FairMQSocketNN(const FairMQSocketNN&) = delete;
|
||||
FairMQSocketNN operator=(const FairMQSocketNN&) = delete;
|
||||
|
||||
std::string GetId() override;
|
||||
std::string GetId() const override { return fId; }
|
||||
|
||||
bool Bind(const std::string& address) override;
|
||||
bool Connect(const std::string& address) override;
|
||||
|
|
|
@ -43,7 +43,7 @@ class Socket final : public fair::mq::Socket
|
|||
Socket(const Socket&) = delete;
|
||||
Socket operator=(const Socket&) = delete;
|
||||
|
||||
auto GetId() -> std::string { return fId; }
|
||||
auto GetId() const -> std::string override { return fId; }
|
||||
|
||||
auto Bind(const std::string& address) -> bool override;
|
||||
auto Connect(const std::string& address) -> bool override;
|
||||
|
|
|
@ -77,8 +77,8 @@ struct MetaHeader
|
|||
{
|
||||
size_t fSize;
|
||||
size_t fRegionId;
|
||||
boost::interprocess::managed_shared_memory::handle_t fHandle;
|
||||
size_t fHint;
|
||||
boost::interprocess::managed_shared_memory::handle_t fHandle;
|
||||
};
|
||||
|
||||
struct RegionBlock
|
||||
|
|
|
@ -1,302 +0,0 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
#include "Common.h"
|
||||
#include "Region.h"
|
||||
|
||||
#include "FairMQMessageSHM.h"
|
||||
#include "FairMQUnmanagedRegionSHM.h"
|
||||
#include "FairMQLogger.h"
|
||||
|
||||
#include <boost/date_time/posix_time/posix_time.hpp>
|
||||
|
||||
#include <cstdlib>
|
||||
|
||||
using namespace std;
|
||||
using namespace fair::mq::shmem;
|
||||
|
||||
namespace bipc = ::boost::interprocess;
|
||||
namespace bpt = ::boost::posix_time;
|
||||
|
||||
atomic<bool> FairMQMessageSHM::fInterrupted(false);
|
||||
fair::mq::Transport FairMQMessageSHM::fTransportType = fair::mq::Transport::SHM;
|
||||
|
||||
FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQTransportFactory* factory)
|
||||
: FairMQMessage{factory}
|
||||
, fManager(manager)
|
||||
, fMessage()
|
||||
, fQueued(false)
|
||||
, fMetaCreated(false)
|
||||
, fRegionId(0)
|
||||
, fRegionPtr(nullptr)
|
||||
, fHandle(-1)
|
||||
, fSize(0)
|
||||
, fHint(0)
|
||||
, fLocalPtr(nullptr)
|
||||
{
|
||||
if (zmq_msg_init(&fMessage) != 0) {
|
||||
LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
fMetaCreated = true;
|
||||
}
|
||||
|
||||
FairMQMessageSHM::FairMQMessageSHM(Manager& manager, const size_t size, FairMQTransportFactory* factory)
|
||||
: FairMQMessage{factory}
|
||||
, fManager(manager)
|
||||
, fMessage()
|
||||
, fQueued(false)
|
||||
, fMetaCreated(false)
|
||||
, fRegionId(0)
|
||||
, fRegionPtr(nullptr)
|
||||
, fHandle(-1)
|
||||
, fSize(0)
|
||||
, fHint(0)
|
||||
, fLocalPtr(nullptr)
|
||||
{
|
||||
InitializeChunk(size);
|
||||
}
|
||||
|
||||
FairMQMessageSHM::FairMQMessageSHM(Manager& manager, MetaHeader* hdr, FairMQTransportFactory* factory)
|
||||
: FairMQMessage{factory}
|
||||
, fManager(manager)
|
||||
, fMessage()
|
||||
, fQueued(false)
|
||||
, fMetaCreated(false)
|
||||
, fRegionId(hdr->fRegionId)
|
||||
, fRegionPtr(nullptr)
|
||||
, fHandle(hdr->fHandle)
|
||||
, fSize(hdr->fSize)
|
||||
, fHint(hdr->fHint)
|
||||
, fLocalPtr(nullptr)
|
||||
{
|
||||
if (zmq_msg_init_size(&fMessage, sizeof(MetaHeader)) != 0) {
|
||||
LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
// fill the zmq buffer with the delivered meta data
|
||||
memcpy(zmq_msg_data(&fMessage), hdr, sizeof(MetaHeader));
|
||||
fMetaCreated = true;
|
||||
}
|
||||
|
||||
FairMQMessageSHM::FairMQMessageSHM(Manager& manager, void* data, const size_t size, fairmq_free_fn* ffn, void* hint, FairMQTransportFactory* factory)
|
||||
: FairMQMessage{factory}
|
||||
, fManager(manager)
|
||||
, fMessage()
|
||||
, fQueued(false)
|
||||
, fMetaCreated(false)
|
||||
, fRegionId(0)
|
||||
, fRegionPtr(nullptr)
|
||||
, fHandle(-1)
|
||||
, fSize(0)
|
||||
, fHint(0)
|
||||
, fLocalPtr(nullptr)
|
||||
{
|
||||
if (InitializeChunk(size)) {
|
||||
memcpy(fLocalPtr, data, size);
|
||||
if (ffn) {
|
||||
ffn(data, hint);
|
||||
} else {
|
||||
free(data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint, FairMQTransportFactory* factory)
|
||||
: FairMQMessage{factory}
|
||||
, fManager(manager)
|
||||
, fMessage()
|
||||
, fQueued(false)
|
||||
, fMetaCreated(false)
|
||||
, fRegionId(static_cast<FairMQUnmanagedRegionSHM*>(region.get())->fRegionId)
|
||||
, fRegionPtr(nullptr)
|
||||
, fHandle(-1)
|
||||
, fSize(size)
|
||||
, fHint(reinterpret_cast<size_t>(hint))
|
||||
, fLocalPtr(static_cast<char*>(data))
|
||||
{
|
||||
if (reinterpret_cast<const char*>(data) >= reinterpret_cast<const char*>(region->GetData()) ||
|
||||
reinterpret_cast<const char*>(data) <= reinterpret_cast<const char*>(region->GetData()) + region->GetSize()) {
|
||||
fHandle = (bipc::managed_shared_memory::handle_t)(reinterpret_cast<const char*>(data) - reinterpret_cast<const char*>(region->GetData()));
|
||||
|
||||
if (zmq_msg_init_size(&fMessage, sizeof(MetaHeader)) != 0) {
|
||||
LOG(error) << "failed initializing meta message, reason: " << zmq_strerror(errno);
|
||||
} else {
|
||||
MetaHeader header;
|
||||
header.fSize = size;
|
||||
header.fHandle = fHandle;
|
||||
header.fRegionId = fRegionId;
|
||||
header.fHint = fHint;
|
||||
memcpy(zmq_msg_data(&fMessage), &header, sizeof(MetaHeader));
|
||||
|
||||
fMetaCreated = true;
|
||||
}
|
||||
} else {
|
||||
LOG(error) << "trying to create region message with data from outside the region";
|
||||
throw runtime_error("trying to create region message with data from outside the region");
|
||||
}
|
||||
}
|
||||
|
||||
bool FairMQMessageSHM::InitializeChunk(const size_t size)
|
||||
{
|
||||
while (fHandle < 0) {
|
||||
try {
|
||||
bipc::managed_shared_memory::size_type actualSize = size;
|
||||
char* hint = 0; // unused for bipc::allocate_new
|
||||
fLocalPtr = fManager.Segment().allocation_command<char>(bipc::allocate_new, size, actualSize, hint);
|
||||
} catch (bipc::bad_alloc& ba) {
|
||||
// LOG(warn) << "Shared memory full...";
|
||||
this_thread::sleep_for(chrono::milliseconds(50));
|
||||
if (fInterrupted) {
|
||||
return false;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
fHandle = fManager.Segment().get_handle_from_address(fLocalPtr);
|
||||
}
|
||||
|
||||
fSize = size;
|
||||
|
||||
if (zmq_msg_init_size(&fMessage, sizeof(MetaHeader)) != 0) {
|
||||
LOG(error) << "failed initializing meta message, reason: " << zmq_strerror(errno);
|
||||
return false;
|
||||
}
|
||||
MetaHeader header;
|
||||
header.fSize = size;
|
||||
header.fHandle = fHandle;
|
||||
header.fRegionId = fRegionId;
|
||||
header.fHint = fHint;
|
||||
memcpy(zmq_msg_data(&fMessage), &header, sizeof(MetaHeader));
|
||||
|
||||
fMetaCreated = true;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void FairMQMessageSHM::Rebuild()
|
||||
{
|
||||
CloseMessage();
|
||||
|
||||
fQueued = false;
|
||||
|
||||
if (zmq_msg_init(&fMessage) != 0) {
|
||||
LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
fMetaCreated = true;
|
||||
}
|
||||
|
||||
void FairMQMessageSHM::Rebuild(const size_t size)
|
||||
{
|
||||
CloseMessage();
|
||||
fQueued = false;
|
||||
InitializeChunk(size);
|
||||
}
|
||||
|
||||
void FairMQMessageSHM::Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint)
|
||||
{
|
||||
CloseMessage();
|
||||
|
||||
fQueued = false;
|
||||
|
||||
if (InitializeChunk(size)) {
|
||||
memcpy(fLocalPtr, data, size);
|
||||
if (ffn) {
|
||||
ffn(data, hint);
|
||||
} else {
|
||||
free(data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void* FairMQMessageSHM::GetData() const
|
||||
{
|
||||
if (!fLocalPtr) {
|
||||
if (fRegionId == 0) {
|
||||
if (fSize > 0) {
|
||||
fLocalPtr = reinterpret_cast<char*>(fManager.Segment().get_address_from_handle(fHandle));
|
||||
} else {
|
||||
fLocalPtr = nullptr;
|
||||
}
|
||||
} else {
|
||||
fRegionPtr = fManager.GetRemoteRegion(fRegionId);
|
||||
if (fRegionPtr) {
|
||||
fLocalPtr = reinterpret_cast<char*>(fRegionPtr->fRegion.get_address()) + fHandle;
|
||||
} else {
|
||||
// LOG(warn) << "could not get pointer from a region message";
|
||||
fLocalPtr = nullptr;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return fLocalPtr;
|
||||
}
|
||||
|
||||
bool FairMQMessageSHM::SetUsedSize(const size_t size)
|
||||
{
|
||||
if (size == fSize) {
|
||||
return true;
|
||||
} else if (size <= fSize) {
|
||||
try {
|
||||
bipc::managed_shared_memory::size_type shrunkSize = size;
|
||||
fLocalPtr = fManager.Segment().allocation_command<char>(bipc::shrink_in_place, fSize + 128, shrunkSize, fLocalPtr);
|
||||
fSize = size;
|
||||
|
||||
// update meta header
|
||||
MetaHeader* hdrPtr = static_cast<MetaHeader*>(zmq_msg_data(&fMessage));
|
||||
hdrPtr->fSize = fSize;
|
||||
return true;
|
||||
} catch (bipc::interprocess_exception& e) {
|
||||
LOG(info) << "could not set used size: " << e.what();
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
LOG(error) << "cannot set used size higher than original.";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
void FairMQMessageSHM::Copy(const FairMQMessage& msg)
|
||||
{
|
||||
if (fHandle < 0) {
|
||||
bipc::managed_shared_memory::handle_t otherHandle = static_cast<const FairMQMessageSHM&>(msg).fHandle;
|
||||
if (otherHandle) {
|
||||
if (InitializeChunk(msg.GetSize())) {
|
||||
memcpy(GetData(), msg.GetData(), msg.GetSize());
|
||||
}
|
||||
} else {
|
||||
LOG(error) << "copy fail: source message not initialized!";
|
||||
}
|
||||
} else {
|
||||
LOG(error) << "copy fail: target message already initialized!";
|
||||
}
|
||||
}
|
||||
|
||||
void FairMQMessageSHM::CloseMessage()
|
||||
{
|
||||
if (fHandle >= 0 && !fQueued) {
|
||||
if (fRegionId == 0) {
|
||||
fManager.Segment().deallocate(fManager.Segment().get_address_from_handle(fHandle));
|
||||
fHandle = -1;
|
||||
} else {
|
||||
if (!fRegionPtr) {
|
||||
fRegionPtr = fManager.GetRemoteRegion(fRegionId);
|
||||
}
|
||||
|
||||
if (fRegionPtr) {
|
||||
fRegionPtr->ReleaseBlock({fHandle, fSize, fHint});
|
||||
} else {
|
||||
LOG(warn) << "region ack queue for id " << fRegionId << " no longer exist. Not sending ack";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (fMetaCreated) {
|
||||
if (zmq_msg_close(&fMessage) != 0) {
|
||||
LOG(error) << "failed closing message, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
fMetaCreated = false;
|
||||
}
|
||||
}
|
|
@ -1,84 +0,0 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
#ifndef FAIRMQMESSAGESHM_H_
|
||||
#define FAIRMQMESSAGESHM_H_
|
||||
|
||||
#include <fairmq/shmem/Manager.h>
|
||||
|
||||
#include "FairMQMessage.h"
|
||||
#include "FairMQUnmanagedRegion.h"
|
||||
|
||||
#include <zmq.h>
|
||||
|
||||
#include <boost/interprocess/mapped_region.hpp>
|
||||
|
||||
#include <cstddef> // size_t
|
||||
#include <atomic>
|
||||
|
||||
class FairMQSocketSHM;
|
||||
namespace fair
|
||||
{
|
||||
namespace mq
|
||||
{
|
||||
namespace shmem
|
||||
{
|
||||
class MetaHeader;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class FairMQMessageSHM final : public FairMQMessage
|
||||
{
|
||||
friend class FairMQSocketSHM;
|
||||
|
||||
public:
|
||||
FairMQMessageSHM(fair::mq::shmem::Manager& manager, FairMQTransportFactory* factory = nullptr);
|
||||
FairMQMessageSHM(fair::mq::shmem::Manager& manager, const size_t size, FairMQTransportFactory* factory = nullptr);
|
||||
FairMQMessageSHM(fair::mq::shmem::Manager& manager, void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr);
|
||||
FairMQMessageSHM(fair::mq::shmem::Manager& manager, FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, FairMQTransportFactory* factory = nullptr);
|
||||
|
||||
FairMQMessageSHM(fair::mq::shmem::Manager& manager, fair::mq::shmem::MetaHeader* hdr, FairMQTransportFactory* factory = nullptr);
|
||||
|
||||
FairMQMessageSHM(const FairMQMessageSHM&) = delete;
|
||||
FairMQMessageSHM operator=(const FairMQMessageSHM&) = delete;
|
||||
|
||||
void Rebuild() override;
|
||||
void Rebuild(const size_t size) override;
|
||||
void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override;
|
||||
|
||||
void* GetData() const override;
|
||||
size_t GetSize() const override { return fSize; }
|
||||
|
||||
bool SetUsedSize(const size_t size) override;
|
||||
|
||||
fair::mq::Transport GetType() const override { return fTransportType; }
|
||||
|
||||
void Copy(const FairMQMessage& msg) override;
|
||||
|
||||
~FairMQMessageSHM() override { CloseMessage(); }
|
||||
|
||||
private:
|
||||
fair::mq::shmem::Manager& fManager;
|
||||
zmq_msg_t fMessage;
|
||||
bool fQueued;
|
||||
bool fMetaCreated;
|
||||
static std::atomic<bool> fInterrupted;
|
||||
static fair::mq::Transport fTransportType;
|
||||
size_t fRegionId;
|
||||
mutable fair::mq::shmem::Region* fRegionPtr;
|
||||
boost::interprocess::managed_shared_memory::handle_t fHandle;
|
||||
size_t fSize;
|
||||
size_t fHint;
|
||||
mutable char* fLocalPtr;
|
||||
|
||||
bool InitializeChunk(const size_t size);
|
||||
zmq_msg_t* GetMessage() { return &fMessage; }
|
||||
void CloseMessage();
|
||||
};
|
||||
|
||||
#endif /* FAIRMQMESSAGESHM_H_ */
|
|
@ -1,67 +0,0 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2016-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#ifndef FAIRMQTRANSPORTFACTORYSHM_H_
|
||||
#define FAIRMQTRANSPORTFACTORYSHM_H_
|
||||
|
||||
#include <fairmq/shmem/Manager.h>
|
||||
#include <fairmq/shmem/Common.h>
|
||||
|
||||
#include "FairMQTransportFactory.h"
|
||||
#include "FairMQMessageSHM.h"
|
||||
#include "FairMQSocketSHM.h"
|
||||
#include "FairMQPollerSHM.h"
|
||||
#include "FairMQUnmanagedRegionSHM.h"
|
||||
#include <fairmq/ProgOptions.h>
|
||||
|
||||
#include <vector>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <atomic>
|
||||
|
||||
class FairMQTransportFactorySHM final : public FairMQTransportFactory
|
||||
{
|
||||
public:
|
||||
FairMQTransportFactorySHM(const std::string& id = "", const fair::mq::ProgOptions* config = nullptr);
|
||||
FairMQTransportFactorySHM(const FairMQTransportFactorySHM&) = delete;
|
||||
FairMQTransportFactorySHM operator=(const FairMQTransportFactorySHM&) = delete;
|
||||
|
||||
FairMQMessagePtr CreateMessage() override;
|
||||
FairMQMessagePtr CreateMessage(const size_t size) override;
|
||||
FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override;
|
||||
FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) override;
|
||||
|
||||
FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) override;
|
||||
|
||||
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override;
|
||||
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel*>& channels) const override;
|
||||
FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
|
||||
|
||||
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override;
|
||||
|
||||
fair::mq::Transport GetType() const override;
|
||||
|
||||
void Interrupt() override { FairMQSocketSHM::Interrupt(); }
|
||||
void Resume() override { FairMQSocketSHM::Resume(); }
|
||||
void Reset() override {}
|
||||
|
||||
~FairMQTransportFactorySHM() override;
|
||||
|
||||
private:
|
||||
void SendHeartbeats();
|
||||
|
||||
static fair::mq::Transport fTransportType;
|
||||
std::string fDeviceId;
|
||||
std::string fShmId;
|
||||
void* fZMQContext;
|
||||
std::unique_ptr<fair::mq::shmem::Manager> fManager;
|
||||
std::thread fHeartbeatThread;
|
||||
std::atomic<bool> fSendHeartbeats;
|
||||
};
|
||||
|
||||
#endif /* FAIRMQTRANSPORTFACTORYSHM_H_ */
|
|
@ -6,8 +6,9 @@
|
|||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include <fairmq/shmem/Manager.h>
|
||||
#include <fairmq/shmem/Common.h>
|
||||
#include "Manager.h"
|
||||
#include "Common.h"
|
||||
|
||||
#include <fairmq/tools/CppSTL.h>
|
||||
#include <fairmq/tools/Strings.h>
|
||||
|
||||
|
@ -83,7 +84,7 @@ void Manager::StartMonitor(const std::string& id)
|
|||
this_thread::sleep_for(chrono::milliseconds(10));
|
||||
if (++numTries > 1000) {
|
||||
LOG(error) << "Did not get response from fairmq-shmmonitor after " << 10 * 1000 << " milliseconds. Exiting.";
|
||||
throw runtime_error(fair::mq::tools::ToString("Did not get response from fairmq-shmmonitor after ", 10 * 1000, " milliseconds. Exiting."));
|
||||
throw runtime_error(tools::ToString("Did not get response from fairmq-shmmonitor after ", 10 * 1000, " milliseconds. Exiting."));
|
||||
}
|
||||
}
|
||||
} while (true);
|
||||
|
@ -109,7 +110,7 @@ void Manager::Resume()
|
|||
}
|
||||
}
|
||||
|
||||
bipc::mapped_region* Manager::CreateRegion(const size_t size, const uint64_t id, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */)
|
||||
bipc::mapped_region* Manager::CreateRegion(const size_t size, const uint64_t id, RegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */)
|
||||
{
|
||||
auto it = fRegions.find(id);
|
||||
if (it != fRegions.end()) {
|
||||
|
@ -125,7 +126,7 @@ bipc::mapped_region* Manager::CreateRegion(const size_t size, const uint64_t id,
|
|||
}
|
||||
// LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";
|
||||
|
||||
auto r = fRegions.emplace(id, fair::mq::tools::make_unique<Region>(*this, id, size, false, callback, path, flags));
|
||||
auto r = fRegions.emplace(id, tools::make_unique<Region>(*this, id, size, false, callback, path, flags));
|
||||
|
||||
r.first->second->StartReceivingAcks();
|
||||
|
||||
|
@ -158,7 +159,7 @@ Region* Manager::GetRemoteRegion(const uint64_t id)
|
|||
}
|
||||
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";
|
||||
|
||||
auto r = fRegions.emplace(id, fair::mq::tools::make_unique<Region>(*this, id, 0, true, nullptr, path, flags));
|
||||
auto r = fRegions.emplace(id, tools::make_unique<Region>(*this, id, 0, true, nullptr, path, flags));
|
||||
return r.first->second.get();
|
||||
} catch (bie& e) {
|
||||
LOG(warn) << "Could not get remote region for id: " << id;
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQShmManager.h
|
||||
* Manager.h
|
||||
*
|
||||
* @since 2016-04-08
|
||||
* @author A. Rybalchenko
|
||||
|
@ -15,11 +15,10 @@
|
|||
#ifndef FAIR_MQ_SHMEM_MANAGER_H_
|
||||
#define FAIR_MQ_SHMEM_MANAGER_H_
|
||||
|
||||
#include <fairmq/shmem/Region.h>
|
||||
#include <fairmq/shmem/Common.h>
|
||||
#include "Region.h"
|
||||
#include "Common.h"
|
||||
|
||||
#include "FairMQLogger.h"
|
||||
#include "FairMQMessage.h"
|
||||
#include <FairMQLogger.h>
|
||||
|
||||
#include <boost/interprocess/managed_shared_memory.hpp>
|
||||
#include <boost/interprocess/ipc/message_queue.hpp>
|
||||
|
@ -64,7 +63,7 @@ class Manager
|
|||
int IncrementDeviceCounter();
|
||||
int DecrementDeviceCounter();
|
||||
|
||||
boost::interprocess::mapped_region* CreateRegion(const size_t size, const uint64_t id, FairMQRegionCallback callback, const std::string& path = "", int flags = 0);
|
||||
boost::interprocess::mapped_region* CreateRegion(const size_t size, const uint64_t id, RegionCallback callback, const std::string& path = "", int flags = 0);
|
||||
Region* GetRemoteRegion(const uint64_t id);
|
||||
void RemoveRegion(const uint64_t id);
|
||||
|
||||
|
@ -77,7 +76,7 @@ class Manager
|
|||
boost::interprocess::managed_shared_memory fSegment;
|
||||
boost::interprocess::managed_shared_memory fManagementSegment;
|
||||
boost::interprocess::named_mutex fShmMtx;
|
||||
fair::mq::shmem::DeviceCounter* fDeviceCounter;
|
||||
DeviceCounter* fDeviceCounter;
|
||||
static std::unordered_map<uint64_t, std::unique_ptr<Region>> fRegions;
|
||||
};
|
||||
|
||||
|
|
232
fairmq/shmem/Message.cxx
Normal file
232
fairmq/shmem/Message.cxx
Normal file
|
@ -0,0 +1,232 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include "Region.h"
|
||||
#include "Message.h"
|
||||
#include "UnmanagedRegion.h"
|
||||
|
||||
#include <FairMQLogger.h>
|
||||
|
||||
#include <boost/date_time/posix_time/posix_time.hpp>
|
||||
|
||||
#include <cstdlib>
|
||||
|
||||
using namespace std;
|
||||
|
||||
namespace bipc = ::boost::interprocess;
|
||||
namespace bpt = ::boost::posix_time;
|
||||
|
||||
namespace fair
|
||||
{
|
||||
namespace mq
|
||||
{
|
||||
namespace shmem
|
||||
{
|
||||
|
||||
atomic<bool> Message::fInterrupted(false);
|
||||
Transport Message::fTransportType = Transport::SHM;
|
||||
|
||||
Message::Message(Manager& manager, FairMQTransportFactory* factory)
|
||||
: fair::mq::Message{factory}
|
||||
, fManager(manager)
|
||||
, fQueued(false)
|
||||
, fMeta{0, 0, 0, -1}
|
||||
, fRegionPtr(nullptr)
|
||||
, fLocalPtr(nullptr)
|
||||
{
|
||||
}
|
||||
|
||||
Message::Message(Manager& manager, const size_t size, FairMQTransportFactory* factory)
|
||||
: fair::mq::Message{factory}
|
||||
, fManager(manager)
|
||||
, fQueued(false)
|
||||
, fMeta{0, 0, 0, -1}
|
||||
, fRegionPtr(nullptr)
|
||||
, fLocalPtr(nullptr)
|
||||
{
|
||||
InitializeChunk(size);
|
||||
}
|
||||
|
||||
Message::Message(Manager& manager, MetaHeader& hdr, FairMQTransportFactory* factory)
|
||||
: fair::mq::Message{factory}
|
||||
, fManager(manager)
|
||||
, fQueued(false)
|
||||
, fMeta{hdr}
|
||||
, fRegionPtr(nullptr)
|
||||
, fLocalPtr(nullptr)
|
||||
{
|
||||
}
|
||||
|
||||
Message::Message(Manager& manager, void* data, const size_t size, fairmq_free_fn* ffn, void* hint, FairMQTransportFactory* factory)
|
||||
: fair::mq::Message{factory}
|
||||
, fManager(manager)
|
||||
, fQueued(false)
|
||||
, fMeta{0, 0, 0, -1}
|
||||
, fRegionPtr(nullptr)
|
||||
, fLocalPtr(nullptr)
|
||||
{
|
||||
if (InitializeChunk(size)) {
|
||||
std::memcpy(fLocalPtr, data, size);
|
||||
if (ffn) {
|
||||
ffn(data, hint);
|
||||
} else {
|
||||
free(data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Message::Message(Manager& manager, UnmanagedRegionPtr& region, void* data, const size_t size, void* hint, FairMQTransportFactory* factory)
|
||||
: fair::mq::Message{factory}
|
||||
, fManager(manager)
|
||||
, fQueued(false)
|
||||
, fMeta{size, static_cast<UnmanagedRegion*>(region.get())->fRegionId, reinterpret_cast<size_t>(hint), -1}
|
||||
, fRegionPtr(nullptr)
|
||||
, fLocalPtr(static_cast<char*>(data))
|
||||
{
|
||||
if (reinterpret_cast<const char*>(data) >= reinterpret_cast<const char*>(region->GetData()) ||
|
||||
reinterpret_cast<const char*>(data) <= reinterpret_cast<const char*>(region->GetData()) + region->GetSize()) {
|
||||
fMeta.fHandle = (bipc::managed_shared_memory::handle_t)(reinterpret_cast<const char*>(data) - reinterpret_cast<const char*>(region->GetData()));
|
||||
} else {
|
||||
LOG(error) << "trying to create region message with data from outside the region";
|
||||
throw runtime_error("trying to create region message with data from outside the region");
|
||||
}
|
||||
}
|
||||
|
||||
bool Message::InitializeChunk(const size_t size)
|
||||
{
|
||||
while (fMeta.fHandle < 0) {
|
||||
try {
|
||||
bipc::managed_shared_memory::size_type actualSize = size;
|
||||
char* hint = 0; // unused for bipc::allocate_new
|
||||
fLocalPtr = fManager.Segment().allocation_command<char>(bipc::allocate_new, size, actualSize, hint);
|
||||
} catch (bipc::bad_alloc& ba) {
|
||||
// LOG(warn) << "Shared memory full...";
|
||||
this_thread::sleep_for(chrono::milliseconds(50));
|
||||
if (fInterrupted) {
|
||||
return false;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
fMeta.fHandle = fManager.Segment().get_handle_from_address(fLocalPtr);
|
||||
}
|
||||
|
||||
fMeta.fSize = size;
|
||||
return true;
|
||||
}
|
||||
|
||||
void Message::Rebuild()
|
||||
{
|
||||
CloseMessage();
|
||||
fQueued = false;
|
||||
}
|
||||
|
||||
void Message::Rebuild(const size_t size)
|
||||
{
|
||||
CloseMessage();
|
||||
fQueued = false;
|
||||
InitializeChunk(size);
|
||||
}
|
||||
|
||||
void Message::Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint)
|
||||
{
|
||||
CloseMessage();
|
||||
fQueued = false;
|
||||
|
||||
if (InitializeChunk(size)) {
|
||||
std::memcpy(fLocalPtr, data, size);
|
||||
if (ffn) {
|
||||
ffn(data, hint);
|
||||
} else {
|
||||
free(data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void* Message::GetData() const
|
||||
{
|
||||
if (!fLocalPtr) {
|
||||
if (fMeta.fRegionId == 0) {
|
||||
if (fMeta.fSize > 0) {
|
||||
fLocalPtr = reinterpret_cast<char*>(fManager.Segment().get_address_from_handle(fMeta.fHandle));
|
||||
} else {
|
||||
fLocalPtr = nullptr;
|
||||
}
|
||||
} else {
|
||||
fRegionPtr = fManager.GetRemoteRegion(fMeta.fRegionId);
|
||||
if (fRegionPtr) {
|
||||
fLocalPtr = reinterpret_cast<char*>(fRegionPtr->fRegion.get_address()) + fMeta.fHandle;
|
||||
} else {
|
||||
// LOG(warn) << "could not get pointer from a region message";
|
||||
fLocalPtr = nullptr;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return fLocalPtr;
|
||||
}
|
||||
|
||||
bool Message::SetUsedSize(const size_t size)
|
||||
{
|
||||
if (size == fMeta.fSize) {
|
||||
return true;
|
||||
} else if (size <= fMeta.fSize) {
|
||||
try {
|
||||
bipc::managed_shared_memory::size_type shrunkSize = size;
|
||||
fLocalPtr = fManager.Segment().allocation_command<char>(bipc::shrink_in_place, fMeta.fSize + 128, shrunkSize, fLocalPtr);
|
||||
fMeta.fSize = size;
|
||||
return true;
|
||||
} catch (bipc::interprocess_exception& e) {
|
||||
LOG(info) << "could not set used size: " << e.what();
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
LOG(error) << "cannot set used size higher than original.";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
void Message::Copy(const fair::mq::Message& msg)
|
||||
{
|
||||
if (fMeta.fHandle < 0) {
|
||||
bipc::managed_shared_memory::handle_t otherHandle = static_cast<const Message&>(msg).fMeta.fHandle;
|
||||
if (otherHandle) {
|
||||
if (InitializeChunk(msg.GetSize())) {
|
||||
std::memcpy(GetData(), msg.GetData(), msg.GetSize());
|
||||
}
|
||||
} else {
|
||||
LOG(error) << "copy fail: source message not initialized!";
|
||||
}
|
||||
} else {
|
||||
LOG(error) << "copy fail: target message already initialized!";
|
||||
}
|
||||
}
|
||||
|
||||
void Message::CloseMessage()
|
||||
{
|
||||
if (fMeta.fHandle >= 0 && !fQueued) {
|
||||
if (fMeta.fRegionId == 0) {
|
||||
fManager.Segment().deallocate(fManager.Segment().get_address_from_handle(fMeta.fHandle));
|
||||
fMeta.fHandle = -1;
|
||||
} else {
|
||||
if (!fRegionPtr) {
|
||||
fRegionPtr = fManager.GetRemoteRegion(fMeta.fRegionId);
|
||||
}
|
||||
|
||||
if (fRegionPtr) {
|
||||
fRegionPtr->ReleaseBlock({fMeta.fHandle, fMeta.fSize, fMeta.fHint});
|
||||
} else {
|
||||
LOG(warn) << "region ack queue for id " << fMeta.fRegionId << " no longer exist. Not sending ack";
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
79
fairmq/shmem/Message.h
Normal file
79
fairmq/shmem/Message.h
Normal file
|
@ -0,0 +1,79 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
#ifndef FAIR_MQ_SHMEM_MESSAGE_H_
|
||||
#define FAIR_MQ_SHMEM_MESSAGE_H_
|
||||
|
||||
#include "Common.h"
|
||||
#include "Manager.h"
|
||||
|
||||
#include <FairMQMessage.h>
|
||||
#include <FairMQUnmanagedRegion.h>
|
||||
|
||||
#include <boost/interprocess/mapped_region.hpp>
|
||||
|
||||
#include <cstddef> // size_t
|
||||
#include <atomic>
|
||||
|
||||
namespace fair
|
||||
{
|
||||
namespace mq
|
||||
{
|
||||
namespace shmem
|
||||
{
|
||||
|
||||
class Socket;
|
||||
|
||||
class Message final : public fair::mq::Message
|
||||
{
|
||||
friend class Socket;
|
||||
|
||||
public:
|
||||
Message(Manager& manager, FairMQTransportFactory* factory = nullptr);
|
||||
Message(Manager& manager, const size_t size, FairMQTransportFactory* factory = nullptr);
|
||||
Message(Manager& manager, void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr);
|
||||
Message(Manager& manager, UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, FairMQTransportFactory* factory = nullptr);
|
||||
|
||||
Message(Manager& manager, MetaHeader& hdr, FairMQTransportFactory* factory = nullptr);
|
||||
|
||||
Message(const Message&) = delete;
|
||||
Message operator=(const Message&) = delete;
|
||||
|
||||
void Rebuild() override;
|
||||
void Rebuild(const size_t size) override;
|
||||
void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override;
|
||||
|
||||
void* GetData() const override;
|
||||
size_t GetSize() const override { return fMeta.fSize; }
|
||||
|
||||
bool SetUsedSize(const size_t size) override;
|
||||
|
||||
Transport GetType() const override { return fTransportType; }
|
||||
|
||||
void Copy(const fair::mq::Message& msg) override;
|
||||
|
||||
~Message() override { CloseMessage(); }
|
||||
|
||||
private:
|
||||
Manager& fManager;
|
||||
bool fQueued;
|
||||
MetaHeader fMeta;
|
||||
mutable Region* fRegionPtr;
|
||||
mutable char* fLocalPtr;
|
||||
|
||||
static std::atomic<bool> fInterrupted;
|
||||
static Transport fTransportType;
|
||||
|
||||
bool InitializeChunk(const size_t size);
|
||||
void CloseMessage();
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endif /* FAIR_MQ_SHMEM_MESSAGE_H_ */
|
|
@ -6,8 +6,9 @@
|
|||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include <fairmq/shmem/Monitor.h>
|
||||
#include <fairmq/shmem/Common.h>
|
||||
#include "Monitor.h"
|
||||
#include "Common.h"
|
||||
|
||||
#include <fairmq/Tools.h>
|
||||
|
||||
#include <boost/interprocess/managed_shared_memory.hpp>
|
||||
|
@ -275,7 +276,7 @@ void Monitor::CheckSegment()
|
|||
unsigned int numDevices = 0;
|
||||
|
||||
if (fInteractive) {
|
||||
fair::mq::shmem::DeviceCounter* dc = managementSegment.find<fair::mq::shmem::DeviceCounter>(bipc::unique_instance).first;
|
||||
DeviceCounter* dc = managementSegment.find<DeviceCounter>(bipc::unique_instance).first;
|
||||
if (dc) {
|
||||
numDevices = dc->fCount;
|
||||
}
|
||||
|
|
|
@ -6,21 +6,29 @@
|
|||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQPollerSHM.cxx
|
||||
* Poller.cxx
|
||||
*
|
||||
* @since 2014-01-23
|
||||
* @author A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include "FairMQPollerSHM.h"
|
||||
#include "FairMQSocketSHM.h"
|
||||
#include "FairMQLogger.h"
|
||||
#include "Poller.h"
|
||||
#include "Socket.h"
|
||||
|
||||
#include <FairMQLogger.h>
|
||||
|
||||
#include <zmq.h>
|
||||
|
||||
using namespace std;
|
||||
|
||||
FairMQPollerSHM::FairMQPollerSHM(const vector<FairMQChannel>& channels)
|
||||
namespace fair
|
||||
{
|
||||
namespace mq
|
||||
{
|
||||
namespace shmem
|
||||
{
|
||||
|
||||
Poller::Poller(const vector<FairMQChannel>& channels)
|
||||
: fItems()
|
||||
, fNumItems(0)
|
||||
, fOffsetMap()
|
||||
|
@ -30,19 +38,19 @@ FairMQPollerSHM::FairMQPollerSHM(const vector<FairMQChannel>& channels)
|
|||
|
||||
for (int i = 0; i < fNumItems; ++i)
|
||||
{
|
||||
fItems[i].socket = static_cast<const FairMQSocketSHM*>(&(channels.at(i).GetSocket()))->GetSocket();
|
||||
fItems[i].socket = static_cast<const Socket*>(&(channels.at(i).GetSocket()))->GetSocket();
|
||||
fItems[i].fd = 0;
|
||||
fItems[i].revents = 0;
|
||||
|
||||
int type = 0;
|
||||
size_t size = sizeof(type);
|
||||
zmq_getsockopt(static_cast<const FairMQSocketSHM*>(&(channels.at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
|
||||
zmq_getsockopt(static_cast<const Socket*>(&(channels.at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
|
||||
|
||||
SetItemEvents(fItems[i], type);
|
||||
}
|
||||
}
|
||||
|
||||
FairMQPollerSHM::FairMQPollerSHM(const vector<FairMQChannel*>& channels)
|
||||
Poller::Poller(const vector<FairMQChannel*>& channels)
|
||||
: fItems()
|
||||
, fNumItems(0)
|
||||
, fOffsetMap()
|
||||
|
@ -52,19 +60,19 @@ FairMQPollerSHM::FairMQPollerSHM(const vector<FairMQChannel*>& channels)
|
|||
|
||||
for (int i = 0; i < fNumItems; ++i)
|
||||
{
|
||||
fItems[i].socket = static_cast<const FairMQSocketSHM*>(&(channels.at(i)->GetSocket()))->GetSocket();
|
||||
fItems[i].socket = static_cast<const Socket*>(&(channels.at(i)->GetSocket()))->GetSocket();
|
||||
fItems[i].fd = 0;
|
||||
fItems[i].revents = 0;
|
||||
|
||||
int type = 0;
|
||||
size_t size = sizeof(type);
|
||||
zmq_getsockopt(static_cast<const FairMQSocketSHM*>(&(channels.at(i)->GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
|
||||
zmq_getsockopt(static_cast<const Socket*>(&(channels.at(i)->GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
|
||||
|
||||
SetItemEvents(fItems[i], type);
|
||||
}
|
||||
}
|
||||
|
||||
FairMQPollerSHM::FairMQPollerSHM(const unordered_map<string, vector<FairMQChannel>>& channelsMap, const vector<string>& channelList)
|
||||
Poller::Poller(const unordered_map<string, vector<FairMQChannel>>& channelsMap, const vector<string>& channelList)
|
||||
: fItems()
|
||||
, fNumItems(0)
|
||||
, fOffsetMap()
|
||||
|
@ -89,19 +97,19 @@ FairMQPollerSHM::FairMQPollerSHM(const unordered_map<string, vector<FairMQChanne
|
|||
{
|
||||
index = fOffsetMap[channel] + i;
|
||||
|
||||
fItems[index].socket = static_cast<const FairMQSocketSHM*>(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket();
|
||||
fItems[index].socket = static_cast<const Socket*>(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket();
|
||||
fItems[index].fd = 0;
|
||||
fItems[index].revents = 0;
|
||||
|
||||
int type = 0;
|
||||
size_t size = sizeof(type);
|
||||
zmq_getsockopt(static_cast<const FairMQSocketSHM*>(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
|
||||
zmq_getsockopt(static_cast<const Socket*>(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
|
||||
|
||||
SetItemEvents(fItems[index], type);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (const std::out_of_range& oor)
|
||||
catch (const out_of_range& oor)
|
||||
{
|
||||
LOG(error) << "at least one of the provided channel keys for poller initialization is invalid";
|
||||
LOG(error) << "out of range error: " << oor.what() << '\n';
|
||||
|
@ -109,7 +117,7 @@ FairMQPollerSHM::FairMQPollerSHM(const unordered_map<string, vector<FairMQChanne
|
|||
}
|
||||
}
|
||||
|
||||
void FairMQPollerSHM::SetItemEvents(zmq_pollitem_t& item, const int type)
|
||||
void Poller::SetItemEvents(zmq_pollitem_t& item, const int type)
|
||||
{
|
||||
if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER)
|
||||
{
|
||||
|
@ -130,7 +138,7 @@ void FairMQPollerSHM::SetItemEvents(zmq_pollitem_t& item, const int type)
|
|||
}
|
||||
}
|
||||
|
||||
void FairMQPollerSHM::Poll(const int timeout)
|
||||
void Poller::Poll(const int timeout)
|
||||
{
|
||||
if (zmq_poll(fItems, fNumItems, timeout) < 0)
|
||||
{
|
||||
|
@ -141,12 +149,12 @@ void FairMQPollerSHM::Poll(const int timeout)
|
|||
else
|
||||
{
|
||||
LOG(error) << "polling failed, reason: " << zmq_strerror(errno);
|
||||
throw std::runtime_error("polling failed");
|
||||
throw runtime_error("polling failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool FairMQPollerSHM::CheckInput(const int index)
|
||||
bool Poller::CheckInput(const int index)
|
||||
{
|
||||
if (fItems[index].revents & ZMQ_POLLIN)
|
||||
{
|
||||
|
@ -156,7 +164,7 @@ bool FairMQPollerSHM::CheckInput(const int index)
|
|||
return false;
|
||||
}
|
||||
|
||||
bool FairMQPollerSHM::CheckOutput(const int index)
|
||||
bool Poller::CheckOutput(const int index)
|
||||
{
|
||||
if (fItems[index].revents & ZMQ_POLLOUT)
|
||||
{
|
||||
|
@ -166,7 +174,7 @@ bool FairMQPollerSHM::CheckOutput(const int index)
|
|||
return false;
|
||||
}
|
||||
|
||||
bool FairMQPollerSHM::CheckInput(const string& channelKey, const int index)
|
||||
bool Poller::CheckInput(const string& channelKey, const int index)
|
||||
{
|
||||
try
|
||||
{
|
||||
|
@ -177,7 +185,7 @@ bool FairMQPollerSHM::CheckInput(const string& channelKey, const int index)
|
|||
|
||||
return false;
|
||||
}
|
||||
catch (const std::out_of_range& oor)
|
||||
catch (const out_of_range& oor)
|
||||
{
|
||||
LOG(error) << "invalid channel key: \"" << channelKey << "\"";
|
||||
LOG(error) << "out of range error: " << oor.what() << '\n';
|
||||
|
@ -185,7 +193,7 @@ bool FairMQPollerSHM::CheckInput(const string& channelKey, const int index)
|
|||
}
|
||||
}
|
||||
|
||||
bool FairMQPollerSHM::CheckOutput(const string& channelKey, const int index)
|
||||
bool Poller::CheckOutput(const string& channelKey, const int index)
|
||||
{
|
||||
try
|
||||
{
|
||||
|
@ -196,7 +204,7 @@ bool FairMQPollerSHM::CheckOutput(const string& channelKey, const int index)
|
|||
|
||||
return false;
|
||||
}
|
||||
catch (const std::out_of_range& oor)
|
||||
catch (const out_of_range& oor)
|
||||
{
|
||||
LOG(error) << "Invalid channel key: \"" << channelKey << "\"";
|
||||
LOG(error) << "out of range error: " << oor.what() << '\n';
|
||||
|
@ -204,7 +212,11 @@ bool FairMQPollerSHM::CheckOutput(const string& channelKey, const int index)
|
|||
}
|
||||
}
|
||||
|
||||
FairMQPollerSHM::~FairMQPollerSHM()
|
||||
Poller::~Poller()
|
||||
{
|
||||
delete[] fItems;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -5,32 +5,35 @@
|
|||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
#ifndef FAIRMQPOLLERSHM_H_
|
||||
#define FAIRMQPOLLERSHM_H_
|
||||
#ifndef FAIR_MQ_SHMEM_POLLER_H_
|
||||
#define FAIR_MQ_SHMEM_POLLER_H_
|
||||
|
||||
#include <zmq.h>
|
||||
|
||||
#include <FairMQPoller.h>
|
||||
#include <FairMQChannel.h>
|
||||
|
||||
#include <vector>
|
||||
#include <unordered_map>
|
||||
|
||||
#include <zmq.h>
|
||||
|
||||
#include "FairMQPoller.h"
|
||||
#include "FairMQChannel.h"
|
||||
#include "FairMQTransportFactorySHM.h"
|
||||
|
||||
class FairMQChannel;
|
||||
|
||||
class FairMQPollerSHM final : public FairMQPoller
|
||||
namespace fair
|
||||
{
|
||||
namespace mq
|
||||
{
|
||||
namespace shmem
|
||||
{
|
||||
friend class FairMQChannel;
|
||||
friend class FairMQTransportFactorySHM;
|
||||
|
||||
class Poller final : public fair::mq::Poller
|
||||
{
|
||||
public:
|
||||
FairMQPollerSHM(const std::vector<FairMQChannel>& channels);
|
||||
FairMQPollerSHM(const std::vector<FairMQChannel*>& channels);
|
||||
FairMQPollerSHM(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList);
|
||||
Poller(const std::vector<FairMQChannel>& channels);
|
||||
Poller(const std::vector<FairMQChannel*>& channels);
|
||||
Poller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList);
|
||||
|
||||
FairMQPollerSHM(const FairMQPollerSHM&) = delete;
|
||||
FairMQPollerSHM operator=(const FairMQPollerSHM&) = delete;
|
||||
Poller(const Poller&) = delete;
|
||||
Poller operator=(const Poller&) = delete;
|
||||
|
||||
void SetItemEvents(zmq_pollitem_t& item, const int type);
|
||||
|
||||
|
@ -40,7 +43,7 @@ class FairMQPollerSHM final : public FairMQPoller
|
|||
bool CheckInput(const std::string& channelKey, const int index) override;
|
||||
bool CheckOutput(const std::string& channelKey, const int index) override;
|
||||
|
||||
~FairMQPollerSHM() override;
|
||||
~Poller() override;
|
||||
|
||||
private:
|
||||
zmq_pollitem_t* fItems;
|
||||
|
@ -49,4 +52,8 @@ class FairMQPollerSHM final : public FairMQPoller
|
|||
std::unordered_map<std::string, int> fOffsetMap;
|
||||
};
|
||||
|
||||
#endif /* FAIRMQPOLLERSHM_H_ */
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endif /* FAIR_MQ_SHMEM_POLLER_H_ */
|
|
@ -6,17 +6,18 @@
|
|||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include <fairmq/shmem/Region.h>
|
||||
#include <fairmq/shmem/Common.h>
|
||||
#include <fairmq/shmem/Manager.h>
|
||||
#include "Region.h"
|
||||
#include "Common.h"
|
||||
#include "Manager.h"
|
||||
|
||||
#include <fairmq/tools/CppSTL.h>
|
||||
#include <fairmq/tools/Strings.h>
|
||||
|
||||
#include <boost/filesystem.hpp>
|
||||
#include <boost/process.hpp>
|
||||
#include <boost/date_time/posix_time/posix_time.hpp>
|
||||
#include <cerrno>
|
||||
|
||||
#include <cerrno>
|
||||
#include <chrono>
|
||||
|
||||
using namespace std;
|
||||
|
@ -31,7 +32,7 @@ namespace mq
|
|||
namespace shmem
|
||||
{
|
||||
|
||||
Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, FairMQRegionCallback callback, const string& path /* = "" */, int flags /* = 0 */)
|
||||
Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, RegionCallback callback, const string& path /* = "" */, int flags /* = 0 */)
|
||||
: fManager(manager)
|
||||
, fRemote(remote)
|
||||
, fStop(false)
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQShmManager.h
|
||||
* Region.h
|
||||
*
|
||||
* @since 2016-04-08
|
||||
* @author A. Rybalchenko
|
||||
|
@ -15,10 +15,10 @@
|
|||
#ifndef FAIR_MQ_SHMEM_REGION_H_
|
||||
#define FAIR_MQ_SHMEM_REGION_H_
|
||||
|
||||
#include "FairMQLogger.h"
|
||||
#include "FairMQUnmanagedRegion.h"
|
||||
#include "Common.h"
|
||||
|
||||
#include <fairmq/shmem/Common.h>
|
||||
#include <FairMQLogger.h>
|
||||
#include <FairMQUnmanagedRegion.h>
|
||||
|
||||
#include <boost/interprocess/managed_shared_memory.hpp>
|
||||
#include <boost/interprocess/file_mapping.hpp>
|
||||
|
@ -40,7 +40,7 @@ class Manager;
|
|||
|
||||
struct Region
|
||||
{
|
||||
Region(Manager& manager, uint64_t id, uint64_t size, bool remote, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0);
|
||||
Region(Manager& manager, uint64_t id, uint64_t size, bool remote, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0);
|
||||
|
||||
Region() = delete;
|
||||
|
||||
|
@ -75,7 +75,7 @@ struct Region
|
|||
|
||||
std::thread fReceiveAcksWorker;
|
||||
std::thread fSendAcksWorker;
|
||||
FairMQRegionCallback fCallback;
|
||||
RegionCallback fCallback;
|
||||
};
|
||||
|
||||
} // namespace shmem
|
||||
|
|
|
@ -5,12 +5,13 @@
|
|||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
#include "Common.h"
|
||||
|
||||
#include "FairMQSocketSHM.h"
|
||||
#include "FairMQMessageSHM.h"
|
||||
#include "FairMQUnmanagedRegionSHM.h"
|
||||
#include "FairMQLogger.h"
|
||||
#include "Common.h"
|
||||
#include "Socket.h"
|
||||
#include "Message.h"
|
||||
#include "UnmanagedRegion.h"
|
||||
|
||||
#include <FairMQLogger.h>
|
||||
#include <fairmq/Tools.h>
|
||||
|
||||
#include <zmq.h>
|
||||
|
@ -18,13 +19,31 @@
|
|||
#include <stdexcept>
|
||||
|
||||
using namespace std;
|
||||
using namespace fair::mq::shmem;
|
||||
using namespace fair::mq;
|
||||
|
||||
atomic<bool> FairMQSocketSHM::fInterrupted(false);
|
||||
namespace fair
|
||||
{
|
||||
namespace mq
|
||||
{
|
||||
namespace shmem
|
||||
{
|
||||
|
||||
FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const string& name, const string& id /*= ""*/, void* context, FairMQTransportFactory* fac /*=nullptr*/)
|
||||
: FairMQSocket{fac}
|
||||
atomic<bool> Socket::fInterrupted(false);
|
||||
|
||||
struct ZMsg
|
||||
{
|
||||
ZMsg() { int rc __attribute__((unused)) = zmq_msg_init(&fMsg); assert(rc == 0); }
|
||||
explicit ZMsg(size_t size) { int rc __attribute__((unused)) = zmq_msg_init_size(&fMsg, size); assert(rc == 0); }
|
||||
~ZMsg() { int rc __attribute__((unused)) = zmq_msg_close(&fMsg); assert(rc == 0); }
|
||||
|
||||
void* Data() { return zmq_msg_data(&fMsg); }
|
||||
size_t Size() { return zmq_msg_size(&fMsg); }
|
||||
zmq_msg_t* Msg() { return &fMsg; }
|
||||
|
||||
zmq_msg_t fMsg;
|
||||
};
|
||||
|
||||
Socket::Socket(Manager& manager, const string& type, const string& name, const string& id /*= ""*/, void* context, FairMQTransportFactory* fac /*=nullptr*/)
|
||||
: fair::mq::Socket{fac}
|
||||
, fSocket(nullptr)
|
||||
, fManager(manager)
|
||||
, fId(id + "." + name + "." + type)
|
||||
|
@ -40,7 +59,7 @@ FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const str
|
|||
|
||||
if (fSocket == nullptr) {
|
||||
LOG(error) << "Failed creating socket " << fId << ", reason: " << zmq_strerror(errno);
|
||||
exit(EXIT_FAILURE);
|
||||
throw SocketError(tools::ToString("Failed creating socket ", fId, ", reason: ", zmq_strerror(errno)));
|
||||
}
|
||||
|
||||
if (zmq_setsockopt(fSocket, ZMQ_IDENTITY, fId.c_str(), fId.length()) != 0) {
|
||||
|
@ -72,16 +91,14 @@ FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const str
|
|||
|
||||
if (type == "sub" || type == "pub") {
|
||||
LOG(error) << "PUB/SUB socket type is not supported for shared memory transport";
|
||||
throw fair::mq::SocketError("PUB/SUB socket type is not supported for shared memory transport");
|
||||
throw SocketError("PUB/SUB socket type is not supported for shared memory transport");
|
||||
}
|
||||
|
||||
LOG(debug) << "Created socket " << GetId();
|
||||
}
|
||||
|
||||
bool FairMQSocketSHM::Bind(const string& address)
|
||||
bool Socket::Bind(const string& address)
|
||||
{
|
||||
// LOG(info) << "binding socket " << fId << " on " << address;
|
||||
|
||||
if (zmq_bind(fSocket, address.c_str()) != 0) {
|
||||
if (errno == EADDRINUSE) {
|
||||
// do not print error in this case, this is handled by FairMQDevice in case no connection could be established after trying a number of random ports from a range.
|
||||
|
@ -93,19 +110,17 @@ bool FairMQSocketSHM::Bind(const string& address)
|
|||
return true;
|
||||
}
|
||||
|
||||
bool FairMQSocketSHM::Connect(const string& address)
|
||||
bool Socket::Connect(const string& address)
|
||||
{
|
||||
// LOG(info) << "connecting socket " << fId << " on " << address;
|
||||
|
||||
if (zmq_connect(fSocket, address.c_str()) != 0) {
|
||||
LOG(error) << "Failed connecting socket " << fId << ", reason: " << zmq_strerror(errno);
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int timeout)
|
||||
int Socket::Send(MessagePtr& msg, const int timeout)
|
||||
{
|
||||
int flags = 0;
|
||||
if (timeout == 0) {
|
||||
|
@ -113,16 +128,17 @@ int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int timeout)
|
|||
}
|
||||
int elapsed = 0;
|
||||
|
||||
Message* shmMsg = static_cast<Message*>(msg.get());
|
||||
ZMsg zmqMsg(sizeof(MetaHeader));
|
||||
std::memcpy(zmqMsg.Data(), &(shmMsg->fMeta), sizeof(MetaHeader));
|
||||
|
||||
while (true && !fInterrupted) {
|
||||
int nbytes = zmq_msg_send(static_cast<FairMQMessageSHM*>(msg.get())->GetMessage(), fSocket, flags);
|
||||
if (nbytes == 0) {
|
||||
int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags);
|
||||
if (nbytes > 0) {
|
||||
shmMsg->fQueued = true;
|
||||
++fMessagesTx;
|
||||
return nbytes;
|
||||
} else if (nbytes > 0) {
|
||||
static_cast<FairMQMessageSHM*>(msg.get())->fQueued = true;
|
||||
size_t size = msg->GetSize();
|
||||
fBytesTx += size;
|
||||
++fMessagesTx;
|
||||
return size;
|
||||
} else if (zmq_errno() == EAGAIN) {
|
||||
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) {
|
||||
|
@ -140,7 +156,7 @@ int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int timeout)
|
|||
LOG(info) << "terminating socket " << fId;
|
||||
return -1;
|
||||
} else {
|
||||
LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
|
||||
LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes;
|
||||
return nbytes;
|
||||
}
|
||||
}
|
||||
|
@ -148,7 +164,7 @@ int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int timeout)
|
|||
return -1;
|
||||
}
|
||||
|
||||
int FairMQSocketSHM::Receive(FairMQMessagePtr& msg, const int timeout)
|
||||
int Socket::Receive(MessagePtr& msg, const int timeout)
|
||||
{
|
||||
int flags = 0;
|
||||
if (timeout == 0) {
|
||||
|
@ -156,28 +172,18 @@ int FairMQSocketSHM::Receive(FairMQMessagePtr& msg, const int timeout)
|
|||
}
|
||||
int elapsed = 0;
|
||||
|
||||
ZMsg zmqMsg;
|
||||
|
||||
while (true) {
|
||||
FairMQMessageSHM* shmMsg = static_cast<FairMQMessageSHM*>(msg.get());
|
||||
zmq_msg_t* zmqMsg = shmMsg->GetMessage();
|
||||
int nbytes = zmq_msg_recv(zmqMsg, fSocket, flags);
|
||||
if (nbytes == 0) {
|
||||
++fMessagesRx;
|
||||
return nbytes;
|
||||
} else if (nbytes > 0) {
|
||||
Message* shmMsg = static_cast<Message*>(msg.get());
|
||||
int nbytes = zmq_msg_recv(zmqMsg.Msg(), fSocket, flags);
|
||||
if (nbytes > 0) {
|
||||
// check for number of received messages. must be 1
|
||||
const auto numMsgs = nbytes / sizeof(MetaHeader);
|
||||
if (numMsgs > 1) {
|
||||
LOG(error) << "Receiving SHM multipart with a single message receive call";
|
||||
}
|
||||
assert((nbytes / sizeof(MetaHeader)) == 1);
|
||||
|
||||
assert(numMsgs == 1);
|
||||
|
||||
MetaHeader* hdr = static_cast<MetaHeader*>(zmq_msg_data(zmqMsg));
|
||||
MetaHeader* hdr = static_cast<MetaHeader*>(zmqMsg.Data());
|
||||
size_t size = hdr->fSize;
|
||||
shmMsg->fHandle = hdr->fHandle;
|
||||
shmMsg->fSize = size;
|
||||
shmMsg->fRegionId = hdr->fRegionId;
|
||||
shmMsg->fHint = hdr->fHint;
|
||||
shmMsg->fMeta = *hdr;
|
||||
|
||||
fBytesRx += size;
|
||||
++fMessagesRx;
|
||||
|
@ -198,95 +204,74 @@ int FairMQSocketSHM::Receive(FairMQMessagePtr& msg, const int timeout)
|
|||
LOG(info) << "terminating socket " << fId;
|
||||
return -1;
|
||||
} else {
|
||||
LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno);
|
||||
LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes;
|
||||
return nbytes;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int64_t FairMQSocketSHM::Send(vector<FairMQMessagePtr>& msgVec, const int timeout)
|
||||
int64_t Socket::Send(vector<MessagePtr>& msgVec, const int timeout)
|
||||
{
|
||||
int flags = 0;
|
||||
if (timeout == 0) {
|
||||
flags = ZMQ_DONTWAIT;
|
||||
}
|
||||
const unsigned int vecSize = msgVec.size();
|
||||
int elapsed = 0;
|
||||
|
||||
// put it into zmq message
|
||||
zmq_msg_t zmqMsg;
|
||||
zmq_msg_init_size(&zmqMsg, vecSize * sizeof(MetaHeader));
|
||||
const unsigned int vecSize = msgVec.size();
|
||||
ZMsg zmqMsg(vecSize * sizeof(MetaHeader));
|
||||
|
||||
// prepare the message with shm metas
|
||||
MetaHeader* metas = static_cast<MetaHeader*>(zmq_msg_data(&zmqMsg));
|
||||
MetaHeader* metas = static_cast<MetaHeader*>(zmqMsg.Data());
|
||||
|
||||
for (auto& msg : msgVec) {
|
||||
zmq_msg_t* metaMsg = static_cast<FairMQMessageSHM*>(msg.get())->GetMessage();
|
||||
if (zmq_msg_size(metaMsg) > 0) {
|
||||
memcpy(metas++, zmq_msg_data(metaMsg), sizeof(MetaHeader));
|
||||
} else {
|
||||
// if the message is empty, create meta data to reflect this
|
||||
// (always creating meta data for empty messages would add an unnecessary allocation for the receive case, so we do it lazily here)
|
||||
MetaHeader hdr;
|
||||
hdr.fSize = 0;
|
||||
hdr.fHandle = -1;
|
||||
hdr.fRegionId = 0;
|
||||
hdr.fHint = 0;
|
||||
memcpy(metas++, &hdr, sizeof(MetaHeader));
|
||||
}
|
||||
Message* shmMsg = static_cast<Message*>(msg.get());
|
||||
std::memcpy(metas++, &(shmMsg->fMeta), sizeof(MetaHeader));
|
||||
}
|
||||
|
||||
while (!fInterrupted) {
|
||||
int64_t totalSize = 0;
|
||||
int nbytes = zmq_msg_send(&zmqMsg, fSocket, flags);
|
||||
if (nbytes == 0) {
|
||||
zmq_msg_close(&zmqMsg);
|
||||
return nbytes;
|
||||
} else if (nbytes > 0) {
|
||||
int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags);
|
||||
if (nbytes > 0) {
|
||||
assert(static_cast<unsigned int>(nbytes) == (vecSize * sizeof(MetaHeader))); // all or nothing
|
||||
|
||||
for (auto& msg : msgVec) {
|
||||
FairMQMessageSHM* shmMsg = static_cast<FairMQMessageSHM*>(msg.get());
|
||||
Message* shmMsg = static_cast<Message*>(msg.get());
|
||||
shmMsg->fQueued = true;
|
||||
totalSize += shmMsg->fSize;
|
||||
totalSize += shmMsg->fMeta.fSize;
|
||||
}
|
||||
|
||||
// store statistics on how many messages have been sent
|
||||
fMessagesTx++;
|
||||
fBytesTx += totalSize;
|
||||
|
||||
zmq_msg_close(&zmqMsg);
|
||||
return totalSize;
|
||||
} else if (zmq_errno() == EAGAIN) {
|
||||
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) {
|
||||
if (timeout > 0) {
|
||||
elapsed += fSndTimeout;
|
||||
if (elapsed >= timeout) {
|
||||
zmq_msg_close(&zmqMsg);
|
||||
return -2;
|
||||
}
|
||||
}
|
||||
continue;
|
||||
} else {
|
||||
zmq_msg_close(&zmqMsg);
|
||||
return -2;
|
||||
}
|
||||
} else if (zmq_errno() == ETERM) {
|
||||
zmq_msg_close(&zmqMsg);
|
||||
LOG(info) << "terminating socket " << fId;
|
||||
return -1;
|
||||
} else {
|
||||
zmq_msg_close(&zmqMsg);
|
||||
LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
|
||||
LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes;
|
||||
return nbytes;
|
||||
}
|
||||
}
|
||||
|
||||
zmq_msg_close(&zmqMsg);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int64_t FairMQSocketSHM::Receive(vector<FairMQMessagePtr>& msgVec, const int timeout)
|
||||
int64_t Socket::Receive(vector<MessagePtr>& msgVec, const int timeout)
|
||||
{
|
||||
int flags = 0;
|
||||
if (timeout == 0) {
|
||||
|
@ -294,18 +279,14 @@ int64_t FairMQSocketSHM::Receive(vector<FairMQMessagePtr>& msgVec, const int tim
|
|||
}
|
||||
int elapsed = 0;
|
||||
|
||||
zmq_msg_t zmqMsg;
|
||||
zmq_msg_init(&zmqMsg);
|
||||
ZMsg zmqMsg;
|
||||
|
||||
while (!fInterrupted) {
|
||||
int64_t totalSize = 0;
|
||||
int nbytes = zmq_msg_recv(&zmqMsg, fSocket, flags);
|
||||
if (nbytes == 0) {
|
||||
zmq_msg_close(&zmqMsg);
|
||||
return 0;
|
||||
} else if (nbytes > 0) {
|
||||
MetaHeader* hdrVec = static_cast<MetaHeader*>(zmq_msg_data(&zmqMsg));
|
||||
const auto hdrVecSize = zmq_msg_size(&zmqMsg);
|
||||
int nbytes = zmq_msg_recv(zmqMsg.Msg(), fSocket, flags);
|
||||
if (nbytes > 0) {
|
||||
MetaHeader* hdrVec = static_cast<MetaHeader*>(zmqMsg.Data());
|
||||
const auto hdrVecSize = zmqMsg.Size();
|
||||
assert(hdrVecSize > 0);
|
||||
assert(hdrVecSize % sizeof(MetaHeader) == 0);
|
||||
|
||||
|
@ -314,12 +295,9 @@ int64_t FairMQSocketSHM::Receive(vector<FairMQMessagePtr>& msgVec, const int tim
|
|||
msgVec.reserve(numMessages);
|
||||
|
||||
for (size_t m = 0; m < numMessages; m++) {
|
||||
// get the meta data pointer
|
||||
MetaHeader* hdr = &hdrVec[m];
|
||||
|
||||
// create new message (part)
|
||||
msgVec.emplace_back(tools::make_unique<FairMQMessageSHM>(fManager, hdr, GetTransport()));
|
||||
FairMQMessageSHM* shmMsg = static_cast<FairMQMessageSHM*>(msgVec.back().get());
|
||||
msgVec.emplace_back(tools::make_unique<Message>(fManager, hdrVec[m], GetTransport()));
|
||||
Message* shmMsg = static_cast<Message*>(msgVec.back().get());
|
||||
totalSize += shmMsg->GetSize();
|
||||
}
|
||||
|
||||
|
@ -327,34 +305,29 @@ int64_t FairMQSocketSHM::Receive(vector<FairMQMessagePtr>& msgVec, const int tim
|
|||
fMessagesRx++;
|
||||
fBytesRx += totalSize;
|
||||
|
||||
zmq_msg_close(&zmqMsg);
|
||||
return totalSize;
|
||||
} else if (zmq_errno() == EAGAIN) {
|
||||
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) {
|
||||
if (timeout > 0) {
|
||||
elapsed += fRcvTimeout;
|
||||
if (elapsed >= timeout) {
|
||||
zmq_msg_close(&zmqMsg);
|
||||
return -2;
|
||||
}
|
||||
}
|
||||
continue;
|
||||
} else {
|
||||
zmq_msg_close(&zmqMsg);
|
||||
return -2;
|
||||
}
|
||||
} else {
|
||||
zmq_msg_close(&zmqMsg);
|
||||
LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno);
|
||||
LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes;
|
||||
return nbytes;
|
||||
}
|
||||
}
|
||||
|
||||
zmq_msg_close(&zmqMsg);
|
||||
return -1;
|
||||
}
|
||||
|
||||
void FairMQSocketSHM::Close()
|
||||
void Socket::Close()
|
||||
{
|
||||
// LOG(debug) << "Closing socket " << fId;
|
||||
|
||||
|
@ -369,42 +342,42 @@ void FairMQSocketSHM::Close()
|
|||
fSocket = nullptr;
|
||||
}
|
||||
|
||||
void FairMQSocketSHM::Interrupt()
|
||||
void Socket::Interrupt()
|
||||
{
|
||||
Manager::Interrupt();
|
||||
FairMQMessageSHM::fInterrupted = true;
|
||||
Message::fInterrupted = true;
|
||||
fInterrupted = true;
|
||||
}
|
||||
|
||||
void FairMQSocketSHM::Resume()
|
||||
void Socket::Resume()
|
||||
{
|
||||
Manager::Resume();
|
||||
FairMQMessageSHM::fInterrupted = false;
|
||||
Message::fInterrupted = false;
|
||||
fInterrupted = false;
|
||||
}
|
||||
|
||||
void FairMQSocketSHM::SetOption(const string& option, const void* value, size_t valueSize)
|
||||
void Socket::SetOption(const string& option, const void* value, size_t valueSize)
|
||||
{
|
||||
if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0) {
|
||||
LOG(error) << "Failed setting socket option, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
}
|
||||
|
||||
void FairMQSocketSHM::GetOption(const string& option, void* value, size_t* valueSize)
|
||||
void Socket::GetOption(const string& option, void* value, size_t* valueSize)
|
||||
{
|
||||
if (zmq_getsockopt(fSocket, GetConstant(option), value, valueSize) < 0) {
|
||||
LOG(error) << "Failed getting socket option, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
}
|
||||
|
||||
void FairMQSocketSHM::SetLinger(const int value)
|
||||
void Socket::SetLinger(const int value)
|
||||
{
|
||||
if (zmq_setsockopt(fSocket, ZMQ_LINGER, &value, sizeof(value)) < 0) {
|
||||
throw SocketError(tools::ToString("failed setting ZMQ_LINGER, reason: ", zmq_strerror(errno)));
|
||||
}
|
||||
}
|
||||
|
||||
int FairMQSocketSHM::GetLinger() const
|
||||
int Socket::GetLinger() const
|
||||
{
|
||||
int value = 0;
|
||||
size_t valueSize = sizeof(value);
|
||||
|
@ -414,14 +387,14 @@ int FairMQSocketSHM::GetLinger() const
|
|||
return value;
|
||||
}
|
||||
|
||||
void FairMQSocketSHM::SetSndBufSize(const int value)
|
||||
void Socket::SetSndBufSize(const int value)
|
||||
{
|
||||
if (zmq_setsockopt(fSocket, ZMQ_SNDHWM, &value, sizeof(value)) < 0) {
|
||||
throw SocketError(tools::ToString("failed setting ZMQ_SNDHWM, reason: ", zmq_strerror(errno)));
|
||||
}
|
||||
}
|
||||
|
||||
int FairMQSocketSHM::GetSndBufSize() const
|
||||
int Socket::GetSndBufSize() const
|
||||
{
|
||||
int value = 0;
|
||||
size_t valueSize = sizeof(value);
|
||||
|
@ -431,14 +404,14 @@ int FairMQSocketSHM::GetSndBufSize() const
|
|||
return value;
|
||||
}
|
||||
|
||||
void FairMQSocketSHM::SetRcvBufSize(const int value)
|
||||
void Socket::SetRcvBufSize(const int value)
|
||||
{
|
||||
if (zmq_setsockopt(fSocket, ZMQ_RCVHWM, &value, sizeof(value)) < 0) {
|
||||
throw SocketError(tools::ToString("failed setting ZMQ_RCVHWM, reason: ", zmq_strerror(errno)));
|
||||
}
|
||||
}
|
||||
|
||||
int FairMQSocketSHM::GetRcvBufSize() const
|
||||
int Socket::GetRcvBufSize() const
|
||||
{
|
||||
int value = 0;
|
||||
size_t valueSize = sizeof(value);
|
||||
|
@ -448,14 +421,14 @@ int FairMQSocketSHM::GetRcvBufSize() const
|
|||
return value;
|
||||
}
|
||||
|
||||
void FairMQSocketSHM::SetSndKernelSize(const int value)
|
||||
void Socket::SetSndKernelSize(const int value)
|
||||
{
|
||||
if (zmq_setsockopt(fSocket, ZMQ_SNDBUF, &value, sizeof(value)) < 0) {
|
||||
throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno)));
|
||||
}
|
||||
}
|
||||
|
||||
int FairMQSocketSHM::GetSndKernelSize() const
|
||||
int Socket::GetSndKernelSize() const
|
||||
{
|
||||
int value = 0;
|
||||
size_t valueSize = sizeof(value);
|
||||
|
@ -465,14 +438,14 @@ int FairMQSocketSHM::GetSndKernelSize() const
|
|||
return value;
|
||||
}
|
||||
|
||||
void FairMQSocketSHM::SetRcvKernelSize(const int value)
|
||||
void Socket::SetRcvKernelSize(const int value)
|
||||
{
|
||||
if (zmq_setsockopt(fSocket, ZMQ_RCVBUF, &value, sizeof(value)) < 0) {
|
||||
throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno)));
|
||||
}
|
||||
}
|
||||
|
||||
int FairMQSocketSHM::GetRcvKernelSize() const
|
||||
int Socket::GetRcvKernelSize() const
|
||||
{
|
||||
int value = 0;
|
||||
size_t valueSize = sizeof(value);
|
||||
|
@ -482,7 +455,7 @@ int FairMQSocketSHM::GetRcvKernelSize() const
|
|||
return value;
|
||||
}
|
||||
|
||||
int FairMQSocketSHM::GetConstant(const string& constant)
|
||||
int Socket::GetConstant(const string& constant)
|
||||
{
|
||||
if (constant == "") return 0;
|
||||
if (constant == "sub") return ZMQ_SUB;
|
||||
|
@ -510,3 +483,7 @@ int FairMQSocketSHM::GetConstant(const string& constant)
|
|||
|
||||
return -1;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -5,34 +5,42 @@
|
|||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
#ifndef FAIRMQSOCKETSHM_H_
|
||||
#define FAIRMQSOCKETSHM_H_
|
||||
#ifndef FAIR_MQ_SHMEM_SOCKET_H_
|
||||
#define FAIR_MQ_SHMEM_SOCKET_H_
|
||||
|
||||
#include "FairMQSocket.h"
|
||||
#include "FairMQMessage.h"
|
||||
#include "Manager.h"
|
||||
|
||||
#include <fairmq/shmem/Manager.h>
|
||||
#include <FairMQSocket.h>
|
||||
#include <FairMQMessage.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <memory> // unique_ptr
|
||||
|
||||
class FairMQTransportFactory;
|
||||
|
||||
class FairMQSocketSHM final : public FairMQSocket
|
||||
namespace fair
|
||||
{
|
||||
namespace mq
|
||||
{
|
||||
namespace shmem
|
||||
{
|
||||
|
||||
class Socket final : public fair::mq::Socket
|
||||
{
|
||||
public:
|
||||
FairMQSocketSHM(fair::mq::shmem::Manager& manager, const std::string& type, const std::string& name, const std::string& id = "", void* context = nullptr, FairMQTransportFactory* fac = nullptr);
|
||||
FairMQSocketSHM(const FairMQSocketSHM&) = delete;
|
||||
FairMQSocketSHM operator=(const FairMQSocketSHM&) = delete;
|
||||
Socket(Manager& manager, const std::string& type, const std::string& name, const std::string& id = "", void* context = nullptr, FairMQTransportFactory* fac = nullptr);
|
||||
Socket(const Socket&) = delete;
|
||||
Socket operator=(const Socket&) = delete;
|
||||
|
||||
std::string GetId() override { return fId; }
|
||||
std::string GetId() const override { return fId; }
|
||||
|
||||
bool Bind(const std::string& address) override;
|
||||
bool Connect(const std::string& address) override;
|
||||
|
||||
int Send(FairMQMessagePtr& msg, const int timeout = -1) override;
|
||||
int Receive(FairMQMessagePtr& msg, const int timeout = -1) override;
|
||||
int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int timeout = -1) override;
|
||||
int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int timeout = -1) override;
|
||||
int Send(MessagePtr& msg, const int timeout = -1) override;
|
||||
int Receive(MessagePtr& msg, const int timeout = -1) override;
|
||||
int64_t Send(std::vector<MessagePtr>& msgVec, const int timeout = -1) override;
|
||||
int64_t Receive(std::vector<MessagePtr>& msgVec, const int timeout = -1) override;
|
||||
|
||||
void* GetSocket() const { return fSocket; }
|
||||
|
||||
|
@ -62,11 +70,11 @@ class FairMQSocketSHM final : public FairMQSocket
|
|||
|
||||
static int GetConstant(const std::string& constant);
|
||||
|
||||
~FairMQSocketSHM() override { Close(); }
|
||||
~Socket() override { Close(); }
|
||||
|
||||
private:
|
||||
void* fSocket;
|
||||
fair::mq::shmem::Manager& fManager;
|
||||
Manager& fManager;
|
||||
std::string fId;
|
||||
std::atomic<unsigned long> fBytesTx;
|
||||
std::atomic<unsigned long> fBytesRx;
|
||||
|
@ -79,4 +87,8 @@ class FairMQSocketSHM final : public FairMQSocket
|
|||
int fRcvTimeout;
|
||||
};
|
||||
|
||||
#endif /* FAIRMQSOCKETSHM_H_ */
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endif /* FAIR_MQ_SHMEM_SOCKET_H_ */
|
|
@ -6,9 +6,9 @@
|
|||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include "FairMQLogger.h"
|
||||
#include "FairMQTransportFactorySHM.h"
|
||||
#include "TransportFactory.h"
|
||||
|
||||
#include <FairMQLogger.h>
|
||||
#include <fairmq/Tools.h>
|
||||
|
||||
#include <zmq.h>
|
||||
|
@ -26,15 +26,21 @@
|
|||
#include <cstdlib> // getenv
|
||||
|
||||
using namespace std;
|
||||
using namespace fair::mq::shmem;
|
||||
|
||||
namespace bpt = ::boost::posix_time;
|
||||
namespace bipc = ::boost::interprocess;
|
||||
|
||||
fair::mq::Transport FairMQTransportFactorySHM::fTransportType = fair::mq::Transport::SHM;
|
||||
namespace fair
|
||||
{
|
||||
namespace mq
|
||||
{
|
||||
namespace shmem
|
||||
{
|
||||
|
||||
FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const fair::mq::ProgOptions* config)
|
||||
: FairMQTransportFactory(id)
|
||||
Transport TransportFactory::fTransportType = Transport::SHM;
|
||||
|
||||
TransportFactory::TransportFactory(const string& id, const ProgOptions* config)
|
||||
: fair::mq::TransportFactory(id)
|
||||
, fDeviceId(id)
|
||||
, fShmId()
|
||||
, fZMQContext(nullptr)
|
||||
|
@ -49,7 +55,7 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const fai
|
|||
|
||||
fZMQContext = zmq_ctx_new();
|
||||
if (!fZMQContext) {
|
||||
throw runtime_error(fair::mq::tools::ToString("failed creating context, reason: ", zmq_strerror(errno)));
|
||||
throw runtime_error(tools::ToString("failed creating context, reason: ", zmq_strerror(errno)));
|
||||
}
|
||||
|
||||
int numIoThreads = 1;
|
||||
|
@ -62,7 +68,7 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const fai
|
|||
segmentSize = config->GetProperty<size_t>("shm-segment-size", segmentSize);
|
||||
autolaunchMonitor = config->GetProperty<bool>("shm-monitor", autolaunchMonitor);
|
||||
} else {
|
||||
LOG(debug) << "fair::mq::ProgOptions not available! Using defaults.";
|
||||
LOG(debug) << "ProgOptions not available! Using defaults.";
|
||||
}
|
||||
|
||||
fShmId = buildShmIdFromSessionIdAndUserId(sessionName);
|
||||
|
@ -81,18 +87,18 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const fai
|
|||
Manager::StartMonitor(fShmId);
|
||||
}
|
||||
|
||||
fManager = fair::mq::tools::make_unique<Manager>(fShmId, segmentSize);
|
||||
fManager = tools::make_unique<Manager>(fShmId, segmentSize);
|
||||
|
||||
} catch (bipc::interprocess_exception& e) {
|
||||
LOG(error) << "Could not initialize shared memory transport: " << e.what();
|
||||
throw runtime_error(fair::mq::tools::ToString("Could not initialize shared memory transport: ", e.what()));
|
||||
throw runtime_error(tools::ToString("Could not initialize shared memory transport: ", e.what()));
|
||||
}
|
||||
|
||||
fSendHeartbeats = true;
|
||||
fHeartbeatThread = thread(&FairMQTransportFactorySHM::SendHeartbeats, this);
|
||||
fHeartbeatThread = thread(&TransportFactory::SendHeartbeats, this);
|
||||
}
|
||||
|
||||
void FairMQTransportFactorySHM::SendHeartbeats()
|
||||
void TransportFactory::SendHeartbeats()
|
||||
{
|
||||
string controlQueueName("fmq_" + fShmId + "_cq");
|
||||
while (fSendHeartbeats) {
|
||||
|
@ -111,58 +117,58 @@ void FairMQTransportFactorySHM::SendHeartbeats()
|
|||
}
|
||||
}
|
||||
|
||||
FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage()
|
||||
MessagePtr TransportFactory::CreateMessage()
|
||||
{
|
||||
return unique_ptr<FairMQMessage>(new FairMQMessageSHM(*fManager, this));
|
||||
return tools::make_unique<Message>(*fManager, this);
|
||||
}
|
||||
|
||||
FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(const size_t size)
|
||||
MessagePtr TransportFactory::CreateMessage(const size_t size)
|
||||
{
|
||||
return unique_ptr<FairMQMessage>(new FairMQMessageSHM(*fManager, size, this));
|
||||
return tools::make_unique<Message>(*fManager, size, this);
|
||||
}
|
||||
|
||||
FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint)
|
||||
MessagePtr TransportFactory::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint)
|
||||
{
|
||||
return unique_ptr<FairMQMessage>(new FairMQMessageSHM(*fManager, data, size, ffn, hint, this));
|
||||
return tools::make_unique<Message>(*fManager, data, size, ffn, hint, this);
|
||||
}
|
||||
|
||||
FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint)
|
||||
MessagePtr TransportFactory::CreateMessage(UnmanagedRegionPtr& region, void* data, const size_t size, void* hint)
|
||||
{
|
||||
return unique_ptr<FairMQMessage>(new FairMQMessageSHM(*fManager, region, data, size, hint, this));
|
||||
return tools::make_unique<Message>(*fManager, region, data, size, hint, this);
|
||||
}
|
||||
|
||||
FairMQSocketPtr FairMQTransportFactorySHM::CreateSocket(const string& type, const string& name)
|
||||
SocketPtr TransportFactory::CreateSocket(const string& type, const string& name)
|
||||
{
|
||||
assert(fZMQContext);
|
||||
return unique_ptr<FairMQSocket>(new FairMQSocketSHM(*fManager, type, name, GetId(), fZMQContext, this));
|
||||
return tools::make_unique<Socket>(*fManager, type, name, GetId(), fZMQContext, this);
|
||||
}
|
||||
|
||||
FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const vector<FairMQChannel>& channels) const
|
||||
PollerPtr TransportFactory::CreatePoller(const vector<FairMQChannel>& channels) const
|
||||
{
|
||||
return unique_ptr<FairMQPoller>(new FairMQPollerSHM(channels));
|
||||
return tools::make_unique<Poller>(channels);
|
||||
}
|
||||
|
||||
FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const vector<FairMQChannel*>& channels) const
|
||||
PollerPtr TransportFactory::CreatePoller(const vector<FairMQChannel*>& channels) const
|
||||
{
|
||||
return unique_ptr<FairMQPoller>(new FairMQPollerSHM(channels));
|
||||
return tools::make_unique<Poller>(channels);
|
||||
}
|
||||
|
||||
FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const unordered_map<string, vector<FairMQChannel>>& channelsMap, const vector<string>& channelList) const
|
||||
PollerPtr TransportFactory::CreatePoller(const unordered_map<string, vector<FairMQChannel>>& channelsMap, const vector<string>& channelList) const
|
||||
{
|
||||
return unique_ptr<FairMQPoller>(new FairMQPollerSHM(channelsMap, channelList));
|
||||
return tools::make_unique<Poller>(channelsMap, channelList);
|
||||
}
|
||||
|
||||
FairMQUnmanagedRegionPtr FairMQTransportFactorySHM::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const
|
||||
UnmanagedRegionPtr TransportFactory::CreateUnmanagedRegion(const size_t size, RegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const
|
||||
{
|
||||
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionSHM(*fManager, size, callback, path, flags));
|
||||
return tools::make_unique<UnmanagedRegion>(*fManager, size, callback, path, flags);
|
||||
}
|
||||
|
||||
fair::mq::Transport FairMQTransportFactorySHM::GetType() const
|
||||
Transport TransportFactory::GetType() const
|
||||
{
|
||||
return fTransportType;
|
||||
}
|
||||
|
||||
FairMQTransportFactorySHM::~FairMQTransportFactorySHM()
|
||||
TransportFactory::~TransportFactory()
|
||||
{
|
||||
LOG(debug) << "Destroying Shared Memory transport...";
|
||||
fSendHeartbeats = false;
|
||||
|
@ -181,3 +187,7 @@ FairMQTransportFactorySHM::~FairMQTransportFactorySHM()
|
|||
LOG(error) << "context not available for shutdown";
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace shmem
|
||||
} // namespace mq
|
||||
} // namespace fair
|
78
fairmq/shmem/TransportFactory.h
Normal file
78
fairmq/shmem/TransportFactory.h
Normal file
|
@ -0,0 +1,78 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2016-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#ifndef FAIR_MQ_SHMEM_TRANSPORTFACTORY_H_
|
||||
#define FAIR_MQ_SHMEM_TRANSPORTFACTORY_H_
|
||||
|
||||
#include "Manager.h"
|
||||
#include "Common.h"
|
||||
#include "Message.h"
|
||||
#include "Socket.h"
|
||||
#include "Poller.h"
|
||||
#include "UnmanagedRegion.h"
|
||||
|
||||
#include <FairMQTransportFactory.h>
|
||||
#include <fairmq/ProgOptions.h>
|
||||
|
||||
#include <vector>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <atomic>
|
||||
|
||||
namespace fair
|
||||
{
|
||||
namespace mq
|
||||
{
|
||||
namespace shmem
|
||||
{
|
||||
|
||||
class TransportFactory final : public fair::mq::TransportFactory
|
||||
{
|
||||
public:
|
||||
TransportFactory(const std::string& id = "", const ProgOptions* config = nullptr);
|
||||
TransportFactory(const TransportFactory&) = delete;
|
||||
TransportFactory operator=(const TransportFactory&) = delete;
|
||||
|
||||
MessagePtr CreateMessage() override;
|
||||
MessagePtr CreateMessage(const size_t size) override;
|
||||
MessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override;
|
||||
MessagePtr CreateMessage(UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) override;
|
||||
|
||||
SocketPtr CreateSocket(const std::string& type, const std::string& name) override;
|
||||
|
||||
PollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override;
|
||||
PollerPtr CreatePoller(const std::vector<FairMQChannel*>& channels) const override;
|
||||
PollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
|
||||
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override;
|
||||
|
||||
Transport GetType() const override;
|
||||
|
||||
void Interrupt() override { Socket::Interrupt(); }
|
||||
void Resume() override { Socket::Resume(); }
|
||||
void Reset() override {}
|
||||
|
||||
~TransportFactory() override;
|
||||
|
||||
private:
|
||||
void SendHeartbeats();
|
||||
|
||||
static Transport fTransportType;
|
||||
std::string fDeviceId;
|
||||
std::string fShmId;
|
||||
void* fZMQContext;
|
||||
std::unique_ptr<Manager> fManager;
|
||||
std::thread fHeartbeatThread;
|
||||
std::atomic<bool> fSendHeartbeats;
|
||||
};
|
||||
|
||||
} // namespace shmem
|
||||
} // namespace mq
|
||||
} // namespace fair
|
||||
|
||||
#endif /* FAIR_MQ_SHMEM_TRANSPORTFACTORY_H_ */
|
|
@ -7,15 +7,20 @@
|
|||
********************************************************************************/
|
||||
|
||||
#include "Common.h"
|
||||
|
||||
#include "FairMQUnmanagedRegionSHM.h"
|
||||
#include "UnmanagedRegion.h"
|
||||
|
||||
using namespace std;
|
||||
using namespace fair::mq::shmem;
|
||||
|
||||
namespace bipc = ::boost::interprocess;
|
||||
|
||||
FairMQUnmanagedRegionSHM::FairMQUnmanagedRegionSHM(Manager& manager, const size_t size, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */)
|
||||
namespace fair
|
||||
{
|
||||
namespace mq
|
||||
{
|
||||
namespace shmem
|
||||
{
|
||||
|
||||
UnmanagedRegion::UnmanagedRegion(Manager& manager, const size_t size, RegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */)
|
||||
: fManager(manager)
|
||||
, fRegion(nullptr)
|
||||
, fRegionId(0)
|
||||
|
@ -41,3 +46,7 @@ FairMQUnmanagedRegionSHM::FairMQUnmanagedRegionSHM(Manager& manager, const size_
|
|||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -6,13 +6,13 @@
|
|||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#ifndef FAIRMQUNMANAGEDREGIONSHM_H_
|
||||
#define FAIRMQUNMANAGEDREGIONSHM_H_
|
||||
#ifndef FAIR_MQ_SHMEM_UNMANAGEDREGION_H_
|
||||
#define FAIR_MQ_SHMEM_UNMANAGEDREGION_H_
|
||||
|
||||
#include <fairmq/shmem/Manager.h>
|
||||
#include "Manager.h"
|
||||
|
||||
#include "FairMQUnmanagedRegion.h"
|
||||
#include "FairMQLogger.h"
|
||||
#include <FairMQUnmanagedRegion.h>
|
||||
#include <FairMQLogger.h>
|
||||
|
||||
#include <boost/interprocess/shared_memory_object.hpp>
|
||||
#include <boost/interprocess/mapped_region.hpp>
|
||||
|
@ -20,23 +20,37 @@
|
|||
#include <cstddef> // size_t
|
||||
#include <string>
|
||||
|
||||
class FairMQUnmanagedRegionSHM final : public FairMQUnmanagedRegion
|
||||
namespace fair
|
||||
{
|
||||
friend class FairMQSocketSHM;
|
||||
friend class FairMQMessageSHM;
|
||||
namespace mq
|
||||
{
|
||||
namespace shmem
|
||||
{
|
||||
|
||||
class Message;
|
||||
class Socket;
|
||||
|
||||
class UnmanagedRegion final : public fair::mq::UnmanagedRegion
|
||||
{
|
||||
friend class Message;
|
||||
friend class Socket;
|
||||
|
||||
public:
|
||||
FairMQUnmanagedRegionSHM(fair::mq::shmem::Manager& manager, const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0);
|
||||
UnmanagedRegion(Manager& manager, const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0);
|
||||
|
||||
void* GetData() const override { return fRegion->get_address(); }
|
||||
size_t GetSize() const override { return fRegion->get_size(); }
|
||||
|
||||
~FairMQUnmanagedRegionSHM() override { fManager.RemoveRegion(fRegionId); }
|
||||
~UnmanagedRegion() override { fManager.RemoveRegion(fRegionId); }
|
||||
|
||||
private:
|
||||
fair::mq::shmem::Manager& fManager;
|
||||
Manager& fManager;
|
||||
boost::interprocess::mapped_region* fRegion;
|
||||
uint64_t fRegionId;
|
||||
};
|
||||
|
||||
#endif /* FAIRMQUNMANAGEDREGIONSHM_H_ */
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endif /* FAIR_MQ_SHMEM_UNMANAGEDREGION_H_ */
|
|
@ -5,8 +5,8 @@
|
|||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
#include <fairmq/shmem/Monitor.h>
|
||||
#include <fairmq/shmem/Common.h>
|
||||
#include "Monitor.h"
|
||||
#include "Common.h"
|
||||
|
||||
#include <boost/program_options.hpp>
|
||||
|
||||
|
|
|
@ -74,11 +74,6 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const s
|
|||
LOG(debug) << "Created socket " << GetId();
|
||||
}
|
||||
|
||||
string FairMQSocketZMQ::GetId()
|
||||
{
|
||||
return fId;
|
||||
}
|
||||
|
||||
bool FairMQSocketZMQ::Bind(const string& address)
|
||||
{
|
||||
// LOG(info) << "bind socket " << fId << " on " << address;
|
||||
|
|
|
@ -24,7 +24,7 @@ class FairMQSocketZMQ final : public FairMQSocket
|
|||
FairMQSocketZMQ(const FairMQSocketZMQ&) = delete;
|
||||
FairMQSocketZMQ operator=(const FairMQSocketZMQ&) = delete;
|
||||
|
||||
std::string GetId() override;
|
||||
std::string GetId() const override { return fId; }
|
||||
|
||||
bool Bind(const std::string& address) override;
|
||||
bool Connect(const std::string& address) override;
|
||||
|
|
Loading…
Reference in New Issue
Block a user