Compare commits

...

17 Commits

Author SHA1 Message Date
Giulio Eulisse
3c1723fc54 Allow sorting StateChange callbacks
If the key of the callback is a number, it will be used to invoke
callbacks with the correct ordering.
2023-09-06 09:49:30 +02:00
Christian Tacke
c3418cc7b8 chore: Run meta_update.py 2023-08-08 16:26:10 +02:00
Christian Tacke
cc00c5a6f1 docs: Add "readme" field to codemeta
A link to an introduction for people who are not experts in
the field.
2023-08-08 16:26:10 +02:00
Christian Tacke
e6bb14f535 ci: Check codemeta.json
Use the eOSSR tooling to validate our codemeta.json file
2023-08-08 16:26:10 +02:00
Christian Tacke
b18d60372c codemeta: Add GSI as "maintainer"
GSI is providing resources for maintaining FairMQ. So let's
document this in codemeta.json.
2023-08-08 16:21:48 +02:00
Giulio Eulisse
7ceccdeaa6 Print actual address we are trying to bind. 2023-06-29 12:28:23 +02:00
Dennis Klein
d1c99f7e15 ci: Update build matrix 2023-06-26 11:56:24 +02:00
Dennis Klein
bfc665d76e feat: Make the channel AutoBind default configurable 2023-06-26 11:56:24 +02:00
Dennis Klein
42d27af20f docs: Update install commands 2023-06-13 22:43:52 +02:00
Alexey Rybalchenko
25614e3e06 test: Add coverage for --shm-metadata-msg-size 2023-06-13 21:24:40 +02:00
Alexey Rybalchenko
3decac58fc test: Add data transfer and checks to protocol tests 2023-06-13 21:24:40 +02:00
Dennis Klein
f278e7e312 feat: Add new tunable --shm-metadata-msg-size
The shm metadata msg will be right-padded to the given size. This
tunable may be used to saturate the kernel msg buffers more quickly with
the effect that the ZeroMQ message queue size - on which the FairMQ
shmem transport relies upon - behaves more accurately for very small
queue sizes.

This introduces a change for the meta msg format in the multipart case:
old: | MetaHeader 1 | ... | MetaHeader n |
new: | n | MetaHeader 1 | ... | MetaHeader n | padded to fMetadataMsgSize |
where `n` is a `size_t` and contains the number of following meta headers.
Previously, this number was infered from the msg buffer size itself which is
no longer possible due to the potential padding.

Implements #432
2023-06-13 21:24:40 +02:00
Dennis Klein
491a943c63 feat: Use zmq_msg_send for single message Send 2023-06-13 21:24:40 +02:00
Dennis Klein
c47fc6f9fe feat: Move ZMsg to fair::mq::zmq
Implement move semantics.
2023-06-13 21:24:40 +02:00
Giulio Eulisse
7b259afdb5 Fix -Wunqualified-std-cast-call 2023-06-13 21:24:40 +02:00
Dennis Klein
33ddcaad5e docs: Add repology badge 2023-04-05 15:19:05 +02:00
Dennis Klein
4d5dbedeab build: Add spack develop environment 2023-03-23 14:14:08 +01:00
27 changed files with 419 additions and 121 deletions

View File

@@ -0,0 +1,21 @@
name: validate codemeta
on:
push:
paths:
- codemeta.json
- .github/workflows/codemeta_validate.yaml
pull_request:
paths:
- codemeta.json
- .github/workflows/codemeta_validate.yaml
jobs:
build:
runs-on: ubuntu-latest
container:
image: gitlab-registry.in2p3.fr/escape2020/wp3/eossr:v1.0
steps:
- uses: actions/checkout@v3
- name: validate codemeta
run: eossr-metadata-validator codemeta.json

2
.gitignore vendored
View File

@@ -4,3 +4,5 @@ install
.vscode
/compile_commands.json
.cache
.spack-env
spack.lock

View File

