Shmem region: support huge pages via path to hugetlbfs mount

This commit is contained in:
Alexey Rybalchenko 2019-07-03 14:54:54 +02:00 committed by Dennis Klein
parent a8c76accdc
commit 0e35f1cb22
29 changed files with 466 additions and 537 deletions

View File

@ -40,8 +40,7 @@ void Sampler::InitTask()
10000000,
[this](void* /*data*/, size_t /*size*/, void* /*hint*/) { // callback to be called when message buffers no longer needed by transport
--fNumUnackedMsgs;
if (fMaxIterations > 0)
{
if (fMaxIterations > 0) {
LOG(debug) << "Received ack";
}
}
@ -58,12 +57,14 @@ bool Sampler::ConditionalRun()
nullptr // hint
));
if (Send(msg, "data", 0) > 0)
{
// static_cast<char*>(fRegion->GetData())[3] = 97;
// LOG(info) << "check: " << static_cast<char*>(fRegion->GetData())[3];
// std::this_thread::sleep_for(std::chrono::seconds(1));
if (Send(msg, "data", 0) > 0) {
++fNumUnackedMsgs;
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
{
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
@ -75,8 +76,7 @@ bool Sampler::ConditionalRun()
void Sampler::ResetTask()
{
// if not all messages acknowledged, wait for a bit. But only once, since receiver could be already dead.
if (fNumUnackedMsgs != 0)
{
if (fNumUnackedMsgs != 0) {
LOG(debug) << "waiting for all acknowledgements... (" << fNumUnackedMsgs << ")";
this_thread::sleep_for(chrono::milliseconds(500));
LOG(debug) << "done, still unacked: " << fNumUnackedMsgs;

View File

@ -35,14 +35,15 @@ void Sink::Run()
{
FairMQChannel& dataInChannel = fChannels.at("data").at(0);
while (!NewStatePending())
{
while (!NewStatePending()) {
FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage());
dataInChannel.Receive(msg);
// void* ptr = msg->GetData();
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
{
// void* ptr = msg->GetData();
// char* cptr = static_cast<char*>(ptr);
// LOG(info) << "check: " << cptr[3];
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
break;
}

View File

@ -345,9 +345,9 @@ class FairMQChannel
return Transport()->NewStaticMessage(data);
}
FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr)
FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0)
{
return Transport()->CreateUnmanagedRegion(size, callback);
return Transport()->CreateUnmanagedRegion(size, callback, path, flags);
}
private:

View File

@ -240,9 +240,9 @@ class FairMQDevice
}
// creates unmanaged region with the transport of the specified channel
FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel, int index, const size_t size, FairMQRegionCallback callback = nullptr)
FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel, int index, const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0)
{
return GetChannel(channel, index).NewUnmanagedRegion(size, callback);
return GetChannel(channel, index).NewUnmanagedRegion(size, callback, path, flags);
}
template<typename ...Ts>

View File

@ -72,7 +72,7 @@ class FairMQTransportFactory
/// Create a poller for specific channels (all subchannels)
virtual FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr) const = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const = 0;
/// Get transport type
virtual fair::mq::Transport GetType() const = 0;

View File

@ -65,9 +65,9 @@ FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const unordered_map<strin
return unique_ptr<FairMQPoller>(new FairMQPollerNN(channelsMap, channelList));
}
FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const
FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const
{
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionNN(size, callback));
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionNN(size, callback, path, flags));
}
fair::mq::Transport FairMQTransportFactoryNN::GetType() const

View File

@ -36,7 +36,7 @@ class FairMQTransportFactoryNN final : public FairMQTransportFactory
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel*>& channels) const override;
FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) const override;
fair::mq::Transport GetType() const override;

View File

@ -11,7 +11,7 @@
using namespace std;
FairMQUnmanagedRegionNN::FairMQUnmanagedRegionNN(const size_t size, FairMQRegionCallback callback)
FairMQUnmanagedRegionNN::FairMQUnmanagedRegionNN(const size_t size, FairMQRegionCallback callback, const std::string& /*path = "" */, int /*flags = 0 */)
: fBuffer(malloc(size))
, fSize(size)
, fCallback(callback)

View File

@ -12,13 +12,14 @@
#include "FairMQUnmanagedRegion.h"
#include <cstddef> // size_t
#include <string>
class FairMQUnmanagedRegionNN final : public FairMQUnmanagedRegion
{
friend class FairMQSocketNN;
public:
FairMQUnmanagedRegionNN(const size_t size, FairMQRegionCallback callback);
FairMQUnmanagedRegionNN(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0);
FairMQUnmanagedRegionNN(const FairMQUnmanagedRegionNN&) = delete;
FairMQUnmanagedRegionNN operator=(const FairMQUnmanagedRegionNN&) = delete;

View File

@ -85,7 +85,7 @@ auto TransportFactory::CreatePoller(const unordered_map<string, vector<FairMQCha
// return PollerPtr{new Poller(channelsMap, channelList)};
}
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/) const -> UnmanagedRegionPtr
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) const -> UnmanagedRegionPtr
{
throw runtime_error{"Not yet implemented UMR."};
}

View File

@ -46,7 +46,7 @@ class TransportFactory final : public FairMQTransportFactory
auto CreatePoller(const std::vector<FairMQChannel*>& channels) const -> PollerPtr override;
auto CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const -> PollerPtr override;
auto CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr) const -> UnmanagedRegionPtr override;
auto CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const -> UnmanagedRegionPtr override;
auto GetType() const -> Transport override;

View File

