feat: Add new tunable --shm-metadata-msg-size

The shm metadata msg will be right-padded to the given size. This
tunable may be used to saturate the kernel msg buffers more quickly with
the effect that the ZeroMQ message queue size - on which the FairMQ
shmem transport relies upon - behaves more accurately for very small
queue sizes.

This introduces a change for the meta msg format in the multipart case:
old: | MetaHeader 1 | ... | MetaHeader n |
new: | n | MetaHeader 1 | ... | MetaHeader n | padded to fMetadataMsgSize |
where `n` is a `size_t` and contains the number of following meta headers.
Previously, this number was infered from the msg buffer size itself which is
no longer possible due to the potential padding.

Implements #432
This commit is contained in:
Dennis Klein 2023-06-07 22:24:42 +02:00 committed by Dennis Klein
parent 491a943c63
commit f278e7e312
4 changed files with 42 additions and 30 deletions

View File

@ -72,6 +72,7 @@ SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --multipart $multipart"
SAMPLER+=" --num-parts $numParts"
SAMPLER+=" --shm-throw-bad-alloc false"
# SAMPLER+=" --shm-metadata-msg-size 1024"
# SAMPLER+=" --msg-rate 1000"
SAMPLER+=" --max-iterations $maxIterations"
SAMPLER+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:5555"

View File

@ -11,6 +11,7 @@
#include <fairmq/JSONParser.h>
#include <fairmq/SuboptParser.h>
#include <cstddef> // for std::size_t
#include <vector>
using namespace std;
@ -72,6 +73,7 @@ Plugin::ProgOptions ConfigPluginProgramOptions()
("shm-zero-segment", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory after initialization (opened or created).")
("shm-zero-segment-on-creation", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory only once when created.")
("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Shared memory: throw fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).")
("shm-metadata-msg-size", po::value<std::size_t >()->default_value(0), "Shared memory: size of the zmq metadata message (values smaller than minimum are clamped to the minimum).")
("bad-alloc-max-attempts", po::value<int >(), "Maximum number of allocation attempts before throwing fair::mq::MessageBadAlloc. -1 is infinite. There is always at least one attempt, so 0 has safe effect as 1.")
("bad-alloc-attempt-interval", po::value<int >()->default_value(50), "Interval between attempts if cannot allocate a message (in ms).")
("shm-monitor", po::value<bool >()->default_value(false), "Shared memory: run monitor daemon.")

View File

@ -29,7 +29,7 @@
#include <algorithm> // max
#include <chrono>
#include <condition_variable>
#include <cstddef> // max_align_t
#include <cstddef> // max_align_t, std::size_t
#include <cstdlib> // getenv
#include <cstring> // memcpy
#include <memory> // make_unique
@ -151,6 +151,7 @@ class Manager
, fBadAllocMaxAttempts(1)
, fBadAllocAttemptIntervalInMs(config ? config->GetProperty<int>("bad-alloc-attempt-interval", 50) : 50)
, fNoCleanup(config ? config->GetProperty<bool>("shm-no-cleanup", false) : false)
, fMetadataMsgSize(config ? config->GetProperty<std::size_t>("shm-metadata-msg-size", 0) : 0)
{
using namespace boost::interprocess;
@ -828,6 +829,8 @@ class Manager
}
}
auto GetMetadataMsgSize() const noexcept { return fMetadataMsgSize; }
~Manager()
{
fRegionsGen += 1; // signal TL cache invalidation
@ -884,6 +887,8 @@ class Manager
int fBadAllocMaxAttempts;
int fBadAllocAttemptIntervalInMs;
bool fNoCleanup;
std::size_t fMetadataMsgSize;
};
} // namespace fair::mq::shmem

View File