@@ -1,6 +1,7 @@
{
"creators": [
{
"orcid": "0000-0002-8071-4497",
"name": "Al-Turany, Mohammad"
},
{

4
Jenkinsfile vendored
View File

@@ -93,8 +93,8 @@ pipeline{
[os: 'fedora', ver: '36', arch: 'x86_64', compiler: 'gcc-12'],
[os: 'fedora', ver: '37', arch: 'x86_64', compiler: 'gcc-12'],
[os: 'fedora', ver: '38', arch: 'x86_64', compiler: 'gcc-13'],
[os: 'macos', ver: '12', arch: 'x86_64', compiler: 'apple-clang-13', extra: '-DHAS_ASIO=ON'],
[os: 'macos', ver: '12', arch: 'arm64', compiler: 'apple-clang-13', extra: '-DHAS_ASIO=ON'],
[os: 'macos', ver: '12', arch: 'x86_64', compiler: 'apple-clang-14'],
[os: 'macos', ver: '13', arch: 'arm64', compiler: 'apple-clang-14'],
])
def all_debug = "-DCMAKE_BUILD_TYPE=Debug"

View File

@@ -5,6 +5,7 @@
[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.1689985.svg)](https://doi.org/10.5281/zenodo.1689985)
[![OpenSSF Best Practices](https://bestpractices.coreinfrastructure.org/projects/6915/badge)](https://bestpractices.coreinfrastructure.org/projects/6915)
[![fair-software.eu](https://img.shields.io/badge/fair--software.eu-%E2%97%8F%20%20%E2%97%8F%20%20%E2%97%8B%20%20%E2%97%8F%20%20%E2%97%8F-yellow)](https://github.com/FairRootGroup/FairMQ/actions/workflows/fair-software.yml)
[![Spack package](https://repology.org/badge/version-for-repo/spack/fairmq.svg)](https://repology.org/project/fairmq/versions)
C++ Message Queuing Library and Framework
@@ -44,10 +45,10 @@ Recommended:
```bash
git clone https://github.com/FairRootGroup/FairMQ fairmq_source
cmake -S fairmq_source -B fairmq_build -GNinja -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=fairmq_install
cmake -S fairmq_source -B fairmq_build -GNinja -DCMAKE_BUILD_TYPE=Release
cmake --build fairmq_build
cmake --build fairmq_build --target test
cmake --build fairmq_build --target install
ctest --test-dir fairmq_build --output-on-failure --schedule-random -j<ncpus>
cmake --install fairmq_build --prefix $(pwd)/fairmq_install
```
Please consult the [manpages of your CMake version](https://cmake.org/cmake/help/latest/manual/cmake.1.html) for more options.
@@ -55,6 +56,7 @@ Please consult the [manpages of your CMake version](https://cmake.org/cmake/help
If dependencies are not installed in standard system directories, you can hint the installation location via
`-DCMAKE_PREFIX_PATH=...` or per dependency via `-D{DEPENDENCY}_ROOT=...` (`*_ROOT` variables can also be environment variables).
## Usage
FairMQ ships as a CMake package, so in your `CMakeLists.txt` you can discover it like this:
@@ -96,6 +98,7 @@ On command line:
* `-DBUILD_TESTING=OFF` disables building of tests.
* `-DBUILD_EXAMPLES=OFF` disables building of examples.
* `-DBUILD_DOCS=ON` enables building of API docs.
* `-DFAIRMQ_CHANNEL_DEFAULT_AUTOBIND=OFF` disable channel `autoBind` by default
* You can hint non-system installations for dependent packages, see the #installation-from-source section above
After the `find_package(FairMQ)` call the following CMake variables are defined:

View File

@@ -96,7 +96,10 @@ endmacro()
macro(fairmq_summary_compile_definitions)
message(STATUS " ")
message(STATUS " ${Cyan}COMPILE DEFINITION VALUE${CR}")
message(STATUS " ${BWhite}FAIRMQ_HAS_STD_FILESYSTEM${CR} ${FAIRMQ_HAS_STD_FILESYSTEM} (overridable with ${BMagenta}-DFAIRMQ_HAS_STD_FILESYSTEM=0|1${CR})")
message(STATUS " ${BWhite}FAIRMQ_HAS_STD_PMR${CR} ${FAIRMQ_HAS_STD_PMR} (overridable with ${BMagenta}-DFAIRMQ_HAS_STD_PMR=0|1${CR})")
message(STATUS " ${Cyan}COMPILE DEFINITION VALUE${CR}")
message(STATUS " ${BWhite}FAIRMQ_HAS_STD_FILESYSTEM${CR} ${FAIRMQ_HAS_STD_FILESYSTEM} (overridable with ${BMagenta}-DFAIRMQ_HAS_STD_FILESYSTEM=0|1${CR})")
message(STATUS " ${BWhite}FAIRMQ_HAS_STD_PMR${CR} ${FAIRMQ_HAS_STD_PMR} (overridable with ${BMagenta}-DFAIRMQ_HAS_STD_PMR=0|1${CR})")
if(DEFINED FAIRMQ_CHANNEL_DEFAULT_AUTOBIND)
message(STATUS " ${BWhite}FAIRMQ_CHANNEL_DEFAULT_AUTOBIND${CR} ${FAIRMQ_CHANNEL_DEFAULT_AUTOBIND}")
endif()
endmacro()

View File

@@ -7,11 +7,20 @@
"datePublished": "2018-04-15",
"developmentStatus": "active",
"codeRepository": "https://github.com/FairRootGroup/FairMQ/",
"readme": "https://github.com/FairRootGroup/FairMQ/#readme",
"issueTracker": "https://github.com/FairRootGroup/FairMQ/issues",
"identifier": "https://doi.org/10.5281/zenodo.1689985",
"maintainer": [
{
"@type": "ResearchOrganisation",
"@id": "https://ror.org/02k8cbn47",
"name": "GSI Helmholtz Centre for Heavy Ion Research"
}
],
"author": [
{
"@type": "Person",
"@id": "https://orcid.org/0000-0002-8071-4497",
"givenName": "Mohammad",
"familyName": "Al-Turany"
},

View File

@@ -54,7 +54,7 @@ struct Receiver : fair::mq::Device
fBuffer[h.id].start = chrono::steady_clock::now();
}
// if the received ID has not previously been discarded, store the data part in the buffer
fBuffer[h.id].parts.AddPart(move(parts.At(1)));
fBuffer[h.id].parts.AddPart(std::move(parts.At(1)));
} else {
// if received ID has been previously discarded.
LOG(debug) << "Received part from an already discarded timeframe with id " << h.id;

View File

@@ -1,5 +1,5 @@
################################################################################
# Copyright (C) 2012-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# Copyright (C) 2012-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
@@ -108,6 +108,7 @@ if(BUILD_FAIRMQ)
zeromq/UnmanagedRegion.h
zeromq/Socket.h
zeromq/TransportFactory.h
zeromq/ZMsg.h
)
##########################
@@ -173,6 +174,15 @@ if(BUILD_FAIRMQ)
FAIRMQ_HAS_STD_FILESYSTEM=${FAIRMQ_HAS_STD_FILESYSTEM}
FAIRMQ_HAS_STD_PMR=${FAIRMQ_HAS_STD_PMR}
)
if(DEFINED FAIRMQ_CHANNEL_DEFAULT_AUTOBIND)
# translate CMake boolean (TRUE, FALSE, 0, 1, OFF, ON) into C++ boolean literal (true, false)
if(FAIRMQ_CHANNEL_DEFAULT_AUTOBIND)
set(value "true")
else()
set(value "false")
endif()
target_compile_definitions(${target} PUBLIC FAIRMQ_CHANNEL_DEFAULT_AUTOBIND=${value})
endif()
#######################

View File

@@ -379,7 +379,11 @@ class Channel
static constexpr int DefaultRateLogging = 1;
static constexpr int DefaultPortRangeMin = 22000;
static constexpr int DefaultPortRangeMax = 23000;
#ifdef FAIRMQ_CHANNEL_DEFAULT_AUTOBIND
static constexpr bool DefaultAutoBind = FAIRMQ_CHANNEL_DEFAULT_AUTOBIND;
#else
static constexpr bool DefaultAutoBind = true;
#endif
friend std::ostream& operator<<(std::ostream& os, const Channel& ch)
{
@@ -425,10 +429,10 @@ class Channel
msg.get()
));
msg.release();
msg = move(msgWrapper);
msg = std::move(msgWrapper);
} else {
MessagePtr newMsg(NewMessage());
msg = move(newMsg);
msg = std::move(newMsg);
}
}
}
@@ -446,10 +450,10 @@ class Channel
msg.get()
));
msg.release();
msg = move(msgWrapper);
msg = std::move(msgWrapper);
} else {
MessagePtr newMsg(NewMessage());
msg = move(newMsg);
msg = std::move(newMsg);
}
}
}
@@ -459,7 +463,7 @@ class Channel
{
if (fTransportType != msg->GetType()) {
MessagePtr newMsg(NewMessage());
msg = move(newMsg);
msg = std::move(newMsg);
}
}
@@ -470,7 +474,7 @@ class Channel
if (fTransportType != msg->GetType()) {
MessagePtr newMsg(NewMessage());
msg = move(newMsg);
msg = std::move(newMsg);
}
}
}