@ -10,8 +10,13 @@
#include <atomic>
#include <string>
#include <unordered_map>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/map.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/containers/vector.hpp>
#include <boost/functional/hash.hpp>
#include <unistd.h>
@ -24,6 +29,32 @@ namespace mq
namespace shmem
{
using SegmentManager = boost::interprocess::managed_shared_memory::segment_manager;
using VoidAlloc = boost::interprocess::allocator<void, SegmentManager>;
using CharAlloc = boost::interprocess::allocator<char, SegmentManager>;
using Str = boost::interprocess::basic_string<char, std::char_traits<char>, CharAlloc>;
using StrAlloc = boost::interprocess::allocator<Str, SegmentManager>;
using StrVector = boost::interprocess::vector<Str, StrAlloc>;
struct RegionInfo
{
RegionInfo(const VoidAlloc& alloc)
: fPath("", alloc)
, fFlags(0)
{}
RegionInfo(const char* path, int flags, const VoidAlloc& alloc)
: fPath(path, alloc)
, fFlags(flags)
{}
Str fPath;
int fFlags;
};
using Uint64RegionInfoPairAlloc = boost::interprocess::allocator<std::pair<const uint64_t, RegionInfo>, SegmentManager>;
using Uint64RegionInfoMap = boost::interprocess::map<uint64_t, RegionInfo, std::less<uint64_t>, Uint64RegionInfoPairAlloc>;
struct DeviceCounter
{
DeviceCounter(unsigned int c)
@ -35,11 +66,11 @@ struct DeviceCounter
struct RegionCounter
{
RegionCounter(unsigned int c)
RegionCounter(uint64_t c)
: fCount(c)
{}
std::atomic<unsigned int> fCount;
std::atomic<uint64_t> fCount;
};
struct MonitorStatus

View File

@ -223,25 +223,16 @@ zmq_msg_t* FairMQMessageSHM::GetMessage()
void* FairMQMessageSHM::GetData() const
{
if (fLocalPtr)
{
if (fLocalPtr) {
return fLocalPtr;
}
else
{
if (fRegionId == 0)
{
} else {
if (fRegionId == 0) {
return fManager.Segment().get_address_from_handle(fHandle);
}
else
{
} else {
fRegionPtr = fManager.GetRemoteRegion(fRegionId);
if (fRegionPtr)
{
if (fRegionPtr) {
fLocalPtr = reinterpret_cast<char*>(fRegionPtr->fRegion.get_address()) + fHandle;
}
else
{
} else {
// LOG(warn) << "could not get pointer from a region message";
fLocalPtr = nullptr;
}
@ -257,15 +248,10 @@ size_t FairMQMessageSHM::GetSize() const
bool FairMQMessageSHM::SetUsedSize(const size_t size)
{
if (size == fSize)
{
if (size == fSize) {
return true;
}
else if (size <= fSize)
{
try
{
} else if (size <= fSize) {
try {
bipc::managed_shared_memory::size_type shrunkSize = size;
fLocalPtr = fManager.Segment().allocation_command<char>(bipc::shrink_in_place, fSize + 128, shrunkSize, fLocalPtr);
fSize = size;
@ -274,15 +260,11 @@ bool FairMQMessageSHM::SetUsedSize(const size_t size)
MetaHeader* hdrPtr = static_cast<MetaHeader*>(zmq_msg_data(&fMessage));
hdrPtr->fSize = fSize;
return true;
}
catch (bipc::interprocess_exception& e)
{
} catch (bipc::interprocess_exception& e) {
LOG(info) << "could not set used size: " << e.what();
return false;
}
}
else
{
} else {
LOG(error) << "cannot set used size higher than original.";
return false;
}

View File

@ -375,20 +375,20 @@ int64_t FairMQSocketSHM::Receive(vector<FairMQMessagePtr>& msgVec, const int tim
for (size_t m = 0; m < numMessages; m++)
{
MetaHeader metaHeader;
memcpy(&metaHeader, &hdrVec[m], sizeof(MetaHeader));
MetaHeader hdr;
memcpy(&hdr, &hdrVec[m], sizeof(MetaHeader));
msgVec.emplace_back(fair::mq::tools::make_unique<FairMQMessageSHM>(fManager, GetTransport()));
FairMQMessageSHM* msg = static_cast<FairMQMessageSHM*>(msgVec.back().get());
MetaHeader* msgHdr = static_cast<MetaHeader*>(zmq_msg_data(msg->GetMessage()));
memcpy(msgHdr, &metaHeader, sizeof(MetaHeader));
memcpy(msgHdr, &hdr, sizeof(MetaHeader));
msg->fHandle = metaHeader.fHandle;
msg->fSize = metaHeader.fSize;
msg->fRegionId = metaHeader.fRegionId;
msg->fHint = metaHeader.fHint;
msg->fHandle = hdr.fHandle;
msg->fSize = hdr.fSize;
msg->fRegionId = hdr.fRegionId;
msg->fHint = hdr.fHint;
totalSize += msg->GetSize();
}

View File

@ -9,11 +9,11 @@
#include "FairMQLogger.h"
#include "FairMQTransportFactorySHM.h"
#include <fairmq/Tools.h>
#include <zmq.h>
#include <boost/version.hpp>
#include <boost/process.hpp>
#include <boost/filesystem.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
@ -28,7 +28,6 @@
using namespace std;
using namespace fair::mq::shmem;
namespace bfs = ::boost::filesystem;
namespace bpt = ::boost::posix_time;
namespace bipc = ::boost::interprocess;
@ -38,175 +37,73 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai
: FairMQTransportFactory(id)
, fDeviceId(id)
, fShmId()
, fContext(nullptr)
, fZMQContext(nullptr)
, fManager(nullptr)
, fHeartbeatThread()
, fSendHeartbeats(true)
, fShMutex(nullptr)
, fDeviceCounter(nullptr)
, fManager(nullptr)
{
int major, minor, patch;
zmq_version(&major, &minor, &patch);
LOG(debug) << "Transport: Using ZeroMQ (" << major << "." << minor << "." << patch << ") & "
<< "boost::interprocess (" << (BOOST_VERSION / 100000) << "." << (BOOST_VERSION / 100 % 1000) << "." << (BOOST_VERSION % 100) << ")";
fContext = zmq_ctx_new();
if (!fContext)
{
LOG(error) << "failed creating context, reason: " << zmq_strerror(errno);
exit(EXIT_FAILURE);
fZMQContext = zmq_ctx_new();
if (!fZMQContext) {
throw runtime_error(fair::mq::tools::ToString("failed creating context, reason: ", zmq_strerror(errno)));
}
int numIoThreads = 1;
string sessionName = "default";
size_t segmentSize = 2000000000;
bool autolaunchMonitor = false;
if (config)
{
if (config) {
numIoThreads = config->GetValue<int>("io-threads");
sessionName = config->GetValue<string>("session");
segmentSize = config->GetValue<size_t>("shm-segment-size");
autolaunchMonitor = config->GetValue<bool>("shm-monitor");
}
else
{
} else {
LOG(debug) << "FairMQProgOptions not available! Using defaults.";
}
fShmId = buildShmIdFromSessionIdAndUserId(sessionName);
try
{
fShMutex = fair::mq::tools::make_unique<bipc::named_mutex>(bipc::open_or_create, string("fmq_" + fShmId + "_mtx").c_str());
if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0)
{
try {
if (zmq_ctx_set(fZMQContext, ZMQ_IO_THREADS, numIoThreads) != 0) {
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
}
// Set the maximum number of allowed sockets on the context.
if (zmq_ctx_set(fContext, ZMQ_MAX_SOCKETS, 10000) != 0)
{
if (zmq_ctx_set(fZMQContext, ZMQ_MAX_SOCKETS, 10000) != 0) {
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
}
fManager = fair::mq::tools::make_unique<Manager>(fShmId, segmentSize);
LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << segmentSize << " bytes. Available are " << fManager->Segment().get_free_memory() << " bytes.";
{
bipc::scoped_lock<bipc::named_mutex> lock(*fShMutex);
fDeviceCounter = fManager->Segment().find<DeviceCounter>(bipc::unique_instance).first;
if (fDeviceCounter)
{
LOG(debug) << "device counter found, with value of " << fDeviceCounter->fCount << ". incrementing.";
(fDeviceCounter->fCount)++;
LOG(debug) << "incremented device counter, now: " << fDeviceCounter->fCount;
}
else
{
LOG(debug) << "no device counter found, creating one and initializing with 1";
fDeviceCounter = fManager->Segment().construct<DeviceCounter>(bipc::unique_instance)(1);
LOG(debug) << "initialized device counter with: " << fDeviceCounter->fCount;
}
// start shm monitor
if (autolaunchMonitor)
{
try
{
MonitorStatus* monitorStatus = fManager->ManagementSegment().find<MonitorStatus>(bipc::unique_instance).first;
if (monitorStatus == nullptr)
{
LOG(debug) << "no fairmq-shmmonitor found, starting...";
StartMonitor();
}
else
{
LOG(debug) << "found fairmq-shmmonitor.";
}
}
catch (exception& e)
{
LOG(error) << "Exception during fairmq-shmmonitor initialization: " << e.what() << ", application will now exit";
exit(EXIT_FAILURE);
}
}
if (autolaunchMonitor) {
fManager->StartMonitor();
}
}
catch(bipc::interprocess_exception& e)
{
} catch (bipc::interprocess_exception& e) {
LOG(error) << "Could not initialize shared memory transport: " << e.what();
throw runtime_error("Cannot update configuration. Socket method (bind/connect) not specified.");
throw runtime_error(fair::mq::tools::ToString("Could not initialize shared memory transport: ", e.what()));
}
fSendHeartbeats = true;
fHeartbeatThread = thread(&FairMQTransportFactorySHM::SendHeartbeats, this);
}
void FairMQTransportFactorySHM::StartMonitor()
{
auto env = boost::this_process::environment();
vector<bfs::path> ownPath = boost::this_process::path();
if (const char* fmqp = getenv("FAIRMQ_PATH"))
{
ownPath.insert(ownPath.begin(), bfs::path(fmqp));
}
bfs::path p = boost::process::search_path("fairmq-shmmonitor", ownPath);
if (!p.empty())
{
boost::process::spawn(p, "-x", "--shmid", fShmId, "-d", "-t", "2000", env);
int numTries = 0;
do
{
MonitorStatus* monitorStatus = fManager->ManagementSegment().find<MonitorStatus>(bipc::unique_instance).first;
if (monitorStatus)
{
LOG(debug) << "fairmq-shmmonitor started";
break;
}
else
{
this_thread::sleep_for(chrono::milliseconds(10));
if (++numTries > 1000)
{
LOG(error) << "Did not get response from fairmq-shmmonitor after " << 10 * 1000 << " milliseconds. Exiting.";
exit(EXIT_FAILURE);
}
}
}
while (true);
}
else
{
LOG(warn) << "could not find fairmq-shmmonitor in the path";
}
}
void FairMQTransportFactorySHM::SendHeartbeats()
{
string controlQueueName("fmq_" + fShmId + "_cq");
while (fSendHeartbeats)
{
try
{
while (fSendHeartbeats) {
try {
bipc::message_queue mq(bipc::open_only, controlQueueName.c_str());
bpt::ptime sndTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(100);
if (mq.timed_send(fDeviceId.c_str(), fDeviceId.size(), 0, sndTill))
{
if (mq.timed_send(fDeviceId.c_str(), fDeviceId.size(), 0, sndTill)) {
this_thread::sleep_for(chrono::milliseconds(100));
}
else
{
} else {
LOG(debug) << "control queue timeout";
}
}
catch (bipc::interprocess_exception& ie)
{
} catch (bipc::interprocess_exception& ie) {
this_thread::sleep_for(chrono::milliseconds(500));
// LOG(warn) << "no " << controlQueueName << " found";
}
@ -235,8 +132,8 @@ FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(FairMQUnmanagedRegionP
FairMQSocketPtr FairMQTransportFactorySHM::CreateSocket(const string& type, const string& name)
{
assert(fContext);
return unique_ptr<FairMQSocket>(new FairMQSocketSHM(*fManager, type, name, GetId(), fContext, this));
assert(fZMQContext);
return unique_ptr<FairMQSocket>(new FairMQSocketSHM(*fManager, type, name, GetId(), fZMQContext, this));
}
FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const vector<FairMQChannel>& channels) const
@ -254,9 +151,14 @@ FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const unordered_map<stri
return unique_ptr<FairMQPoller>(new FairMQPollerSHM(channelsMap, channelList));
}
FairMQUnmanagedRegionPtr FairMQTransportFactorySHM::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const
FairMQUnmanagedRegionPtr FairMQTransportFactorySHM::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const
{
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionSHM(*fManager, size, callback));
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionSHM(*fManager, size, callback, path, flags));
}
fair::mq::Transport FairMQTransportFactorySHM::GetType() const
{
return fTransportType;
}
FairMQTransportFactorySHM::~FairMQTransportFactorySHM()
@ -264,53 +166,16 @@ FairMQTransportFactorySHM::~FairMQTransportFactorySHM()
fSendHeartbeats = false;
fHeartbeatThread.join();
if (fContext)
{
if (zmq_ctx_term(fContext) != 0)
{
if (errno == EINTR)
{
if (fZMQContext) {
if (zmq_ctx_term(fZMQContext) != 0) {
if (errno == EINTR) {
LOG(error) << "failed closing context, reason: " << zmq_strerror(errno);
}
else
{
fContext = nullptr;
} else {
fZMQContext = nullptr;
return;
}
}
}
else
{
} else {
LOG(error) << "context not available for shutdown";
}
bool lastRemoved = false;
{ // mutex scope
bipc::scoped_lock<bipc::named_mutex> lock(*fShMutex);
(fDeviceCounter->fCount)--;
if (fDeviceCounter->fCount == 0)
{
LOG(debug) << "last segment user, removing segment.";
fManager->RemoveSegment();
lastRemoved = true;
}
else
{
LOG(debug) << "other segment users present (" << fDeviceCounter->fCount << "), not removing it.";
}
}
if (lastRemoved)
{
bipc::named_mutex::remove(string("fmq_" + fShmId + "_mtx").c_str());
}
}
fair::mq::Transport FairMQTransportFactorySHM::GetType() const
{
return fTransportType;
}

View File

@ -19,8 +19,6 @@
#include "FairMQUnmanagedRegionSHM.h"
#include <options/FairMQProgOptions.h>
#include <boost/interprocess/sync/named_mutex.hpp>
#include <vector>
#include <string>
#include <thread>
@ -44,7 +42,7 @@ class FairMQTransportFactorySHM final : public FairMQTransportFactory
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel*>& channels) const override;
FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr) const override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override;
fair::mq::Transport GetType() const override;
@ -56,17 +54,14 @@ class FairMQTransportFactorySHM final : public FairMQTransportFactory
private:
void SendHeartbeats();
void StartMonitor();
static fair::mq::Transport fTransportType;
std::string fDeviceId;
std::string fShmId;
void* fContext;
void* fZMQContext;
std::unique_ptr<fair::mq::shmem::Manager> fManager;
std::thread fHeartbeatThread;
std::atomic<bool> fSendHeartbeats;
std::unique_ptr<boost::interprocess::named_mutex> fShMutex;
fair::mq::shmem::DeviceCounter* fDeviceCounter;
std::unique_ptr<fair::mq::shmem::Manager> fManager;
};
#endif /* FAIRMQTRANSPORTFACTORYSHM_H_ */

View File

@ -15,22 +15,18 @@ using namespace fair::mq::shmem;
namespace bipc = ::boost::interprocess;
FairMQUnmanagedRegionSHM::FairMQUnmanagedRegionSHM(Manager& manager, const size_t size, FairMQRegionCallback callback)
FairMQUnmanagedRegionSHM::FairMQUnmanagedRegionSHM(Manager& manager, const size_t size, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */)
: fManager(manager)
, fRegion(nullptr)
, fRegionId(0)
{
try
{
try {
RegionCounter* rc = fManager.ManagementSegment().find<RegionCounter>(bipc::unique_instance).first;
if (rc)
{
if (rc) {
LOG(debug) << "region counter found, with value of " << rc->fCount << ". incrementing.";
(rc->fCount)++;
LOG(debug) << "incremented region counter, now: " << rc->fCount;
}
else
{
} else {
LOG(debug) << "no region counter found, creating one and initializing with 1";
rc = fManager.ManagementSegment().construct<RegionCounter>(bipc::unique_instance)(1);
LOG(debug) << "initialized region counter with: " << rc->fCount;
@ -38,13 +34,11 @@ FairMQUnmanagedRegionSHM::FairMQUnmanagedRegionSHM(Manager& manager, const size_
fRegionId = rc->fCount;
fRegion = fManager.CreateRegion(size, fRegionId, callback);
}
catch (bipc::interprocess_exception& e)
{
fRegion = fManager.CreateRegion(size, fRegionId, callback, path, flags);
} catch (bipc::interprocess_exception& e) {
LOG(error) << "cannot create region. Already created/not cleaned up?";
LOG(error) << e.what();
exit(EXIT_FAILURE);
throw;
}
}

View File

@ -18,6 +18,7 @@
#include <boost/interprocess/mapped_region.hpp>
#include <cstddef> // size_t
#include <string>
class FairMQUnmanagedRegionSHM final : public FairMQUnmanagedRegion
{
@ -25,7 +26,7 @@ class FairMQUnmanagedRegionSHM final : public FairMQUnmanagedRegion
friend class FairMQMessageSHM;
public:
FairMQUnmanagedRegionSHM(fair::mq::shmem::Manager& manager, const size_t size, FairMQRegionCallback callback = nullptr);
FairMQUnmanagedRegionSHM(fair::mq::shmem::Manager& manager, const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0);
void* GetData() const override;
size_t GetSize() const override;

View File

@ -9,8 +9,12 @@
#include <fairmq/shmem/Manager.h>
#include <fairmq/shmem/Common.h>
#include <boost/process.hpp>
#include <boost/filesystem.hpp>
using namespace std;
namespace bipc = ::boost::interprocess;
namespace bfs = ::boost::filesystem;
namespace fair
{
@ -21,19 +25,86 @@ namespace shmem
std::unordered_map<uint64_t, std::unique_ptr<Region>> Manager::fRegions;
Manager::Manager(const string& name, size_t size)
: fSessionName(name)
, fSegmentName("fmq_" + fSessionName + "_main")
, fManagementSegmentName("fmq_" + fSessionName + "_mng")
Manager::Manager(const std::string& id, size_t size)
: fShmId(id)
, fSegmentName("fmq_" + fShmId + "_main")
, fManagementSegmentName("fmq_" + fShmId + "_mng")
, fSegment(bipc::open_or_create, fSegmentName.c_str(), size)
, fManagementSegment(bipc::open_or_create, fManagementSegmentName.c_str(), 65536)
{}
, fShmMtx(bipc::open_or_create, string("fmq_" + fShmId + "_mtx").c_str())
, fDeviceCounter(nullptr)
{
LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << size << " bytes. Available are " << fSegment.get_free_memory() << " bytes.";
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
fDeviceCounter = fManagementSegment.find<DeviceCounter>(bipc::unique_instance).first;
if (fDeviceCounter) {
LOG(debug) << "device counter found, with value of " << fDeviceCounter->fCount << ". incrementing.";
(fDeviceCounter->fCount)++;
LOG(debug) << "incremented device counter, now: " << fDeviceCounter->fCount;
} else {
LOG(debug) << "no device counter found, creating one and initializing with 1";
fDeviceCounter = fManagementSegment.construct<DeviceCounter>(bipc::unique_instance)(1);
LOG(debug) << "initialized device counter with: " << fDeviceCounter->fCount;
}
}
bipc::managed_shared_memory& Manager::Segment()
{
return fSegment;
}
bipc::managed_shared_memory& Manager::ManagementSegment()
{
return fManagementSegment;
}
void Manager::StartMonitor()
{
try {
MonitorStatus* monitorStatus = fManagementSegment.find<MonitorStatus>(bipc::unique_instance).first;
if (monitorStatus == nullptr) {
LOG(debug) << "no fairmq-shmmonitor found, starting...";
auto env = boost::this_process::environment();
vector<bfs::path> ownPath = boost::this_process::path();
if (const char* fmqp = getenv("FAIRMQ_PATH")) {
ownPath.insert(ownPath.begin(), bfs::path(fmqp));
}
bfs::path p = boost::process::search_path("fairmq-shmmonitor", ownPath);
if (!p.empty()) {
boost::process::spawn(p, "-x", "--shmid", fShmId, "-d", "-t", "2000", env);
int numTries = 0;
do {
monitorStatus = fManagementSegment.find<MonitorStatus>(bipc::unique_instance).first;
if (monitorStatus) {
LOG(debug) << "fairmq-shmmonitor started";
break;
} else {
this_thread::sleep_for(chrono::milliseconds(10));
if (++numTries > 1000) {
LOG(error) << "Did not get response from fairmq-shmmonitor after " << 10 * 1000 << " milliseconds. Exiting.";
throw runtime_error(fair::mq::tools::ToString("Did not get response from fairmq-shmmonitor after ", 10 * 1000, " milliseconds. Exiting."));
}
}
} while (true);
} else {
LOG(warn) << "could not find fairmq-shmmonitor in the path";
}
} else {
LOG(debug) << "found fairmq-shmmonitor.";
}
} catch (std::exception& e) {
LOG(error) << "Exception during fairmq-shmmonitor initialization: " << e.what() << ", application will now exit";
exit(EXIT_FAILURE);
}
}
void Manager::Interrupt()
{
}
@ -41,30 +112,31 @@ void Manager::Interrupt()
void Manager::Resume()
{
// close remote regions before processing new transfers
for (auto it = fRegions.begin(); it != fRegions.end(); /**/)
{
if (it->second->fRemote)
{
for (auto it = fRegions.begin(); it != fRegions.end(); /**/) {
if (it->second->fRemote) {
it = fRegions.erase(it);
}
else
{
} else {
++it;
}
}
}
bipc::mapped_region* Manager::CreateRegion(const size_t size, const uint64_t id, FairMQRegionCallback callback)
bipc::mapped_region* Manager::CreateRegion(const size_t size, const uint64_t id, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */)
{
auto it = fRegions.find(id);
if (it != fRegions.end())
{
if (it != fRegions.end()) {
LOG(error) << "Trying to create a region that already exists";
return nullptr;
}
else
{
auto r = fRegions.emplace(id, fair::mq::tools::make_unique<Region>(*this, id, size, false, callback));
} else {
// create region info
{
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
VoidAlloc voidAlloc(fManagementSegment.get_segment_manager());
Uint64RegionInfoMap* m = fManagementSegment.find_or_construct<Uint64RegionInfoMap>(bipc::unique_instance)(voidAlloc);
m->emplace(id, RegionInfo(path.c_str(), flags, voidAlloc));
}
auto r = fRegions.emplace(id, fair::mq::tools::make_unique<Region>(*this, id, size, false, callback, path, flags));
r.first->second->StartReceivingAcks();
@ -76,20 +148,28 @@ Region* Manager::GetRemoteRegion(const uint64_t id)
{
// remote region could actually be a local one if a message originates from this device (has been sent out and returned)
auto it = fRegions.find(id);
if (it != fRegions.end())
{
if (it != fRegions.end()) {
return it->second.get();
}
else
{
try
{
auto r = fRegions.emplace(id, fair::mq::tools::make_unique<Region>(*this, id, 0, true, nullptr));
} else {
try {
string path;
int flags;
// get region info
{
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
VoidAlloc voidAlloc(fSegment.get_segment_manager());
Uint64RegionInfoMap* m = fManagementSegment.find<Uint64RegionInfoMap>(bipc::unique_instance).first;
RegionInfo ri = m->at(id);
path = ri.fPath.c_str();
flags = ri.fFlags;
// LOG(debug) << "path: " << path << ", flags: " << flags;
}
auto r = fRegions.emplace(id, fair::mq::tools::make_unique<Region>(*this, id, 0, true, nullptr, path, flags));
return r.first->second.get();
}
catch (bipc::interprocess_exception& e)
{
// LOG(warn) << "remote region (" << id << ") no longer exists";
} catch (bipc::interprocess_exception& e) {
LOG(warn) << "Could not get remote region for id: " << id;
return nullptr;
}
@ -101,30 +181,43 @@ void Manager::RemoveRegion(const uint64_t id)
fRegions.erase(id);
}
void Manager::RemoveSegment()
void Manager::RemoveSegments()
{
if (bipc::shared_memory_object::remove(fSegmentName.c_str()))
{
if (bipc::shared_memory_object::remove(fSegmentName.c_str())) {
LOG(debug) << "successfully removed '" << fSegmentName << "' segment after the device has stopped.";
}
else
{
} else {
LOG(debug) << "did not remove " << fSegmentName << " segment after the device stopped. Already removed?";
}
if (bipc::shared_memory_object::remove(fManagementSegmentName.c_str()))
{
if (bipc::shared_memory_object::remove(fManagementSegmentName.c_str())) {
LOG(debug) << "successfully removed '" << fManagementSegmentName << "' segment after the device has stopped.";
}
else
{
} else {
LOG(debug) << "did not remove '" << fManagementSegmentName << "' segment after the device stopped. Already removed?";
}
}
bipc::managed_shared_memory& Manager::ManagementSegment()
Manager::~Manager()
{
return fManagementSegment;
bool lastRemoved = false;
{
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
(fDeviceCounter->fCount)--;
if (fDeviceCounter->fCount == 0) {
LOG(debug) << "last segment user, removing segment.";
RemoveSegments();
lastRemoved = true;
} else {
LOG(debug) << "other segment users present (" << fDeviceCounter->fCount << "), not removing it.";
}
}
if (lastRemoved) {
bipc::named_mutex::remove(string("fmq_" + fShmId + "_mtx").c_str());
}
}
} // namespace shmem

View File

@ -24,6 +24,7 @@
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/interprocess/sync/named_mutex.hpp>
#include <string>
#include <unordered_map>
@ -40,32 +41,41 @@ class Manager
friend struct Region;
public:
Manager(const std::string& name, size_t size);
Manager(const std::string& id, size_t size);
Manager() = delete;
Manager(const Manager&) = delete;
Manager operator=(const Manager&) = delete;
~Manager();
boost::interprocess::managed_shared_memory& Segment();
boost::interprocess::managed_shared_memory& ManagementSegment();
void StartMonitor();
static void Interrupt();
static void Resume();
boost::interprocess::mapped_region* CreateRegion(const size_t size, const uint64_t id, FairMQRegionCallback callback);
int GetDeviceCounter();
int IncrementDeviceCounter();
int DecrementDeviceCounter();
boost::interprocess::mapped_region* CreateRegion(const size_t size, const uint64_t id, FairMQRegionCallback callback, const std::string& path = "", int flags = 0);
Region* GetRemoteRegion(const uint64_t id);
void RemoveRegion(const uint64_t id);
void RemoveSegment();
boost::interprocess::managed_shared_memory& ManagementSegment();
void RemoveSegments();
private:
std::string fSessionName;
std::string fShmId;
std::string fSegmentName;
std::string fManagementSegmentName;
boost::interprocess::managed_shared_memory fSegment;
boost::interprocess::managed_shared_memory fManagementSegment;
boost::interprocess::named_mutex fShmMtx;
fair::mq::shmem::DeviceCounter* fDeviceCounter;
static std::unordered_map<uint64_t, std::unique_ptr<Region>> fRegions;
};

View File

@ -8,11 +8,10 @@
#include <fairmq/shmem/Monitor.h>
#include <fairmq/shmem/Common.h>
#include <fairmq/Tools.h>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/containers/vector.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/file_mapping.hpp>
#include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
@ -29,11 +28,6 @@ using namespace std;
namespace bipc = ::boost::interprocess;
namespace bpt = ::boost::posix_time;
using CharAllocator = bipc::allocator<char, bipc::managed_shared_memory::segment_manager>;
using String = bipc::basic_string<char, char_traits<char>, CharAllocator>;
using StringAllocator = bipc::allocator<String, bipc::managed_shared_memory::segment_manager>;
using StringVector = bipc::vector<String, StringAllocator>;
namespace
{
volatile sig_atomic_t gSignalStatus = 0;
@ -70,8 +64,7 @@ Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, unsig
, fDeviceHeartbeats()
{
MonitorStatus* monitorStatus = fManagementSegment.find<MonitorStatus>(bipc::unique_instance).first;
if (monitorStatus != nullptr)
{
if (monitorStatus != nullptr) {
cout << "fairmq-shmmonitor already started or not properly exited. Try `fairmq-shmmonitor --cleanup`" << endl;
exit(EXIT_FAILURE);
}
@ -89,16 +82,12 @@ void Monitor::CatchSignals()
void Monitor::SignalMonitor()
{
while (true)
{
if (gSignalStatus != 0)
{
while (true) {
if (gSignalStatus != 0) {
fTerminating = true;
cout << "signal: " << gSignalStatus << endl;
break;
}
else if (fTerminating)
{
} else if (fTerminating) {
break;
}
@ -110,14 +99,10 @@ void Monitor::Run()
{
thread heartbeatThread(&Monitor::MonitorHeartbeats, this);
if (fInteractive)
{
if (fInteractive) {
Interactive();
}
else
{
while (!fTerminating)
{
} else {
while (!fTerminating) {
this_thread::sleep_for(chrono::milliseconds(100));
CheckSegment();
}
@ -128,32 +113,25 @@ void Monitor::Run()
void Monitor::MonitorHeartbeats()
{
try
{
try {
bipc::message_queue mq(bipc::open_or_create, fControlQueueName.c_str(), 1000, 256);
unsigned int priority;
bipc::message_queue::size_type recvdSize;
char msg[256] = {0};
while (!fTerminating)
{
while (!fTerminating) {
bpt::ptime rcvTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(100);
if (mq.timed_receive(&msg, sizeof(msg), recvdSize, priority, rcvTill))
{
if (mq.timed_receive(&msg, sizeof(msg), recvdSize, priority, rcvTill)) {
fHeartbeatTriggered = true;
fLastHeartbeat = chrono::high_resolution_clock::now();
string deviceId(msg, recvdSize);
fDeviceHeartbeats[deviceId] = fLastHeartbeat;
}
else
{
} else {
// cout << "control queue timeout" << endl;
}
}
}
catch (bipc::interprocess_exception& ie)
{
} catch (bipc::interprocess_exception& ie) {
cout << ie.what() << endl;
}
@ -178,19 +156,15 @@ void Monitor::Interactive()
cout << endl;
PrintHeader();
while (!fTerminating)
{
if (poll(cinfd, 1, 100))
{
if (fTerminating || gSignalStatus != 0)
{
while (!fTerminating) {
if (poll(cinfd, 1, 100)) {
if (fTerminating || gSignalStatus != 0) {
break;
}
c = getchar();
switch (c)
{
switch (c) {
case 'q':
cout << "\n[q] --> quitting." << endl;
fTerminating = true;
@ -216,23 +190,20 @@ void Monitor::Interactive()
break;
}
if (fTerminating)
{
if (fTerminating) {
break;
}
PrintHeader();
}
if (fTerminating)
{
if (fTerminating) {
break;
}
CheckSegment();
if (!fTerminating)
{
if (!fTerminating) {
cout << "\r";
}
}
@ -247,12 +218,10 @@ void Monitor::CheckSegment()
{
char c = '#';
if (fInteractive)
{
if (fInteractive) {
static uint64_t counter = 0;
int mod = counter++ % 5;
switch (mod)
{
switch (mod) {
case 0:
c = '-';
break;
@ -273,37 +242,33 @@ void Monitor::CheckSegment()
}
}
try
{
try {
bipc::managed_shared_memory segment(bipc::open_only, fSegmentName.c_str());
bipc::managed_shared_memory managementSegment(bipc::open_only, fManagementSegmentName.c_str());
fSeenOnce = true;
unsigned int numDevices = 0;
fair::mq::shmem::DeviceCounter* dc = segment.find<fair::mq::shmem::DeviceCounter>(bipc::unique_instance).first;
if (dc)
{
fair::mq::shmem::DeviceCounter* dc = managementSegment.find<fair::mq::shmem::DeviceCounter>(bipc::unique_instance).first;
if (dc) {
numDevices = dc->fCount;
}
auto now = chrono::high_resolution_clock::now();
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
if (fHeartbeatTriggered && duration > fTimeoutInMS)
{
if (fHeartbeatTriggered && duration > fTimeoutInMS) {
cout << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning..." << endl;
Cleanup(fShmId);
fHeartbeatTriggered = false;
if (fSelfDestruct)
{
if (fSelfDestruct) {
cout << "\nself destructing" << endl;
fTerminating = true;
}
}
if (fInteractive)
{
if (fInteractive) {
cout << "| "
<< setw(18) << fSegmentName << " | "
<< setw(10) << segment.get_size() << " | "
@ -317,12 +282,9 @@ void Monitor::CheckSegment()
<< c
<< flush;
}
}
catch (bipc::interprocess_exception& ie)
{
} catch (bipc::interprocess_exception& ie) {
fHeartbeatTriggered = false;
if (fInteractive)
{
if (fInteractive) {
cout << "| "
<< setw(18) << "-" << " | "
<< setw(10) << "-" << " | "
@ -338,21 +300,17 @@ void Monitor::CheckSegment()
auto now = chrono::high_resolution_clock::now();
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
if (fIsDaemon && duration > fTimeoutInMS * 2)
{
if (fIsDaemon && duration > fTimeoutInMS * 2) {
Cleanup(fShmId);
fHeartbeatTriggered = false;
if (fSelfDestruct)
{
if (fSelfDestruct) {
cout << "\nself destructing" << endl;
fTerminating = true;
}
}
if (fSelfDestruct)
{
if (fSeenOnce)
{
if (fSelfDestruct) {
if (fSeenOnce) {
cout << "self destructing" << endl;
fTerminating = true;
}
@ -363,29 +321,38 @@ void Monitor::CheckSegment()
void Monitor::Cleanup(const string& shmId)
{
string managementSegmentName("fmq_" + shmId + "_mng");
try
{
try {
bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str());
RegionCounter* rc = managementSegment.find<RegionCounter>(bipc::unique_instance).first;
if (rc)
{
if (rc) {
cout << "Region counter found: " << rc->fCount << endl;
unsigned int regionCount = rc->fCount;
for (unsigned int i = 1; i <= regionCount; ++i)
{
RemoveObject("fmq_" + shmId + "_rg_" + to_string(i));
uint64_t regionCount = rc->fCount;
Uint64RegionInfoMap* m = managementSegment.find<Uint64RegionInfoMap>(bipc::unique_instance).first;
for (uint64_t i = 1; i <= regionCount; ++i) {
if (m != nullptr) {
RegionInfo ri = m->at(i);
string path = ri.fPath.c_str();
int flags = ri.fFlags;
cout << "Found RegionInfo with path: '" << path << "', flags: " << flags << "'." << endl;
if (path != "") {
RemoveFileMapping(tools::ToString(path, "fmq_" + shmId + "_rg_" + to_string(i)));
} else {
RemoveObject("fmq_" + shmId + "_rg_" + to_string(i));
}
} else {
RemoveObject("fmq_" + shmId + "_rg_" + to_string(i));
}
RemoveQueue(string("fmq_" + shmId + "_rgq_" + to_string(i)));
}
}
else
{
cout << "shmem: no region counter found. no regions to cleanup." << endl;
} else {
cout << "No region counter found. no regions to cleanup." << endl;
}
RemoveObject(managementSegmentName.c_str());
}
catch (bipc::interprocess_exception& ie)
{
} catch (bipc::interprocess_exception& ie) {
cout << "Did not find '" << managementSegmentName << "' shared memory segment. No regions to cleanup." << endl;
}
@ -397,36 +364,36 @@ void Monitor::Cleanup(const string& shmId)
void Monitor::RemoveObject(const string& name)
{
if (bipc::shared_memory_object::remove(name.c_str()))
{
if (bipc::shared_memory_object::remove(name.c_str())) {
cout << "Successfully removed \"" << name << "\"." << endl;
} else {
cout << "Did not remove \"" << name << "\". Already removed?" << endl;
}
else
{
}
void Monitor::RemoveFileMapping(const string& name)
{
if (bipc::file_mapping::remove(name.c_str())) {
cout << "Successfully removed \"" << name << "\"." << endl;
} else {
cout << "Did not remove \"" << name << "\". Already removed?" << endl;
}
}
void Monitor::RemoveQueue(const string& name)
{
if (bipc::message_queue::remove(name.c_str()))
{
if (bipc::message_queue::remove(name.c_str())) {
cout << "Successfully removed \"" << name << "\"." << endl;
}
else
{
} else {
cout << "Did not remove \"" << name << "\". Already removed?" << endl;
}
}
void Monitor::RemoveMutex(const string& name)
{
if (bipc::named_mutex::remove(name.c_str()))
{
if (bipc::named_mutex::remove(name.c_str())) {
cout << "Successfully removed \"" << name << "\"." << endl;
}
else
{
} else {
cout << "Did not remove \"" << name << "\". Already removed?" << endl;
}
}
@ -435,47 +402,34 @@ void Monitor::PrintQueues()
{
cout << '\n';
try
{
try {
bipc::managed_shared_memory segment(bipc::open_only, fSegmentName.c_str());
StringVector* queues = segment.find<StringVector>(string("fmq_" + fShmId + "_qs").c_str()).first;
if (queues)
{
StrVector* queues = segment.find<StrVector>(string("fmq_" + fShmId + "_qs").c_str()).first;
if (queues) {
cout << "found " << queues->size() << " queue(s):" << endl;
for (const auto& queue : *queues)
{
for (const auto& queue : *queues) {
string name(queue.c_str());
cout << '\t' << name << " : ";
atomic<int>* queueSize = segment.find<atomic<int>>(name.c_str()).first;
if (queueSize)
{
if (queueSize) {
cout << *queueSize << " messages" << endl;
}
else
{
} else {
cout << "\tqueue does not have a queue size entry." << endl;
}
}
}
else
{
} else {
cout << "\tno queues found" << endl;
}
}
catch (bipc::interprocess_exception& ie)
{
} catch (bipc::interprocess_exception& ie) {
cout << "\tno queues found" << endl;
}
catch (out_of_range& ie)
{
} catch (out_of_range& ie) {
cout << "\tno queues found" << endl;
}
cout << "\n --> last heartbeats: " << endl << endl;
auto now = chrono::high_resolution_clock::now();
for (const auto& h : fDeviceHeartbeats)
{
for (const auto& h : fDeviceHeartbeats) {
cout << "\t" << h.first << " : " << chrono::duration<double, milli>(now - h.second).count() << "ms ago." << endl;
}
@ -505,12 +459,10 @@ void Monitor::PrintHelp()
Monitor::~Monitor()
{
fManagementSegment.destroy<MonitorStatus>(bipc::unique_instance);
if (fSignalThread.joinable())
{
if (fSignalThread.joinable()) {
fSignalThread.join();
}
if (fCleanOnExit)
{
if (fCleanOnExit) {
Cleanup(fShmId);
}
}

View File

@ -26,7 +26,7 @@ namespace shmem
class Monitor
{
public:
Monitor(const std::string& sessionName, bool selfDestruct, bool interactive, unsigned int timeoutInMS, bool runAsDaemon, bool cleanOnExit);
Monitor(const std::string& shmId, bool selfDestruct, bool interactive, unsigned int timeoutInMS, bool runAsDaemon, bool cleanOnExit);
Monitor(const Monitor&) = delete;
Monitor operator=(const Monitor&) = delete;
@ -36,8 +36,9 @@ class Monitor
virtual ~Monitor();
static void Cleanup(const std::string& sessionName);
static void Cleanup(const std::string& shmId);
static void RemoveObject(const std::string&);
static void RemoveFileMapping(const std::string&);
static void RemoveQueue(const std::string&);
static void RemoveMutex(const std::string&);

View File

@ -10,7 +10,10 @@
#include <fairmq/shmem/Common.h>
#include <fairmq/shmem/Manager.h>
#include <boost/filesystem.hpp>
#include <boost/process.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <cerrno>
#include <chrono>
@ -26,64 +29,77 @@ namespace mq
namespace shmem
{
Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, FairMQRegionCallback callback)
Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, FairMQRegionCallback callback, const string& path /* = "" */, int flags /* = 0 */)
: fManager(manager)
, fRemote(remote)
, fStop(false)
, fName("fmq_" + fManager.fSessionName +"_rg_" + to_string(id))
, fQueueName("fmq_" + fManager.fSessionName +"_rgq_" + to_string(id))
, fName("fmq_" + fManager.fShmId + "_rg_" + to_string(id))
, fQueueName("fmq_" + fManager.fShmId + "_rgq_" + to_string(id))
, fShmemObject()
, fFile(nullptr)
, fFileMapping()
, fQueue(nullptr)
, fReceiveAcksWorker()
, fSendAcksWorker()
, fCallback(callback)
{
if (fRemote)
{
fShmemObject = bipc::shared_memory_object(bipc::open_only, fName.c_str(), bipc::read_write);
LOG(debug) << "shmem: located remote region: " << fName;
if (path != "") {
fName = string(path + fName);
fQueue = fair::mq::tools::make_unique<bipc::message_queue>(bipc::open_only, fQueueName.c_str());
LOG(debug) << "shmem: located remote region queue: " << fQueueName;
fFile = fopen(fName.c_str(), fRemote ? "r+" : "w+");
if (!fFile) {
LOG(error) << "Failed to initialize file: " << fName;
LOG(error) << "errno: " << errno << ": " << strerror(errno);
throw runtime_error(tools::ToString("Failed to initialize file for shared memory region: ", strerror(errno)));
}
fFileMapping = bipc::file_mapping(fName.c_str(), bipc::read_write);
LOG(debug) << "shmem: initialized file: " << fName;
fRegion = bipc::mapped_region(fFileMapping, bipc::read_write, 0, size, 0, flags);
} else {
if (fRemote) {
fShmemObject = bipc::shared_memory_object(bipc::open_only, fName.c_str(), bipc::read_write);
} else {
fShmemObject = bipc::shared_memory_object(bipc::create_only, fName.c_str(), bipc::read_write);
fShmemObject.truncate(size);
}
fRegion = bipc::mapped_region(fShmemObject, bipc::read_write, 0, 0, 0, flags);
}
else
{
fShmemObject = bipc::shared_memory_object(bipc::create_only, fName.c_str(), bipc::read_write);
LOG(debug) << "shmem: created region: " << fName;
fShmemObject.truncate(size);
fQueue = fair::mq::tools::make_unique<bipc::message_queue>(bipc::create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock));
LOG(debug) << "shmem: created region queue: " << fQueueName;
InitializeQueues();
LOG(debug) << "shmem: initialized region: " << fName;
fSendAcksWorker = thread(&Region::SendAcks, this);
}
void Region::InitializeQueues()
{
if (fRemote) {
fQueue = tools::make_unique<bipc::message_queue>(bipc::open_only, fQueueName.c_str());
} else {
fQueue = tools::make_unique<bipc::message_queue>(bipc::create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock));
}
fRegion = bipc::mapped_region(fShmemObject, bipc::read_write); // TODO: add HUGEPAGES flag here
// fRegion = bipc::mapped_region(fShmemObject, bipc::read_write, 0, 0, 0, MAP_ANONYMOUS | MAP_HUGETLB);
fSendAcksWorker = std::thread(&Region::SendAcks, this);
LOG(debug) << "shmem: initialized region queue: " << fQueueName;
}
void Region::StartReceivingAcks()
{
fReceiveAcksWorker = std::thread(&Region::ReceiveAcks, this);
fReceiveAcksWorker = thread(&Region::ReceiveAcks, this);
}
void Region::ReceiveAcks()
{
unsigned int priority;
bipc::message_queue::size_type recvdSize;
std::unique_ptr<RegionBlock[]> blocks = fair::mq::tools::make_unique<RegionBlock[]>(fAckBunchSize);
unique_ptr<RegionBlock[]> blocks = tools::make_unique<RegionBlock[]>(fAckBunchSize);
while (!fStop) // end thread condition (should exist until region is destroyed)
{
while (!fStop) { // end thread condition (should exist until region is destroyed)
auto rcvTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(500);
while (fQueue->timed_receive(blocks.get(), fAckBunchSize * sizeof(RegionBlock), recvdSize, priority, rcvTill))
{
while (fQueue->timed_receive(blocks.get(), fAckBunchSize * sizeof(RegionBlock), recvdSize, priority, rcvTill)) {
// LOG(debug) << "received: " << block.fHandle << " " << block.fSize << " " << block.fMessageId;
if (fCallback)
{
if (fCallback) {
const auto numBlocks = recvdSize / sizeof(RegionBlock);
for (size_t i = 0; i < numBlocks; i++)
{
for (size_t i = 0; i < numBlocks; i++) {
fCallback(reinterpret_cast<char*>(fRegion.get_address()) + blocks[i].fHandle, blocks[i].fSize, reinterpret_cast<void*>(blocks[i].fHint));
}
}
@ -95,12 +111,11 @@ void Region::ReceiveAcks()
void Region::ReleaseBlock(const RegionBlock &block)
{
std::unique_lock<std::mutex> lock(fBlockLock);
unique_lock<mutex> lock(fBlockLock);
fBlocksToFree.emplace_back(block);
if (fBlocksToFree.size() >= fAckBunchSize)
{
if (fBlocksToFree.size() >= fAckBunchSize) {
lock.unlock(); // reduces contention on fBlockLock
fBlockSendCV.notify_one();
}
@ -108,40 +123,33 @@ void Region::ReleaseBlock(const RegionBlock &block)
void Region::SendAcks()
{
std::unique_ptr<RegionBlock[]> blocks = fair::mq::tools::make_unique<RegionBlock[]>(fAckBunchSize);
unique_ptr<RegionBlock[]> blocks = tools::make_unique<RegionBlock[]>(fAckBunchSize);
while (true) // we'll try to send all acks before stopping
{
while (true) { // we'll try to send all acks before stopping
size_t blocksToSend = 0;
{ // mutex locking block
std::unique_lock<std::mutex> lock(fBlockLock);
unique_lock<mutex> lock(fBlockLock);
// try to get more blocks without waiting (we can miss a notify from CloseMessage())
if (!fStop && (fBlocksToFree.size() < fAckBunchSize))
{
if (!fStop && (fBlocksToFree.size() < fAckBunchSize)) {
// cv.wait() timeout: send whatever blocks we have
fBlockSendCV.wait_for(lock, std::chrono::milliseconds(500));
fBlockSendCV.wait_for(lock, chrono::milliseconds(500));
}
blocksToSend = std::min(fBlocksToFree.size(), fAckBunchSize);
blocksToSend = min(fBlocksToFree.size(), fAckBunchSize);
std::copy_n(fBlocksToFree.end() - blocksToSend, blocksToSend, blocks.get());
copy_n(fBlocksToFree.end() - blocksToSend, blocksToSend, blocks.get());
fBlocksToFree.resize(fBlocksToFree.size() - blocksToSend);
} // unlock the block mutex here while sending over IPC
if (blocksToSend > 0)
{
while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStop)
{
if (blocksToSend > 0) {
while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStop) {
// receiver slow? yield and try again...
this_thread::yield();
}
}
else // blocksToSend == 0
{
if (fStop)
{
} else { // blocksToSend == 0
if (fStop) {
break;
}
}
@ -154,30 +162,31 @@ Region::~Region()
{
fStop = true;
if (fSendAcksWorker.joinable())
{
if (fSendAcksWorker.joinable()) {
fSendAcksWorker.join();
}
if (!fRemote)
{
if (fReceiveAcksWorker.joinable())
{
if (!fRemote) {
if (fReceiveAcksWorker.joinable()) {
fReceiveAcksWorker.join();
}
if (bipc::shared_memory_object::remove(fName.c_str()))
{
if (bipc::shared_memory_object::remove(fName.c_str())) {
LOG(debug) << "shmem: destroyed region " << fName;
}
if (bipc::message_queue::remove(fQueueName.c_str()))
{
LOG(debug) << "shmem: removed region queue " << fName;
if (bipc::file_mapping::remove(fName.c_str())) {
LOG(debug) << "shmem: destroyed file mapping " << fName;
}
}
else
{
if (fFile) {
fclose(fFile);
}
if (bipc::message_queue::remove(fQueueName.c_str())) {
LOG(debug) << "shmem: removed region queue " << fQueueName;
}
} else {
// LOG(debug) << "shmem: region '" << fName << "' is remote, no cleanup necessary.";
LOG(debug) << "shmem: region queue '" << fQueueName << "' is remote, no cleanup necessary";
}

View File

@ -22,6 +22,7 @@
#include <fairmq/shmem/Common.h>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/file_mapping.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <thread>
@ -40,13 +41,15 @@ class Manager;
struct Region
{
Region(Manager& manager, uint64_t id, uint64_t size, bool remote, FairMQRegionCallback callback = nullptr);
Region(Manager& manager, uint64_t id, uint64_t size, bool remote, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0);
Region() = delete;
Region(const Region&) = default;
Region(Region&&) = default;
void InitializeQueues();
void StartReceivingAcks();
void ReceiveAcks();
@ -61,6 +64,8 @@ struct Region
std::string fName;
std::string fQueueName;
boost::interprocess::shared_memory_object fShmemObject;
FILE* fFile;
boost::interprocess::file_mapping fFileMapping;
boost::interprocess::mapped_region fRegion;
std::mutex fBlockLock;

View File

@ -39,36 +39,30 @@ static void daemonize()
umask(0);
// Create a new SID for the child process
if (setsid() < 0)
{
if (setsid() < 0) {
exit(1);
}
// Change the current working directory. This prevents the current directory from being locked; hence not being able to remove it.
if ((chdir("/")) < 0)
{
if ((chdir("/")) < 0) {
exit(1);
}
// Redirect standard files to /dev/null
if (!freopen("/dev/null", "r", stdin))
{
if (!freopen("/dev/null", "r", stdin)) {
cout << "could not redirect stdin to /dev/null" << endl;
}
if (!freopen("/dev/null", "w", stdout))
{
if (!freopen("/dev/null", "w", stdout)) {
cout << "could not redirect stdout to /dev/null" << endl;
}
if (!freopen("/dev/null", "w", stderr))
{
if (!freopen("/dev/null", "w", stderr)) {
cout << "could not redirect stderr to /dev/null" << endl;
}
}
int main(int argc, char** argv)
{
try
{
try {
string sessionName;
string shmId;
bool cleanup = false;
@ -93,26 +87,22 @@ int main(int argc, char** argv)
variables_map vm;
store(parse_command_line(argc, argv, desc), vm);
if (vm.count("help"))
{
if (vm.count("help")) {
cout << "FairMQ Shared Memory Monitor" << endl << desc << endl;
return 0;
}
notify(vm);
if (runAsDaemon)
{
if (runAsDaemon) {
daemonize();
}
if (shmId == "")
{
if (shmId == "") {
shmId = buildShmIdFromSessionIdAndUserId(sessionName);
}
if (cleanup)
{
if (cleanup) {
cout << "Cleaning up \"" << shmId << "\"..." << endl;
Monitor::Cleanup(shmId);
Monitor::RemoveQueue("fmq_" + shmId + "_cq");
@ -125,9 +115,7 @@ int main(int argc, char** argv)
monitor.CatchSignals();
monitor.Run();
}
catch (exception& e)
{
} catch (exception& e) {
cerr << "Unhandled Exception reached the top of main: " << e.what() << ", application will now exit" << endl;
return 2;
}

View File

@ -69,7 +69,7 @@ FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(FairMQUnmanagedRegionP
return unique_ptr<FairMQMessage>(new FairMQMessageZMQ(region, data, size, hint, this));
}
FairMQSocketPtr FairMQTransportFactoryZMQ::CreateSocket(const string& type, const string& name)
FairMQSocketPtr FairMQTransportFactoryZMQ::CreateSocket(const string& type, const string& name)
{
assert(fContext);
return unique_ptr<FairMQSocket>(new FairMQSocketZMQ(type, name, GetId(), fContext, this));
@ -90,9 +90,9 @@ FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const unordered_map<stri
return unique_ptr<FairMQPoller>(new FairMQPollerZMQ(channelsMap, channelList));
}
FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const
FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const
{
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionZMQ(size, callback));
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionZMQ(size, callback, path, flags));
}
fair::mq::Transport FairMQTransportFactoryZMQ::GetType() const

View File

@ -45,7 +45,7 @@ class FairMQTransportFactoryZMQ final : public FairMQTransportFactory
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel*>& channels) const override;
FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) const override;
fair::mq::Transport GetType() const override;

View File

@ -11,7 +11,7 @@
using namespace std;
FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(const size_t size, FairMQRegionCallback callback)
FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(const size_t size, FairMQRegionCallback callback, const std::string& /* path = "" */, int /* flags = 0 */)
: fBuffer(malloc(size))
, fSize(size)
, fCallback(callback)

View File

@ -12,6 +12,7 @@
#include "FairMQUnmanagedRegion.h"
#include <cstddef> // size_t
#include <string>
class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion
{
@ -19,7 +20,7 @@ class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion
friend class FairMQMessageZMQ;
public:
FairMQUnmanagedRegionZMQ(const size_t size, FairMQRegionCallback callback);
FairMQUnmanagedRegionZMQ(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0);
FairMQUnmanagedRegionZMQ(const FairMQUnmanagedRegionZMQ&) = delete;
FairMQUnmanagedRegionZMQ operator=(const FairMQUnmanagedRegionZMQ&) = delete;