From 79f568b461dcbb9da8022f068842a27d76668033 Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Wed, 7 Jun 2023 21:01:48 +0200 Subject: [PATCH] feat: Move `ZMsg` to `fair::mq::zmq` Implement move semantics. --- fairmq/CMakeLists.txt | 3 +- fairmq/shmem/Socket.h | 23 ++--------- fairmq/zeromq/ZMsg.h | 92 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 97 insertions(+), 21 deletions(-) create mode 100644 fairmq/zeromq/ZMsg.h diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 18c2a0d8..491bca87 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -1,5 +1,5 @@ ################################################################################ -# Copyright (C) 2012-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # +# Copyright (C) 2012-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # # # This software is distributed under the terms of the # # GNU Lesser General Public Licence (LGPL) version 3, # @@ -108,6 +108,7 @@ if(BUILD_FAIRMQ) zeromq/UnmanagedRegion.h zeromq/Socket.h zeromq/TransportFactory.h + zeromq/ZMsg.h ) ########################## diff --git a/fairmq/shmem/Socket.h b/fairmq/shmem/Socket.h index eb912d5f..3a889fea 100644 --- a/fairmq/shmem/Socket.h +++ b/fairmq/shmem/Socket.h @@ -16,6 +16,7 @@ #include #include #include +#include #include @@ -31,24 +32,6 @@ namespace fair::mq { namespace fair::mq::shmem { -struct ZMsg -{ - ZMsg() { int rc __attribute__((unused)) = zmq_msg_init(&fMsg); assert(rc == 0); } - explicit ZMsg(size_t size) { int rc __attribute__((unused)) = zmq_msg_init_size(&fMsg, size); assert(rc == 0); } - ~ZMsg() { int rc __attribute__((unused)) = zmq_msg_close(&fMsg); assert(rc == 0); } - - ZMsg(const ZMsg&) = delete; - ZMsg(ZMsg&&) = delete; - ZMsg& operator=(const ZMsg&) = delete; - ZMsg& operator=(ZMsg&&) = delete; - - void* Data() { return zmq_msg_data(&fMsg); } - size_t Size() { return zmq_msg_size(&fMsg); } - zmq_msg_t* Msg() { return &fMsg; } - - zmq_msg_t fMsg; -}; - class Socket final : public fair::mq::Socket { public: @@ -213,7 +196,7 @@ class Socket final : public fair::mq::Socket // put it into zmq message const unsigned int vecSize = msgVec.size(); - ZMsg zmqMsg(vecSize * sizeof(MetaHeader)); + zmq::ZMsg zmqMsg(vecSize * sizeof(MetaHeader)); // prepare the message with shm metas MetaHeader* metas = static_cast(zmqMsg.Data()); @@ -269,7 +252,7 @@ class Socket final : public fair::mq::Socket } int elapsed = 0; - ZMsg zmqMsg; + zmq::ZMsg zmqMsg; while (true) { int64_t totalSize = 0; diff --git a/fairmq/zeromq/ZMsg.h b/fairmq/zeromq/ZMsg.h new file mode 100644 index 00000000..5330fff9 --- /dev/null +++ b/fairmq/zeromq/ZMsg.h @@ -0,0 +1,92 @@ +/******************************************************************************** + * Copyright (C) 2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_ZMQ_ZMSG_H +#define FAIR_MQ_ZMQ_ZMSG_H + +#include // for std::size_t +#include // for assertm +#include // for std::bad_alloc +#include // for zmq_* + +namespace fair::mq::zmq { + +// Wraps a `zmq_msg_t` C object as a C++ type: +// * `zmq_msg_init` -> C++ default ctor +// * `zmq_msg_init_size` -> C++ ctor +// * `zmq_msg_init_data` -> C++ ctor +// * `zmq_msg_init_size` + `memcpy` -> C++ copy ctor +// * `zmq_msg_close` + `zmq_msg_init_size` + `memcpy` -> C++ copy assignment +// * `zmq_msg_init` + `zmq_msg_move`` -> C++ move ctor +// * `zmq_msg_move` -> C++ move assignment +// * `zmq_msg_close` -> C++ dtor +// * access the underlying `zmq_msg_t` via `Msg() [const] -> zmq_msg_t*` +// the const overload does a `const_cast`, because the +// C interfaces do not model constness +// * `zmq_msg_data` -> `Data() -> void*` +// * `zmq_msg_size` -> `Size() -> std::size_t` +struct ZMsg +{ + ZMsg() noexcept + { + [[maybe_unused]] auto const rc = zmq_msg_init(Msg()); + assertm(rc == 0, "msg init successful"); // NOLINT + } + explicit ZMsg(std::size_t size) + { + auto const rc = zmq_msg_init_size(Msg(), size); + if (rc == -1) { + throw std::bad_alloc{}; + } + } + explicit ZMsg(void* data, + std::size_t size, + zmq_free_fn* freefn = nullptr, + void* hint = nullptr) + { + auto const rc = zmq_msg_init_data(Msg(), data, size, freefn, hint); + if (rc == -1) { + throw std::bad_alloc{}; + } + } + ~ZMsg() noexcept + { + [[maybe_unused]] auto const rc = zmq_msg_close(Msg()); + assertm(rc == 0, "msg close successful"); // NOLINT + } + ZMsg(const ZMsg& other) = delete; + ZMsg(ZMsg&& other) noexcept + { + [[maybe_unused]] auto rc = zmq_msg_init(Msg()); + assertm(rc == 0, "msg init successful"); // NOLINT + rc = zmq_msg_move(Msg(), other.Msg()); + assertm(rc == 0, "msg move successful"); // NOLINT + } + ZMsg& operator=(const ZMsg& rhs) = delete; + ZMsg& operator=(ZMsg&& rhs) noexcept + { + [[maybe_unused]] auto const rc = zmq_msg_move(Msg(), rhs.Msg()); + assertm(rc == 0, "msg move successful"); // NOLINT + return *this; + } + + zmq_msg_t* Msg() noexcept { return &fMsg; } + zmq_msg_t* Msg() const noexcept + { + return const_cast(&fMsg); // NOLINT(cppcoreguidelines-pro-type-const-cast) + } + void* Data() const noexcept { return zmq_msg_data(Msg()); } + std::size_t Size() const noexcept { return zmq_msg_size(Msg()); } + + private: + zmq_msg_t fMsg{}; +}; + +} // namespace fair::mq::zmq + +#endif /* FAIR_MQ_ZMQ_ZMSG_H */