View File

@@ -288,7 +288,7 @@ void Device::AttachChannels(vector<Channel*>& chans)
// remove the channel from the uninitialized container
itr = chans.erase(itr);
} else {
LOG(error) << "failed to attach channel " << (*itr)->fName << " (" << (*itr)->fMethod << ")";
LOG(error) << "failed to attach channel " << (*itr)->fName << " (" << (*itr)->fMethod << " on " << (*itr)->fAddress << ")";
++itr;
}
} else {

View File

@@ -310,7 +310,9 @@ try {
void StateMachine::SubscribeToStateChange(const string& key, function<void(const State)> callback)
{
static_pointer_cast<FairMQFSM>(fFsm)->fStateChangeSignalsMap.insert({key, static_pointer_cast<FairMQFSM>(fFsm)->fStateChangeSignal.connect(callback)});
// Check if the key has a integer value as prefix, if yes, decode it.
int i = strtol(key.c_str(), nullptr, 10);
static_pointer_cast<FairMQFSM>(fFsm)->fStateChangeSignalsMap.insert({key, static_pointer_cast<FairMQFSM>(fFsm)->fStateChangeSignal.connect(i, callback)});
}
void StateMachine::UnsubscribeFromStateChange(const string& key)

View File

@@ -72,6 +72,7 @@ SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --multipart $multipart"
SAMPLER+=" --num-parts $numParts"
SAMPLER+=" --shm-throw-bad-alloc false"
# SAMPLER+=" --shm-metadata-msg-size 1024"
# SAMPLER+=" --msg-rate 1000"
SAMPLER+=" --max-iterations $maxIterations"
SAMPLER+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:5555"

View File

@@ -11,6 +11,7 @@
#include <fairmq/JSONParser.h>
#include <fairmq/SuboptParser.h>
#include <cstddef> // for std::size_t
#include <vector>
using namespace std;
@@ -72,6 +73,7 @@ Plugin::ProgOptions ConfigPluginProgramOptions()
("shm-zero-segment", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory after initialization (opened or created).")
("shm-zero-segment-on-creation", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory only once when created.")
("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Shared memory: throw fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).")
("shm-metadata-msg-size", po::value<std::size_t >()->default_value(0), "Shared memory: size of the zmq metadata message (values smaller than minimum are clamped to the minimum).")
("bad-alloc-max-attempts", po::value<int >(), "Maximum number of allocation attempts before throwing fair::mq::MessageBadAlloc. -1 is infinite. There is always at least one attempt, so 0 has safe effect as 1.")
("bad-alloc-attempt-interval", po::value<int >()->default_value(50), "Interval between attempts if cannot allocate a message (in ms).")
("shm-monitor", po::value<bool >()->default_value(false), "Shared memory: run monitor daemon.")

View File

@@ -29,7 +29,7 @@
#include <algorithm> // max
#include <chrono>
#include <condition_variable>
#include <cstddef> // max_align_t
#include <cstddef> // max_align_t, std::size_t
#include <cstdlib> // getenv
#include <cstring> // memcpy
#include <memory> // make_unique
@@ -151,6 +151,7 @@ class Manager
, fBadAllocMaxAttempts(1)
, fBadAllocAttemptIntervalInMs(config ? config->GetProperty<int>("bad-alloc-attempt-interval", 50) : 50)
, fNoCleanup(config ? config->GetProperty<bool>("shm-no-cleanup", false) : false)
, fMetadataMsgSize(config ? config->GetProperty<std::size_t>("shm-metadata-msg-size", 0) : 0)
{
using namespace boost::interprocess;
@@ -828,6 +829,8 @@ class Manager
}
}
auto GetMetadataMsgSize() const noexcept { return fMetadataMsgSize; }
~Manager()
{
fRegionsGen += 1; // signal TL cache invalidation
@@ -884,6 +887,8 @@ class Manager
int fBadAllocMaxAttempts;
int fBadAllocAttemptIntervalInMs;
bool fNoCleanup;
std::size_t fMetadataMsgSize;
};
} // namespace fair::mq::shmem

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@@ -11,18 +11,23 @@
#include "Common.h"
#include "Manager.h"
#include "Message.h"
#include <fairmq/Error.h>
#include <fairmq/Error.h> // for assertm
#include <fairmq/Message.h>
#include <fairmq/Socket.h>
#include <fairmq/tools/Strings.h>
#include <fairmq/zeromq/Common.h>
#include <fairmq/zeromq/Common.h> // for zmq::HandleErrors, zmq::ShouldRetry
#include <fairmq/zeromq/ZMsg.h> // for zmq::ZMsg
#include <fairlogger/Logger.h>
#include <zmq.h>
#include <algorithm> // for std::max
#include <atomic>
#include <memory> // make_unique
#include <cstddef> // for std::size_t
#include <cstring> // for std::memcpy
#include <exception> // for std::terminate
#include <memory> // for std::make_unique
namespace fair::mq {
class TransportFactory;
@@ -31,24 +36,6 @@ namespace fair::mq {
namespace fair::mq::shmem
{
struct ZMsg
{
ZMsg() { int rc __attribute__((unused)) = zmq_msg_init(&fMsg); assert(rc == 0); }
explicit ZMsg(size_t size) { int rc __attribute__((unused)) = zmq_msg_init_size(&fMsg, size); assert(rc == 0); }
~ZMsg() { int rc __attribute__((unused)) = zmq_msg_close(&fMsg); assert(rc == 0); }
ZMsg(const ZMsg&) = delete;
ZMsg(ZMsg&&) = delete;
ZMsg& operator=(const ZMsg&) = delete;
ZMsg& operator=(ZMsg&&) = delete;
void* Data() { return zmq_msg_data(&fMsg); }
size_t Size() { return zmq_msg_size(&fMsg); }
zmq_msg_t* Msg() { return &fMsg; }
zmq_msg_t fMsg;
};
class Socket final : public fair::mq::Socket
{
public:
@@ -64,6 +51,7 @@ class Socket final : public fair::mq::Socket
, fMessagesRx(0)
, fTimeout(100)
, fConnectedPeersCount(0)
, fMetadataMsgSize(manager.GetMetadataMsgSize())
{
assert(context);
@@ -141,8 +129,12 @@ class Socket final : public fair::mq::Socket
}
int elapsed = 0;
// meta msg format: | MetaHeader | padded to fMetadataMsgSize |
zmq::ZMsg zmqMsg(std::max(fMetadataMsgSize, sizeof(MetaHeader)));
std::memcpy(zmqMsg.Data(), &(shmMsg->fMeta), sizeof(MetaHeader));
while (true) {
int nbytes = zmq_send(fSocket, &(shmMsg->fMeta), sizeof(MetaHeader), flags);
int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) {
shmMsg->fQueued = true;
++fMessagesTx;
@@ -178,11 +170,11 @@ class Socket final : public fair::mq::Socket
int nbytes = zmq_recv(fSocket, &(shmMsg->fMeta), sizeof(MetaHeader), flags);
if (nbytes > 0) {
// check for number of received messages. must be 1
if (nbytes != sizeof(MetaHeader)) {
if (static_cast<std::size_t>(nbytes) < sizeof(MetaHeader)) {
throw SocketError(
tools::ToString("Received message is not a valid FairMQ shared memory message. ",
"Possibly due to a misconfigured transport on the sender side. ",
"Expected size of ", sizeof(MetaHeader), " bytes, received ", nbytes));
"Expected minimum size of ", sizeof(MetaHeader), " bytes, received ", nbytes));
}
size_t size = shmMsg->GetSize();
@@ -211,13 +203,14 @@ class Socket final : public fair::mq::Socket
}
int elapsed = 0;
// put it into zmq message
const unsigned int vecSize = msgVec.size();
ZMsg zmqMsg(vecSize * sizeof(MetaHeader));
// prepare the message with shm metas
MetaHeader* metas = static_cast<MetaHeader*>(zmqMsg.Data());
// meta msg format: | n | MetaHeader 1 | ... | MetaHeader n | padded to fMetadataMsgSize |
auto const n = msgVec.size();
zmq::ZMsg zmqMsg(std::max(fMetadataMsgSize, sizeof(std::size_t) + n * sizeof(MetaHeader)));
auto meta_n = static_cast<std::size_t*>(zmqMsg.Data());
*meta_n = n;
++meta_n;
auto metas = static_cast<MetaHeader*>(static_cast<void*>(meta_n));
for (auto& msg : msgVec) {
auto msgPtr = msg.get();
if (!msgPtr) {
@@ -232,7 +225,7 @@ class Socket final : public fair::mq::Socket
int64_t totalSize = 0;
int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) {
assert(static_cast<unsigned int>(nbytes) == (vecSize * sizeof(MetaHeader))); // all or nothing
assert(static_cast<unsigned int>(nbytes) >= sizeof(std::size_t) + (n * sizeof(MetaHeader)));
for (auto& msg : msgVec) {
Message* shmMsg = static_cast<Message*>(msg.get());
@@ -269,29 +262,26 @@ class Socket final : public fair::mq::Socket
}
int elapsed = 0;
ZMsg zmqMsg;
zmq::ZMsg zmqMsg;
while (true) {
int64_t totalSize = 0;
std::size_t totalSize = 0;
int nbytes = zmq_msg_recv(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) {
MetaHeader* hdrVec = static_cast<MetaHeader*>(zmqMsg.Data());
const auto hdrVecSize = zmqMsg.Size();
[[maybe_unused]] auto const size = zmqMsg.Size();
assert(size > sizeof(std::size_t));
auto meta_n = static_cast<std::size_t*>(zmqMsg.Data());
auto const n = *meta_n;
assert(size >= sizeof(std::size_t) + n * sizeof(MetaHeader));
++meta_n;
auto metas = static_cast<MetaHeader*>(static_cast<void*>(meta_n));
msgVec.reserve(msgVec.size() + n);
auto const transport = GetTransport();
assert(hdrVecSize > 0);
if (hdrVecSize % sizeof(MetaHeader) != 0) {
throw SocketError(
tools::ToString("Received message is not a valid FairMQ shared memory message. ",
"Possibly due to a misconfigured transport on the sender side. ",
"Expected size of ", sizeof(MetaHeader), " bytes, received ", nbytes));
}
const auto numMessages = hdrVecSize / sizeof(MetaHeader);
msgVec.reserve(numMessages);
for (size_t m = 0; m < numMessages; m++) {
// create new message (part)
msgVec.emplace_back(std::make_unique<Message>(fManager, hdrVec[m], GetTransport()));
for (std::size_t i = 0; i < n; ++i) {
msgVec.push_back(std::make_unique<Message>(fManager, *metas, transport));
++metas;
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-static-cast-downcast)
Message* shmMsg = static_cast<Message*>(msgVec.back().get());
totalSize += shmMsg->GetSize();
}
@@ -469,6 +459,7 @@ class Socket final : public fair::mq::Socket
int fTimeout;
mutable unsigned long fConnectedPeersCount;
std::size_t fMetadataMsgSize;
};
} // namespace fair::mq::shmem

92
fairmq/zeromq/ZMsg.h Normal file
View File

@@ -0,0 +1,92 @@
/********************************************************************************
* Copyright (C) 2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_ZMQ_ZMSG_H
#define FAIR_MQ_ZMQ_ZMSG_H
#include <cstddef> // for std::size_t
#include <fairmq/Error.h> // for assertm
#include <new> // for std::bad_alloc
#include <zmq.h> // for zmq_*
namespace fair::mq::zmq {
// Wraps a `zmq_msg_t` C object as a C++ type:
// * `zmq_msg_init` -> C++ default ctor
// * `zmq_msg_init_size` -> C++ ctor
// * `zmq_msg_init_data` -> C++ ctor
// * `zmq_msg_init_size` + `memcpy` -> C++ copy ctor
// * `zmq_msg_close` + `zmq_msg_init_size` + `memcpy` -> C++ copy assignment
// * `zmq_msg_init` + `zmq_msg_move`` -> C++ move ctor
// * `zmq_msg_move` -> C++ move assignment
// * `zmq_msg_close` -> C++ dtor
// * access the underlying `zmq_msg_t` via `Msg() [const] -> zmq_msg_t*`
// the const overload does a `const_cast<zmq_msg_t*>`, because the
// C interfaces do not model constness
// * `zmq_msg_data` -> `Data() -> void*`
// * `zmq_msg_size` -> `Size() -> std::size_t`
struct ZMsg
{
ZMsg() noexcept
{
[[maybe_unused]] auto const rc = zmq_msg_init(Msg());
assertm(rc == 0, "msg init successful"); // NOLINT
}
explicit ZMsg(std::size_t size)
{
auto const rc = zmq_msg_init_size(Msg(), size);
if (rc == -1) {
throw std::bad_alloc{};
}
}
explicit ZMsg(void* data,
std::size_t size,
zmq_free_fn* freefn = nullptr,
void* hint = nullptr)
{
auto const rc = zmq_msg_init_data(Msg(), data, size, freefn, hint);
if (rc == -1) {
throw std::bad_alloc{};
}
}
~ZMsg() noexcept
{
[[maybe_unused]] auto const rc = zmq_msg_close(Msg());
assertm(rc == 0, "msg close successful"); // NOLINT
}
ZMsg(const ZMsg& other) = delete;
ZMsg(ZMsg&& other) noexcept
{
[[maybe_unused]] auto rc = zmq_msg_init(Msg());
assertm(rc == 0, "msg init successful"); // NOLINT
rc = zmq_msg_move(Msg(), other.Msg());
assertm(rc == 0, "msg move successful"); // NOLINT
}
ZMsg& operator=(const ZMsg& rhs) = delete;
ZMsg& operator=(ZMsg&& rhs) noexcept
{
[[maybe_unused]] auto const rc = zmq_msg_move(Msg(), rhs.Msg());
assertm(rc == 0, "msg move successful"); // NOLINT
return *this;
}
zmq_msg_t* Msg() noexcept { return &fMsg; }
zmq_msg_t* Msg() const noexcept
{
return const_cast<zmq_msg_t*>(&fMsg); // NOLINT(cppcoreguidelines-pro-type-const-cast)
}
void* Data() const noexcept { return zmq_msg_data(Msg()); }
std::size_t Size() const noexcept { return zmq_msg_size(Msg()); }
private:
zmq_msg_t fMsg{};
};
} // namespace fair::mq::zmq
#endif /* FAIR_MQ_ZMQ_ZMSG_H */

10
spack.yaml Normal file
View File

@@ -0,0 +1,10 @@
spack:
specs:
- boost+container+program_options+filesystem+date_time+regex
- faircmakemodules
- fairlogger+pretty
- fmt
- libzmq
view: true
concretizer:
unify: true

View File

@@ -28,10 +28,26 @@ class Pull : public Device
auto Run() -> void override
{
auto msg = NewMessage();
int counter = 0;
if (Receive(msg, "data") >= 0)
{
auto msg1 = NewMessageFor("data", 0);
if (Receive(msg1, "data") >= 0) {
++counter;
}
auto msg2 = NewMessageFor("data", 0);
auto ret = Receive(msg2, "data");
if (ret >= 0) {
auto content = std::string{static_cast<char*>(msg2->GetData()), msg2->GetSize()};
LOG(info) << "Transferred " << static_cast<std::size_t>(ret) << " bytes, msg size: " << msg2->GetSize() << ", content: " << content;
if (msg2->GetSize() == static_cast<std::size_t>(ret) && content == "testdata1234") {
++counter;
}
}
if (counter == 2) {
LOG(info) << "PUSH-PULL test successfull";
}
};

View File

@@ -25,8 +25,12 @@ class Push : public Device
auto Run() -> void override
{
auto msg = NewMessage();
Send(msg, "data");
// empty message
auto msg1 = NewMessageFor("data", 0);
Send(msg1, "data");
// message with short text data
auto msg2(NewSimpleMessageFor("data", 0, "testdata1234"));
Send(msg2, "data");
};
};

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2015-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2015-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@@ -24,22 +24,40 @@ class Rep : public Device
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
bool check(signed long ret, const fair::mq::MessagePtr& msg)
{
auto content = std::string{static_cast<char*>(msg->GetData()), msg->GetSize()};
LOG(info) << "Transferred " << static_cast<std::size_t>(ret) << " bytes, msg size: " << msg->GetSize() << ", content: " << content;
return msg->GetSize() == static_cast<std::size_t>(ret) && content == "request";
}
auto Run() -> void override
{
auto request1 = NewMessage();
if (Receive(request1, "data") >= 0) {
int counter = 0;
auto req1 = NewMessage();
auto ret1 = Receive(req1, "data");
if (ret1 >= 0) {
LOG(info) << "Received request 1";
auto reply = NewMessage();
if (check(ret1, req1)) {
++counter;
}
auto reply = NewSimpleMessageFor("data", 0, "reply");
Send(reply, "data");
}
auto request2 = NewMessage();
if (Receive(request2, "data") >= 0) {
auto req2 = NewMessage();
auto ret2 = Receive(req2, "data");
if (ret2 >= 0) {
LOG(info) << "Received request 2";
auto reply = NewMessage();
if (check(ret2, req2)) {
++counter;
}
auto reply = NewSimpleMessageFor("data", 0, "reply");
Send(reply, "data");
}
LOG(info) << "REQ-REP test successfull";
if (counter == 2) {
LOG(info) << "REQ-REP test successfull";
}
};
};

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2015-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2015-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@@ -26,12 +26,17 @@ class Req : public Device
auto Run() -> void override
{
auto request = NewMessage();
auto request = NewSimpleMessageFor("data", 0, "request");
Send(request, "data");
auto reply = NewMessage();
if (Receive(reply, "data") >= 0) {
LOG(info) << "received reply";
auto content = std::string{static_cast<char*>(reply->GetData()), reply->GetSize()};
LOG(info) << "Transferred reply of size: " << reply->GetSize() << ", content: " << content;
if (content != "reply") {
ChangeStateOrThrow(Transition::ErrorFound);
}
}
};
};

View File

@@ -37,12 +37,15 @@ auto AsStringView(Message const& msg) -> string_view
return {static_cast<char const*>(msg.GetData()), msg.GetSize()};
}
auto RunPushPullWithMsgResize(string const & transport, string const & _address) -> void
auto RunPushPullWithMsgResize(string const & transport, string const & _address, bool expandedShmMetadata = false) -> void
{
ProgOptions config;
config.SetProperty<string>("session", tools::Uuid());
config.SetProperty<size_t>("shm-segment-size", 100000000);
config.SetProperty<bool>("shm-monitor", true);
if (expandedShmMetadata) {
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));
Channel push{"Push", "push", factory};
@@ -100,12 +103,15 @@ auto RunPushPullWithMsgResize(string const & transport, string const & _address)
}
}
auto RunMsgRebuild(const string& transport) -> void
auto RunMsgRebuild(const string& transport, bool expandedShmMetadata = false) -> void
{
ProgOptions config;
config.SetProperty<string>("session", tools::Uuid());
config.SetProperty<size_t>("shm-segment-size", 100000000);
config.SetProperty<bool>("shm-monitor", true);
if (expandedShmMetadata) {
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));
size_t const msgSize{100};
@@ -134,12 +140,15 @@ auto CheckMsgAlignment(Message const& msg, fair::mq::Alignment alignment) -> boo
return (reinterpret_cast<uintptr_t>(msg.GetData()) % static_cast<size_t>(alignment)) == 0; // NOLINT
}
auto RunPushPullWithAlignment(string const& transport, string const& _address) -> void
auto RunPushPullWithAlignment(string const& transport, string const& _address, bool expandedShmMetadata = false) -> void
{
ProgOptions config;
config.SetProperty<string>("session", tools::Uuid());
config.SetProperty<size_t>("shm-segment-size", 100000000);
config.SetProperty<bool>("shm-monitor", true);
if (expandedShmMetadata) {
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));
Channel push{"Push", "push", factory};
@@ -189,12 +198,15 @@ auto RunPushPullWithAlignment(string const& transport, string const& _address) -
ASSERT_TRUE(CheckMsgAlignment(*msgCopy, align32));
}
auto EmptyMessage(string const& transport, string const& _address) -> void
auto EmptyMessage(string const& transport, string const& _address, bool expandedShmMetadata = false) -> void
{
ProgOptions config;
config.SetProperty<string>("session", tools::Uuid());
config.SetProperty<size_t>("shm-segment-size", 100000000);
config.SetProperty<bool>("shm-monitor", true);
if (expandedShmMetadata) {
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));
Channel push{"Push", "push", factory};
@@ -241,12 +253,15 @@ auto EmptyMessage(string const& transport, string const& _address) -> void
// The "zero copy" property of the Copy() method is an implementation detail and is not guaranteed.
// Currently it holds true for the shmem (across devices) and for zeromq (within same device) transports.
auto ZeroCopy() -> void
auto ZeroCopy(bool expandedShmMetadata = false) -> void
{
ProgOptions config;
config.SetProperty<string>("session", tools::Uuid());
config.SetProperty<size_t>("shm-segment-size", 100000000);
config.SetProperty<bool>("shm-monitor", true);
if (expandedShmMetadata) {
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}
auto factory(TransportFactory::CreateTransportFactory("shmem", tools::Uuid(), &config));
unique_ptr<string> str(make_unique<string>("asdf"));
@@ -272,7 +287,7 @@ auto ZeroCopy() -> void
// The "zero copy" property of the Copy() method is an implementation detail and is not guaranteed.
// Currently it holds true for the shmem (across devices) and for zeromq (within same device) transports.
auto ZeroCopyFromUnmanaged(string const& address) -> void
auto ZeroCopyFromUnmanaged(string const& address, bool expandedShmMetadata = false) -> void
{
ProgOptions config1;
ProgOptions config2;
@@ -285,6 +300,12 @@ auto ZeroCopyFromUnmanaged(string const& address) -> void
config2.SetProperty<bool>("shm-monitor", true);
// ref counts should be accessible accross different segments
config2.SetProperty<uint16_t>("shm-segment-id", 2);
if (expandedShmMetadata) {
config1.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}
if (expandedShmMetadata) {
config2.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}
auto factory1(TransportFactory::CreateTransportFactory("shmem", tools::Uuid(), &config1));
auto factory2(TransportFactory::CreateTransportFactory("shmem", tools::Uuid(), &config2));
@@ -378,6 +399,11 @@ TEST(Resize, shmem) // NOLINT
RunPushPullWithMsgResize("shmem", "ipc://test_message_resize");
}
TEST(Resize, shmem_expanded_metadata) // NOLINT
{
RunPushPullWithMsgResize("shmem", "ipc://test_message_resize", true);
}
TEST(Rebuild, zeromq) // NOLINT
{
RunMsgRebuild("zeromq");
@@ -388,11 +414,21 @@ TEST(Rebuild, shmem) // NOLINT
RunMsgRebuild("shmem");
}
TEST(Rebuild, shmem_expanded_metadata) // NOLINT
{
RunMsgRebuild("shmem", true);
}
TEST(Alignment, shmem) // NOLINT
{
RunPushPullWithAlignment("shmem", "ipc://test_message_alignment");
}
TEST(Alignment, shmem_expanded_metadata) // NOLINT
{
RunPushPullWithAlignment("shmem", "ipc://test_message_alignment", true);
}
TEST(Alignment, zeromq) // NOLINT
{
RunPushPullWithAlignment("zeromq", "ipc://test_message_alignment");
@@ -408,14 +444,29 @@ TEST(EmptyMessage, shmem) // NOLINT
EmptyMessage("shmem", "ipc://test_empty_message");
}
TEST(EmptyMessage, shmem_expanded_metadata) // NOLINT
{
EmptyMessage("shmem", "ipc://test_empty_message", true);
}
TEST(ZeroCopy, shmem) // NOLINT
{
ZeroCopy();
}
TEST(ZeroCopy, shmem_expanded_metadata) // NOLINT
{
ZeroCopy(true);
}
TEST(ZeroCopyFromUnmanaged, shmem) // NOLINT
{
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged");
}
TEST(ZeroCopyFromUnmanaged, shmem_expanded_metadata) // NOLINT
{
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged", true);
}
} // namespace

View File

@@ -22,7 +22,7 @@ using namespace std;
using namespace fair::mq::test;
using namespace fair::mq::tools;
auto RunPair(string transport) -> void
auto RunPair(const string& transport, const string& extraDeviceCmdArgs) -> void
{
size_t session{fair::mq::tools::UuidHash()};
string ipcFile("/tmp/fmq_" + to_string(session) + "_data_" + transport);
@@ -38,6 +38,7 @@ auto RunPair(string transport) -> void
<< " --shm-segment-size 100000000"
<< " --session " << session
<< " --color false"
<< extraDeviceCmdArgs
<< " --channel-config name=data,type=pair,method=bind,address=" << address;
pairleft = execute(cmd.str(), "[PAIR L]");
});
@@ -52,6 +53,7 @@ auto RunPair(string transport) -> void
<< " --shm-segment-size 100000000"
<< " --session " << session
<< " --color false"
<< extraDeviceCmdArgs
<< " --channel-config name=data,type=pair,method=connect,address=" << address;
pairright = execute(cmd.str(), "[PAIR R]");
});
@@ -65,14 +67,19 @@ auto RunPair(string transport) -> void
exit(pairleft.exit_code + pairright.exit_code);
}
TEST(Pair, SingleMsg_MP_tcp_zeromq)
TEST(Pair, SingleMsg_MultiThreaded_tcp_zeromq)
{
EXPECT_EXIT(RunPair("zeromq"), ::testing::ExitedWithCode(0), "PAIR test successfull");
EXPECT_EXIT(RunPair("zeromq", ""), ::testing::ExitedWithCode(0), "PAIR test successfull");
}
TEST(Pair, SingleMsg_MP_tcp_shmem)
TEST(Pair, SingleMsg_MultiThreaded_tcp_shmem)
{
EXPECT_EXIT(RunPair("shmem"), ::testing::ExitedWithCode(0), "PAIR test successfull");
EXPECT_EXIT(RunPair("shmem", ""), ::testing::ExitedWithCode(0), "PAIR test successfull");
}
TEST(Pair, SingleMsg_MultiThreaded_tcp_shmem_expanded_metadata)
{
EXPECT_EXIT(RunPair("shmem", " --shm-metadata-msg-size 2048"), ::testing::ExitedWithCode(0), "PAIR test successfull");
}
} // namespace

