feat: Move ZMsg to fair::mq::zmq

Implement move semantics.
This commit is contained in:
Dennis Klein 2023-06-07 21:01:48 +02:00 committed by Dennis Klein
parent 7b259afdb5
commit c47fc6f9fe
3 changed files with 97 additions and 21 deletions

View File

@ -1,5 +1,5 @@
################################################################################
# Copyright (C) 2012-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# Copyright (C) 2012-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
@ -108,6 +108,7 @@ if(BUILD_FAIRMQ)
zeromq/UnmanagedRegion.h
zeromq/Socket.h
zeromq/TransportFactory.h
zeromq/ZMsg.h
)
##########################

View File

@ -16,6 +16,7 @@
#include <fairmq/Socket.h>
#include <fairmq/tools/Strings.h>
#include <fairmq/zeromq/Common.h>
#include <fairmq/zeromq/ZMsg.h>
#include <fairlogger/Logger.h>
@ -31,24 +32,6 @@ namespace fair::mq {
namespace fair::mq::shmem
{
struct ZMsg
{
ZMsg() { int rc __attribute__((unused)) = zmq_msg_init(&fMsg); assert(rc == 0); }
explicit ZMsg(size_t size) { int rc __attribute__((unused)) = zmq_msg_init_size(&fMsg, size); assert(rc == 0); }
~ZMsg() { int rc __attribute__((unused)) = zmq_msg_close(&fMsg); assert(rc == 0); }
ZMsg(const ZMsg&) = delete;
ZMsg(ZMsg&&) = delete;
ZMsg& operator=(const ZMsg&) = delete;
ZMsg& operator=(ZMsg&&) = delete;
void* Data() { return zmq_msg_data(&fMsg); }
size_t Size() { return zmq_msg_size(&fMsg); }
zmq_msg_t* Msg() { return &fMsg; }
zmq_msg_t fMsg;
};
class Socket final : public fair::mq::Socket
{
public:
@ -213,7 +196,7 @@ class Socket final : public fair::mq::Socket
// put it into zmq message
const unsigned int vecSize = msgVec.size();
ZMsg zmqMsg(vecSize * sizeof(MetaHeader));
zmq::ZMsg zmqMsg(vecSize * sizeof(MetaHeader));
// prepare the message with shm metas
MetaHeader* metas = static_cast<MetaHeader*>(zmqMsg.Data());
@ -269,7 +252,7 @@ class Socket final : public fair::mq::Socket
}
int elapsed = 0;
ZMsg zmqMsg;
zmq::ZMsg zmqMsg;
while (true) {
int64_t totalSize = 0;

92
fairmq/zeromq/ZMsg.h Normal file
View File

@ -0,0 +1,92 @@
/********************************************************************************
* Copyright (C) 2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_ZMQ_ZMSG_H
#define FAIR_MQ_ZMQ_ZMSG_H
#include <cstddef> // for std::size_t
#include <fairmq/Error.h> // for assertm
#include <new> // for std::bad_alloc
#include <zmq.h> // for zmq_*
namespace fair::mq::zmq {
// Wraps a `zmq_msg_t` C object as a C++ type:
// * `zmq_msg_init` -> C++ default ctor
// * `zmq_msg_init_size` -> C++ ctor
// * `zmq_msg_init_data` -> C++ ctor
// * `zmq_msg_init_size` + `memcpy` -> C++ copy ctor
// * `zmq_msg_close` + `zmq_msg_init_size` + `memcpy` -> C++ copy assignment
// * `zmq_msg_init` + `zmq_msg_move`` -> C++ move ctor
// * `zmq_msg_move` -> C++ move assignment
// * `zmq_msg_close` -> C++ dtor
// * access the underlying `zmq_msg_t` via `Msg() [const] -> zmq_msg_t*`
// the const overload does a `const_cast<zmq_msg_t*>`, because the
// C interfaces do not model constness
// * `zmq_msg_data` -> `Data() -> void*`
// * `zmq_msg_size` -> `Size() -> std::size_t`
struct ZMsg
{
ZMsg() noexcept
{
[[maybe_unused]] auto const rc = zmq_msg_init(Msg());
assertm(rc == 0, "msg init successful"); // NOLINT
}
explicit ZMsg(std::size_t size)
{
auto const rc = zmq_msg_init_size(Msg(), size);
if (rc == -1) {
throw std::bad_alloc{};
}
}
explicit ZMsg(void* data,
std::size_t size,
zmq_free_fn* freefn = nullptr,
void* hint = nullptr)
{
auto const rc = zmq_msg_init_data(Msg(), data, size, freefn, hint);
if (rc == -1) {
throw std::bad_alloc{};
}
}
~ZMsg() noexcept
{
[[maybe_unused]] auto const rc = zmq_msg_close(Msg());
assertm(rc == 0, "msg close successful"); // NOLINT
}
ZMsg(const ZMsg& other) = delete;
ZMsg(ZMsg&& other) noexcept
{
[[maybe_unused]] auto rc = zmq_msg_init(Msg());
assertm(rc == 0, "msg init successful"); // NOLINT
rc = zmq_msg_move(Msg(), other.Msg());
assertm(rc == 0, "msg move successful"); // NOLINT
}
ZMsg& operator=(const ZMsg& rhs) = delete;
ZMsg& operator=(ZMsg&& rhs) noexcept
{
[[maybe_unused]] auto const rc = zmq_msg_move(Msg(), rhs.Msg());
assertm(rc == 0, "msg move successful"); // NOLINT
return *this;
}
zmq_msg_t* Msg() noexcept { return &fMsg; }
zmq_msg_t* Msg() const noexcept
{
return const_cast<zmq_msg_t*>(&fMsg); // NOLINT(cppcoreguidelines-pro-type-const-cast)
}
void* Data() const noexcept { return zmq_msg_data(Msg()); }
std::size_t Size() const noexcept { return zmq_msg_size(Msg()); }
private:
zmq_msg_t fMsg{};
};
} // namespace fair::mq::zmq
#endif /* FAIR_MQ_ZMQ_ZMSG_H */