@ -22,8 +22,12 @@
#include <zmq.h>
#include <algorithm> // for std::max
#include <atomic>
#include <memory> // make_unique
#include <cstddef> // for std::size_t
#include <cstring> // for std::memcpy
#include <exception> // for std::terminate
#include <memory> // for std::make_unique
namespace fair::mq {
class TransportFactory;
@ -47,6 +51,7 @@ class Socket final : public fair::mq::Socket
, fMessagesRx(0)
, fTimeout(100)
, fConnectedPeersCount(0)
, fMetadataMsgSize(manager.GetMetadataMsgSize())
{
assert(context);
@ -124,8 +129,8 @@ class Socket final : public fair::mq::Socket
}
int elapsed = 0;
// make meta msg
zmq::ZMsg zmqMsg(sizeof(MetaHeader));
// meta msg format: | MetaHeader | padded to fMetadataMsgSize |
zmq::ZMsg zmqMsg(std::max(fMetadataMsgSize, sizeof(MetaHeader)));
std::memcpy(zmqMsg.Data(), &(shmMsg->fMeta), sizeof(MetaHeader));
while (true) {
@ -165,11 +170,11 @@ class Socket final : public fair::mq::Socket
int nbytes = zmq_recv(fSocket, &(shmMsg->fMeta), sizeof(MetaHeader), flags);
if (nbytes > 0) {
// check for number of received messages. must be 1
if (nbytes != sizeof(MetaHeader)) {
if (static_cast<std::size_t>(nbytes) < sizeof(MetaHeader)) {
throw SocketError(
tools::ToString("Received message is not a valid FairMQ shared memory message. ",
"Possibly due to a misconfigured transport on the sender side. ",
"Expected size of ", sizeof(MetaHeader), " bytes, received ", nbytes));
"Expected minimum size of ", sizeof(MetaHeader), " bytes, received ", nbytes));
}
size_t size = shmMsg->GetSize();
@ -198,13 +203,14 @@ class Socket final : public fair::mq::Socket
}
int elapsed = 0;
// put it into zmq message
const unsigned int vecSize = msgVec.size();
zmq::ZMsg zmqMsg(vecSize * sizeof(MetaHeader));
// prepare the message with shm metas
MetaHeader* metas = static_cast<MetaHeader*>(zmqMsg.Data());
// meta msg format: | n | MetaHeader 1 | ... | MetaHeader n | padded to fMetadataMsgSize |
auto const n = msgVec.size();
zmq::ZMsg zmqMsg(std::max(fMetadataMsgSize, sizeof(std::size_t) + n * sizeof(MetaHeader)));
auto meta_n = static_cast<std::size_t*>(zmqMsg.Data());
*meta_n = n;
++meta_n;
auto metas = static_cast<MetaHeader*>(static_cast<void*>(meta_n));
for (auto& msg : msgVec) {
auto msgPtr = msg.get();
if (!msgPtr) {
@ -219,7 +225,7 @@ class Socket final : public fair::mq::Socket
int64_t totalSize = 0;
int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) {
assert(static_cast<unsigned int>(nbytes) == (vecSize * sizeof(MetaHeader))); // all or nothing
assert(static_cast<unsigned int>(nbytes) >= sizeof(std::size_t) + (n * sizeof(MetaHeader)));
for (auto& msg : msgVec) {
Message* shmMsg = static_cast<Message*>(msg.get());
@ -259,26 +265,23 @@ class Socket final : public fair::mq::Socket
zmq::ZMsg zmqMsg;
while (true) {
int64_t totalSize = 0;
std::size_t totalSize = 0;
int nbytes = zmq_msg_recv(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) {
MetaHeader* hdrVec = static_cast<MetaHeader*>(zmqMsg.Data());
const auto hdrVecSize = zmqMsg.Size();
[[maybe_unused]] auto const size = zmqMsg.Size();
assert(size > sizeof(std::size_t));
auto meta_n = static_cast<std::size_t*>(zmqMsg.Data());
auto const n = *meta_n;
assert(size >= sizeof(std::size_t) + n * sizeof(MetaHeader));
++meta_n;
auto metas = static_cast<MetaHeader*>(static_cast<void*>(meta_n));
msgVec.reserve(msgVec.size() + n);
auto const transport = GetTransport();
assert(hdrVecSize > 0);
if (hdrVecSize % sizeof(MetaHeader) != 0) {
throw SocketError(
tools::ToString("Received message is not a valid FairMQ shared memory message. ",
"Possibly due to a misconfigured transport on the sender side. ",
"Expected size of ", sizeof(MetaHeader), " bytes, received ", nbytes));
}
const auto numMessages = hdrVecSize / sizeof(MetaHeader);
msgVec.reserve(numMessages);
for (size_t m = 0; m < numMessages; m++) {
// create new message (part)
msgVec.emplace_back(std::make_unique<Message>(fManager, hdrVec[m], GetTransport()));
for (std::size_t i = 0; i < n; ++i) {
msgVec.push_back(std::make_unique<Message>(fManager, *metas, transport));
++metas;
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-static-cast-downcast)
Message* shmMsg = static_cast<Message*>(msgVec.back().get());
totalSize += shmMsg->GetSize();
}
@ -456,6 +459,7 @@ class Socket final : public fair::mq::Socket
int fTimeout;
mutable unsigned long fConnectedPeersCount;
std::size_t fMetadataMsgSize;
};
} // namespace fair::mq::shmem