View File

@@ -22,7 +22,7 @@ using namespace std;
using namespace fair::mq::test;
using namespace fair::mq::tools;
auto RunPushPull(string transport) -> void
auto RunPushPull(string transport, const string& extraDeviceCmdArgs) -> void
{
size_t session(fair::mq::tools::UuidHash());
string ipcFile("/tmp/fmq_" + to_string(session) + "_data_" + transport);
@@ -38,6 +38,7 @@ auto RunPushPull(string transport) -> void
<< " --shm-segment-size 100000000"
<< " --session " << session
<< " --color false"
<< extraDeviceCmdArgs
<< " --channel-config name=data,type=push,method=bind,address=" << address;
push = execute(cmd.str(), "[PUSH]");
});
@@ -52,6 +53,7 @@ auto RunPushPull(string transport) -> void
<< " --shm-segment-size 100000000"
<< " --session " << session
<< " --color false"
<< extraDeviceCmdArgs
<< " --channel-config name=data,type=pull,method=connect,address=" << address;
pull = execute(cmd.str(), "[PULL]");
});
@@ -65,14 +67,19 @@ auto RunPushPull(string transport) -> void
exit(push.exit_code + pull.exit_code);
}
TEST(PushPull, SingleMsg_MP_ipc_zeromq)
TEST(PushPull, SingleMsg_MultiThreaded_ipc_zeromq)
{
EXPECT_EXIT(RunPushPull("zeromq"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
EXPECT_EXIT(RunPushPull("zeromq", ""), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
}
TEST(PushPull, SingleMsg_MP_ipc_shmem)
TEST(PushPull, SingleMsg_MultiThreaded_ipc_shmem)
{
EXPECT_EXIT(RunPushPull("shmem"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
EXPECT_EXIT(RunPushPull("shmem", ""), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
}
TEST(PushPull, SingleMsg_MultiThreaded_ipc_shmem_expanded_metadata)
{
EXPECT_EXIT(RunPushPull("shmem", " --shm-metadata-msg-size 2048"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
}
} // namespace

View File

@@ -25,12 +25,15 @@ namespace
using namespace std;
using namespace fair::mq;
auto RunSingleThreadedMultipart(string transport, string address1, string address2) -> void {
auto RunSingleThreadedMultipart(string transport, string address1, string address2, bool expandedShmMetadata) -> void {
fair::mq::ProgOptions config;
config.SetProperty<string>("session", tools::Uuid());
config.SetProperty<size_t>("shm-segment-size", 100000000);
config.SetProperty<bool>("shm-monitor", true);
if (expandedShmMetadata) {
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
@@ -104,13 +107,16 @@ auto RunSingleThreadedMultipart(string transport, string address1, string addres
}
}
auto RunMultiThreadedMultipart(string transport, string address1) -> void
auto RunMultiThreadedMultipart(string transport, string address1, bool expandedShmMetadata) -> void
{
ProgOptions config;
config.SetProperty<string>("session", tools::Uuid());
config.SetProperty<int>("io-threads", 1);
config.SetProperty<size_t>("shm-segment-size", 20000000); // NOLINT
config.SetProperty<bool>("shm-monitor", true);
if (expandedShmMetadata) {
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
@@ -147,44 +153,64 @@ auto RunMultiThreadedMultipart(string transport, string address1) -> void
puller.join();
}
TEST(PushPull, Multipart_ST_inproc_zeromq) // NOLINT
TEST(PushPull, Multipart_SingleThreaded_inproc_zeromq) // NOLINT
{
RunSingleThreadedMultipart("zeromq", "inproc://test1", "inproc://test2");
RunSingleThreadedMultipart("zeromq", "inproc://test1", "inproc://test2", false);
}
TEST(PushPull, Multipart_ST_inproc_shmem) // NOLINT
TEST(PushPull, Multipart_SingleThreaded_inproc_shmem) // NOLINT
{
RunSingleThreadedMultipart("shmem", "inproc://test1", "inproc://test2");
RunSingleThreadedMultipart("shmem", "inproc://test1", "inproc://test2", false);
}
TEST(PushPull, Multipart_ST_ipc_zeromq) // NOLINT
TEST(PushPull, Multipart_SingleThreaded_inproc_shmem_expanded_metadata) // NOLINT
{
RunSingleThreadedMultipart("zeromq", "ipc://test_Multipart_ST_ipc_zeromq_1", "ipc://test_Multipart_ST_ipc_zeromq_2");
RunSingleThreadedMultipart("shmem", "inproc://test1", "inproc://test2", true);
}
TEST(PushPull, Multipart_ST_ipc_shmem) // NOLINT
TEST(PushPull, Multipart_SingleThreaded_ipc_zeromq) // NOLINT
{
RunSingleThreadedMultipart("shmem", "ipc://test_Multipart_ST_ipc_shmem_1", "ipc://test_Multipart_ST_ipc_shmem_2");
RunSingleThreadedMultipart("zeromq", "ipc://test_Multipart_SingleThreaded_ipc_zeromq_1", "ipc://test_Multipart_SingleThreaded_ipc_zeromq_2", false);
}
TEST(PushPull, Multipart_MT_inproc_zeromq) // NOLINT
TEST(PushPull, Multipart_SingleThreaded_ipc_shmem) // NOLINT
{
RunMultiThreadedMultipart("zeromq", "inproc://test_1");
RunSingleThreadedMultipart("shmem", "ipc://test_Multipart_SingleThreaded_ipc_shmem_1", "ipc://test_Multipart_SingleThreaded_ipc_shmem_2", false);
}
TEST(PushPull, Multipart_MT_inproc_shmem) // NOLINT
TEST(PushPull, Multipart_SingleThreaded_ipc_shmem_expanded_metadata) // NOLINT
{
RunMultiThreadedMultipart("shmem", "inproc://test_1");
RunSingleThreadedMultipart("shmem", "ipc://test_Multipart_SingleThreaded_ipc_shmem_1", "ipc://test_Multipart_SingleThreaded_ipc_shmem_2", true);
}
TEST(PushPull, Multipart_MT_ipc_zeromq) // NOLINT
TEST(PushPull, Multipart_MultiThreaded_inproc_zeromq) // NOLINT
{
RunMultiThreadedMultipart("zeromq", "ipc://test_Multipart_MT_ipc_zeromq_1");
RunMultiThreadedMultipart("zeromq", "inproc://test_1", false);
}
TEST(PushPull, Multipart_MT_ipc_shmem) // NOLINT
TEST(PushPull, Multipart_MultiThreaded_inproc_shmem) // NOLINT
{
RunMultiThreadedMultipart("shmem", "ipc://test_Multipart_MT_ipc_shmem_1");
RunMultiThreadedMultipart("shmem", "inproc://test_1", false);
}
TEST(PushPull, Multipart_MultiThreaded_inproc_shmem_expanded_metadata) // NOLINT
{
RunMultiThreadedMultipart("shmem", "inproc://test_1", true);
}
TEST(PushPull, Multipart_MultiThreaded_ipc_zeromq) // NOLINT
{
RunMultiThreadedMultipart("zeromq", "ipc://test_Multipart_MultiThreaded_ipc_zeromq_1", false);
}
TEST(PushPull, Multipart_MultiThreaded_ipc_shmem) // NOLINT
{
RunMultiThreadedMultipart("shmem", "ipc://test_Multipart_MultiThreaded_ipc_shmem_1", false);
}
TEST(PushPull, Multipart_MultiThreaded_ipc_shmem_expanded_metadata) // NOLINT
{
RunMultiThreadedMultipart("shmem", "ipc://test_Multipart_MultiThreaded_ipc_shmem_1", true);
}
} // namespace

View File

@@ -22,7 +22,7 @@ using namespace std;
using namespace fair::mq::test;
using namespace fair::mq::tools;
auto RunReqRep(string transport) -> void
auto RunReqRep(string transport, const string& extraDeviceCmdArgs) -> void
{
size_t session{fair::mq::tools::UuidHash()};
string ipcFile("/tmp/fmq_" + to_string(session) + "_data_" + transport);
@@ -38,6 +38,7 @@ auto RunReqRep(string transport) -> void
<< " --shm-segment-size 100000000"
<< " --session " << session
<< " --color false"
<< extraDeviceCmdArgs
<< " --channel-config name=data,type=rep,method=bind,address=" << address;
rep = execute(cmd.str(), "[REP]");
});
@@ -52,6 +53,7 @@ auto RunReqRep(string transport) -> void
<< " --shm-segment-size 100000000"
<< " --session " << session
<< " --color false"
<< extraDeviceCmdArgs
<< " --channel-config name=data,type=req,method=connect,address=" << address;
req1 = execute(cmd.str(), "[REQ1]");
});
@@ -66,6 +68,7 @@ auto RunReqRep(string transport) -> void
<< " --shm-segment-size 100000000"
<< " --session " << session
<< " --color false"
<< extraDeviceCmdArgs
<< " --channel-config name=data,type=req,method=connect,address=" << address;
req2 = execute(cmd.str(), "[REQ2]");
});
@@ -82,12 +85,17 @@ auto RunReqRep(string transport) -> void
TEST(ReqRep, zeromq)
{
EXPECT_EXIT(RunReqRep("zeromq"), ::testing::ExitedWithCode(0), "REQ-REP test successfull");
EXPECT_EXIT(RunReqRep("zeromq", ""), ::testing::ExitedWithCode(0), "REQ-REP test successfull");
}
TEST(ReqRep, shmem)
{
EXPECT_EXIT(RunReqRep("shmem"), ::testing::ExitedWithCode(0), "REQ-REP test successfull");
EXPECT_EXIT(RunReqRep("shmem", ""), ::testing::ExitedWithCode(0), "REQ-REP test successfull");
}
TEST(ReqRep, shmem_expanded_metadata)
{
EXPECT_EXIT(RunReqRep("shmem", " --shm-metadata-msg-size 2048"), ::testing::ExitedWithCode(0), "REQ-REP test successfull");
}
} // namespace