Files
FairMQ/fairmq/shmem/Manager.cxx
Alexey Rybalchenko 215c31428b feat(shmem): expose side-channel metadata API for unsent messages
Add two public entry points needed by the ALICE use case where shmem
messages are allocated via a transport but never sent — their metadata
is instead serialised into Arrow tables and delivered over a separate
channel, allowing consumer devices to resolve the payload pointer
without taking ownership.

shmem::Message::GetMeta() returns the MetaHeader of the message,
mirroring the existing positional-init pattern already used in Socket.h.

shmem::GetDataAddressFromHandle(TransportFactory&, const MetaHeader&)
is a free function declared in Common.h and defined in Manager.cxx.
Keeping it out of the TransportFactory class body means callers only
need to include Common.h (available transitively via Message.h) and do
not drag in Socket.h or zmq.h. The implementation handles both managed
segments and unmanaged regions, and throws SharedMemoryError with a
typed message on a bad segment or region id. TransportFactory also
gains a same-named member for callers that already have the concrete
type. Lifetime of the returned pointer is the caller's responsibility;
the cache device is expected to hold the messages alive.

A SideChannel test covers the GetMeta/GetDataAddressFromHandle
round-trip for both standard and expanded-metadata configurations.
2026-06-10 18:51:04 +02:00

64 lines
2.4 KiB
C++

/********************************************************************************
* Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Manager.h"
#include "TransportFactory.h"
// Needed to compile-firewall the <boost/process/async.hpp> header because it
// interferes with the <asio/buffer.hpp> header. So, let's factor
// the whole dependency to Boost.Process out of the header.
#ifdef FAIRMQ_BOOST_PROCESS_V1_HEADER
#include <boost/process/v1.hpp>
namespace bp = boost::process::v1;
#else
#include <boost/process.hpp>
namespace bp = boost::process;
#endif
#include <fairlogger/Logger.h>
namespace fair::mq::shmem {
bool Manager::SpawnShmMonitor(const std::string& id)
{
auto const env(boost::this_process::environment());
std::string const fairmq_path_key("FAIRMQ_PATH");
std::string const shmmonitor_exe_name("fairmq-shmmonitor");
std::string const shmmonitor_verbose_key("FAIRMQ_SHMMONITOR_VERBOSE");
auto path(boost::this_process::path());
if (env.count(fairmq_path_key)) {
path.emplace(path.begin(), env.at(fairmq_path_key).to_string());
}
auto exe(bp::search_path(shmmonitor_exe_name, path));
if (exe.empty()) {
LOG(warn) << "could not find " << shmmonitor_exe_name << " in \"$" << fairmq_path_key
<< ":$PATH\"";
return false;
}
// TODO Move this to fairmq-shmmonitor itself ?
bool verbose(env.count(shmmonitor_verbose_key)
&& env.at(shmmonitor_verbose_key).to_string() == "true");
bp::spawn(
exe, "-x", "-m", "--shmid", id, "-d", "-t", "2000", (verbose ? "--verbose" : ""), env);
return true;
}
char* GetDataAddressFromHandle(fair::mq::TransportFactory& factory, const MetaHeader& meta)
{
if (factory.GetType() != fair::mq::Transport::SHM) {
throw SharedMemoryError("GetDataAddressFromHandle called on a non-shmem transport");
}
return static_cast<TransportFactory&>(factory).GetDataAddressFromHandle(meta);
}
} // namespace fair::mq::shmem