Compare commits

...

17 Commits

Author SHA1 Message Date
Giulio Eulisse
3c1723fc54 Allow sorting StateChange callbacks
If the key of the callback is a number, it will be used to invoke
callbacks with the correct ordering.
2023-09-06 09:49:30 +02:00
Christian Tacke
c3418cc7b8 chore: Run meta_update.py 2023-08-08 16:26:10 +02:00
Christian Tacke
cc00c5a6f1 docs: Add "readme" field to codemeta
A link to an introduction for people who are not experts in
the field.
2023-08-08 16:26:10 +02:00
Christian Tacke
e6bb14f535 ci: Check codemeta.json
Use the eOSSR tooling to validate our codemeta.json file
2023-08-08 16:26:10 +02:00
Christian Tacke
b18d60372c codemeta: Add GSI as "maintainer"
GSI is providing resources for maintaining FairMQ. So let's
document this in codemeta.json.
2023-08-08 16:21:48 +02:00
Giulio Eulisse
7ceccdeaa6 Print actual address we are trying to bind. 2023-06-29 12:28:23 +02:00
Dennis Klein
d1c99f7e15 ci: Update build matrix 2023-06-26 11:56:24 +02:00
Dennis Klein
bfc665d76e feat: Make the channel AutoBind default configurable 2023-06-26 11:56:24 +02:00
Dennis Klein
42d27af20f docs: Update install commands 2023-06-13 22:43:52 +02:00
Alexey Rybalchenko
25614e3e06 test: Add coverage for --shm-metadata-msg-size 2023-06-13 21:24:40 +02:00
Alexey Rybalchenko
3decac58fc test: Add data transfer and checks to protocol tests 2023-06-13 21:24:40 +02:00
Dennis Klein
f278e7e312 feat: Add new tunable --shm-metadata-msg-size
The shm metadata msg will be right-padded to the given size. This
tunable may be used to saturate the kernel msg buffers more quickly with
the effect that the ZeroMQ message queue size - on which the FairMQ
shmem transport relies upon - behaves more accurately for very small
queue sizes.

This introduces a change for the meta msg format in the multipart case:
old: | MetaHeader 1 | ... | MetaHeader n |
new: | n | MetaHeader 1 | ... | MetaHeader n | padded to fMetadataMsgSize |
where `n` is a `size_t` and contains the number of following meta headers.
Previously, this number was infered from the msg buffer size itself which is
no longer possible due to the potential padding.

Implements #432
2023-06-13 21:24:40 +02:00
Dennis Klein
491a943c63 feat: Use zmq_msg_send for single message Send 2023-06-13 21:24:40 +02:00
Dennis Klein
c47fc6f9fe feat: Move ZMsg to fair::mq::zmq
Implement move semantics.
2023-06-13 21:24:40 +02:00
Giulio Eulisse
7b259afdb5 Fix -Wunqualified-std-cast-call 2023-06-13 21:24:40 +02:00
Dennis Klein
33ddcaad5e docs: Add repology badge 2023-04-05 15:19:05 +02:00
Dennis Klein
4d5dbedeab build: Add spack develop environment 2023-03-23 14:14:08 +01:00
27 changed files with 419 additions and 121 deletions

View File

@@ -0,0 +1,21 @@
name: validate codemeta
on:
push:
paths:
- codemeta.json
- .github/workflows/codemeta_validate.yaml
pull_request:
paths:
- codemeta.json
- .github/workflows/codemeta_validate.yaml
jobs:
build:
runs-on: ubuntu-latest
container:
image: gitlab-registry.in2p3.fr/escape2020/wp3/eossr:v1.0
steps:
- uses: actions/checkout@v3
- name: validate codemeta
run: eossr-metadata-validator codemeta.json

2
.gitignore vendored
View File

@@ -4,3 +4,5 @@ install
.vscode .vscode
/compile_commands.json /compile_commands.json
.cache .cache
.spack-env
spack.lock

View File

@@ -1,6 +1,7 @@
{ {
"creators": [ "creators": [
{ {
"orcid": "0000-0002-8071-4497",
"name": "Al-Turany, Mohammad" "name": "Al-Turany, Mohammad"
}, },
{ {

4
Jenkinsfile vendored
View File

@@ -93,8 +93,8 @@ pipeline{
[os: 'fedora', ver: '36', arch: 'x86_64', compiler: 'gcc-12'], [os: 'fedora', ver: '36', arch: 'x86_64', compiler: 'gcc-12'],
[os: 'fedora', ver: '37', arch: 'x86_64', compiler: 'gcc-12'], [os: 'fedora', ver: '37', arch: 'x86_64', compiler: 'gcc-12'],
[os: 'fedora', ver: '38', arch: 'x86_64', compiler: 'gcc-13'], [os: 'fedora', ver: '38', arch: 'x86_64', compiler: 'gcc-13'],
[os: 'macos', ver: '12', arch: 'x86_64', compiler: 'apple-clang-13', extra: '-DHAS_ASIO=ON'], [os: 'macos', ver: '12', arch: 'x86_64', compiler: 'apple-clang-14'],
[os: 'macos', ver: '12', arch: 'arm64', compiler: 'apple-clang-13', extra: '-DHAS_ASIO=ON'], [os: 'macos', ver: '13', arch: 'arm64', compiler: 'apple-clang-14'],
]) ])
def all_debug = "-DCMAKE_BUILD_TYPE=Debug" def all_debug = "-DCMAKE_BUILD_TYPE=Debug"

View File

@@ -5,6 +5,7 @@
[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.1689985.svg)](https://doi.org/10.5281/zenodo.1689985) [![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.1689985.svg)](https://doi.org/10.5281/zenodo.1689985)
[![OpenSSF Best Practices](https://bestpractices.coreinfrastructure.org/projects/6915/badge)](https://bestpractices.coreinfrastructure.org/projects/6915) [![OpenSSF Best Practices](https://bestpractices.coreinfrastructure.org/projects/6915/badge)](https://bestpractices.coreinfrastructure.org/projects/6915)
[![fair-software.eu](https://img.shields.io/badge/fair--software.eu-%E2%97%8F%20%20%E2%97%8F%20%20%E2%97%8B%20%20%E2%97%8F%20%20%E2%97%8F-yellow)](https://github.com/FairRootGroup/FairMQ/actions/workflows/fair-software.yml) [![fair-software.eu](https://img.shields.io/badge/fair--software.eu-%E2%97%8F%20%20%E2%97%8F%20%20%E2%97%8B%20%20%E2%97%8F%20%20%E2%97%8F-yellow)](https://github.com/FairRootGroup/FairMQ/actions/workflows/fair-software.yml)
[![Spack package](https://repology.org/badge/version-for-repo/spack/fairmq.svg)](https://repology.org/project/fairmq/versions)
C++ Message Queuing Library and Framework C++ Message Queuing Library and Framework
@@ -44,10 +45,10 @@ Recommended:
```bash ```bash
git clone https://github.com/FairRootGroup/FairMQ fairmq_source git clone https://github.com/FairRootGroup/FairMQ fairmq_source
cmake -S fairmq_source -B fairmq_build -GNinja -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=fairmq_install cmake -S fairmq_source -B fairmq_build -GNinja -DCMAKE_BUILD_TYPE=Release
cmake --build fairmq_build cmake --build fairmq_build
cmake --build fairmq_build --target test ctest --test-dir fairmq_build --output-on-failure --schedule-random -j<ncpus>
cmake --build fairmq_build --target install cmake --install fairmq_build --prefix $(pwd)/fairmq_install
``` ```
Please consult the [manpages of your CMake version](https://cmake.org/cmake/help/latest/manual/cmake.1.html) for more options. Please consult the [manpages of your CMake version](https://cmake.org/cmake/help/latest/manual/cmake.1.html) for more options.
@@ -55,6 +56,7 @@ Please consult the [manpages of your CMake version](https://cmake.org/cmake/help
If dependencies are not installed in standard system directories, you can hint the installation location via If dependencies are not installed in standard system directories, you can hint the installation location via
`-DCMAKE_PREFIX_PATH=...` or per dependency via `-D{DEPENDENCY}_ROOT=...` (`*_ROOT` variables can also be environment variables). `-DCMAKE_PREFIX_PATH=...` or per dependency via `-D{DEPENDENCY}_ROOT=...` (`*_ROOT` variables can also be environment variables).
## Usage ## Usage
FairMQ ships as a CMake package, so in your `CMakeLists.txt` you can discover it like this: FairMQ ships as a CMake package, so in your `CMakeLists.txt` you can discover it like this:
@@ -96,6 +98,7 @@ On command line:
* `-DBUILD_TESTING=OFF` disables building of tests. * `-DBUILD_TESTING=OFF` disables building of tests.
* `-DBUILD_EXAMPLES=OFF` disables building of examples. * `-DBUILD_EXAMPLES=OFF` disables building of examples.
* `-DBUILD_DOCS=ON` enables building of API docs. * `-DBUILD_DOCS=ON` enables building of API docs.
* `-DFAIRMQ_CHANNEL_DEFAULT_AUTOBIND=OFF` disable channel `autoBind` by default
* You can hint non-system installations for dependent packages, see the #installation-from-source section above * You can hint non-system installations for dependent packages, see the #installation-from-source section above
After the `find_package(FairMQ)` call the following CMake variables are defined: After the `find_package(FairMQ)` call the following CMake variables are defined:

View File

@@ -96,7 +96,10 @@ endmacro()
macro(fairmq_summary_compile_definitions) macro(fairmq_summary_compile_definitions)
message(STATUS " ") message(STATUS " ")
message(STATUS " ${Cyan}COMPILE DEFINITION VALUE${CR}") message(STATUS " ${Cyan}COMPILE DEFINITION VALUE${CR}")
message(STATUS " ${BWhite}FAIRMQ_HAS_STD_FILESYSTEM${CR} ${FAIRMQ_HAS_STD_FILESYSTEM} (overridable with ${BMagenta}-DFAIRMQ_HAS_STD_FILESYSTEM=0|1${CR})") message(STATUS " ${BWhite}FAIRMQ_HAS_STD_FILESYSTEM${CR} ${FAIRMQ_HAS_STD_FILESYSTEM} (overridable with ${BMagenta}-DFAIRMQ_HAS_STD_FILESYSTEM=0|1${CR})")
message(STATUS " ${BWhite}FAIRMQ_HAS_STD_PMR${CR} ${FAIRMQ_HAS_STD_PMR} (overridable with ${BMagenta}-DFAIRMQ_HAS_STD_PMR=0|1${CR})") message(STATUS " ${BWhite}FAIRMQ_HAS_STD_PMR${CR} ${FAIRMQ_HAS_STD_PMR} (overridable with ${BMagenta}-DFAIRMQ_HAS_STD_PMR=0|1${CR})")
if(DEFINED FAIRMQ_CHANNEL_DEFAULT_AUTOBIND)
message(STATUS " ${BWhite}FAIRMQ_CHANNEL_DEFAULT_AUTOBIND${CR} ${FAIRMQ_CHANNEL_DEFAULT_AUTOBIND}")
endif()
endmacro() endmacro()

View File

@@ -7,11 +7,20 @@
"datePublished": "2018-04-15", "datePublished": "2018-04-15",
"developmentStatus": "active", "developmentStatus": "active",
"codeRepository": "https://github.com/FairRootGroup/FairMQ/", "codeRepository": "https://github.com/FairRootGroup/FairMQ/",
"readme": "https://github.com/FairRootGroup/FairMQ/#readme",
"issueTracker": "https://github.com/FairRootGroup/FairMQ/issues", "issueTracker": "https://github.com/FairRootGroup/FairMQ/issues",
"identifier": "https://doi.org/10.5281/zenodo.1689985", "identifier": "https://doi.org/10.5281/zenodo.1689985",
"maintainer": [
{
"@type": "ResearchOrganisation",
"@id": "https://ror.org/02k8cbn47",
"name": "GSI Helmholtz Centre for Heavy Ion Research"
}
],
"author": [ "author": [
{ {
"@type": "Person", "@type": "Person",
"@id": "https://orcid.org/0000-0002-8071-4497",
"givenName": "Mohammad", "givenName": "Mohammad",
"familyName": "Al-Turany" "familyName": "Al-Turany"
}, },

View File

@@ -54,7 +54,7 @@ struct Receiver : fair::mq::Device
fBuffer[h.id].start = chrono::steady_clock::now(); fBuffer[h.id].start = chrono::steady_clock::now();
} }
// if the received ID has not previously been discarded, store the data part in the buffer // if the received ID has not previously been discarded, store the data part in the buffer
fBuffer[h.id].parts.AddPart(move(parts.At(1))); fBuffer[h.id].parts.AddPart(std::move(parts.At(1)));
} else { } else {
// if received ID has been previously discarded. // if received ID has been previously discarded.
LOG(debug) << "Received part from an already discarded timeframe with id " << h.id; LOG(debug) << "Received part from an already discarded timeframe with id " << h.id;

View File

@@ -1,5 +1,5 @@
################################################################################ ################################################################################
# Copyright (C) 2012-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # Copyright (C) 2012-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# # # #
# This software is distributed under the terms of the # # This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, # # GNU Lesser General Public Licence (LGPL) version 3, #
@@ -108,6 +108,7 @@ if(BUILD_FAIRMQ)
zeromq/UnmanagedRegion.h zeromq/UnmanagedRegion.h
zeromq/Socket.h zeromq/Socket.h
zeromq/TransportFactory.h zeromq/TransportFactory.h
zeromq/ZMsg.h
) )
########################## ##########################
@@ -173,6 +174,15 @@ if(BUILD_FAIRMQ)
FAIRMQ_HAS_STD_FILESYSTEM=${FAIRMQ_HAS_STD_FILESYSTEM} FAIRMQ_HAS_STD_FILESYSTEM=${FAIRMQ_HAS_STD_FILESYSTEM}
FAIRMQ_HAS_STD_PMR=${FAIRMQ_HAS_STD_PMR} FAIRMQ_HAS_STD_PMR=${FAIRMQ_HAS_STD_PMR}
) )
if(DEFINED FAIRMQ_CHANNEL_DEFAULT_AUTOBIND)
# translate CMake boolean (TRUE, FALSE, 0, 1, OFF, ON) into C++ boolean literal (true, false)
if(FAIRMQ_CHANNEL_DEFAULT_AUTOBIND)
set(value "true")
else()
set(value "false")
endif()
target_compile_definitions(${target} PUBLIC FAIRMQ_CHANNEL_DEFAULT_AUTOBIND=${value})
endif()
####################### #######################

View File

@@ -379,7 +379,11 @@ class Channel
static constexpr int DefaultRateLogging = 1; static constexpr int DefaultRateLogging = 1;
static constexpr int DefaultPortRangeMin = 22000; static constexpr int DefaultPortRangeMin = 22000;
static constexpr int DefaultPortRangeMax = 23000; static constexpr int DefaultPortRangeMax = 23000;
#ifdef FAIRMQ_CHANNEL_DEFAULT_AUTOBIND
static constexpr bool DefaultAutoBind = FAIRMQ_CHANNEL_DEFAULT_AUTOBIND;
#else
static constexpr bool DefaultAutoBind = true; static constexpr bool DefaultAutoBind = true;
#endif
friend std::ostream& operator<<(std::ostream& os, const Channel& ch) friend std::ostream& operator<<(std::ostream& os, const Channel& ch)
{ {
@@ -425,10 +429,10 @@ class Channel
msg.get() msg.get()
)); ));
msg.release(); msg.release();
msg = move(msgWrapper); msg = std::move(msgWrapper);
} else { } else {
MessagePtr newMsg(NewMessage()); MessagePtr newMsg(NewMessage());
msg = move(newMsg); msg = std::move(newMsg);
} }
} }
} }
@@ -446,10 +450,10 @@ class Channel
msg.get() msg.get()
)); ));
msg.release(); msg.release();
msg = move(msgWrapper); msg = std::move(msgWrapper);
} else { } else {
MessagePtr newMsg(NewMessage()); MessagePtr newMsg(NewMessage());
msg = move(newMsg); msg = std::move(newMsg);
} }
} }
} }
@@ -459,7 +463,7 @@ class Channel
{ {
if (fTransportType != msg->GetType()) { if (fTransportType != msg->GetType()) {
MessagePtr newMsg(NewMessage()); MessagePtr newMsg(NewMessage());
msg = move(newMsg); msg = std::move(newMsg);
} }
} }
@@ -470,7 +474,7 @@ class Channel
if (fTransportType != msg->GetType()) { if (fTransportType != msg->GetType()) {
MessagePtr newMsg(NewMessage()); MessagePtr newMsg(NewMessage());
msg = move(newMsg); msg = std::move(newMsg);
} }
} }
} }

View File

@@ -288,7 +288,7 @@ void Device::AttachChannels(vector<Channel*>& chans)
// remove the channel from the uninitialized container // remove the channel from the uninitialized container
itr = chans.erase(itr); itr = chans.erase(itr);
} else { } else {
LOG(error) << "failed to attach channel " << (*itr)->fName << " (" << (*itr)->fMethod << ")"; LOG(error) << "failed to attach channel " << (*itr)->fName << " (" << (*itr)->fMethod << " on " << (*itr)->fAddress << ")";
++itr; ++itr;
} }
} else { } else {

View File

@@ -310,7 +310,9 @@ try {
void StateMachine::SubscribeToStateChange(const string& key, function<void(const State)> callback) void StateMachine::SubscribeToStateChange(const string& key, function<void(const State)> callback)
{ {
static_pointer_cast<FairMQFSM>(fFsm)->fStateChangeSignalsMap.insert({key, static_pointer_cast<FairMQFSM>(fFsm)->fStateChangeSignal.connect(callback)}); // Check if the key has a integer value as prefix, if yes, decode it.
int i = strtol(key.c_str(), nullptr, 10);
static_pointer_cast<FairMQFSM>(fFsm)->fStateChangeSignalsMap.insert({key, static_pointer_cast<FairMQFSM>(fFsm)->fStateChangeSignal.connect(i, callback)});
} }
void StateMachine::UnsubscribeFromStateChange(const string& key) void StateMachine::UnsubscribeFromStateChange(const string& key)

View File

@@ -72,6 +72,7 @@ SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --multipart $multipart" SAMPLER+=" --multipart $multipart"
SAMPLER+=" --num-parts $numParts" SAMPLER+=" --num-parts $numParts"
SAMPLER+=" --shm-throw-bad-alloc false" SAMPLER+=" --shm-throw-bad-alloc false"
# SAMPLER+=" --shm-metadata-msg-size 1024"
# SAMPLER+=" --msg-rate 1000" # SAMPLER+=" --msg-rate 1000"
SAMPLER+=" --max-iterations $maxIterations" SAMPLER+=" --max-iterations $maxIterations"
SAMPLER+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:5555" SAMPLER+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:5555"

View File

@@ -11,6 +11,7 @@
#include <fairmq/JSONParser.h> #include <fairmq/JSONParser.h>
#include <fairmq/SuboptParser.h> #include <fairmq/SuboptParser.h>
#include <cstddef> // for std::size_t
#include <vector> #include <vector>
using namespace std; using namespace std;
@@ -72,6 +73,7 @@ Plugin::ProgOptions ConfigPluginProgramOptions()
("shm-zero-segment", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory after initialization (opened or created).") ("shm-zero-segment", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory after initialization (opened or created).")
("shm-zero-segment-on-creation", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory only once when created.") ("shm-zero-segment-on-creation", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory only once when created.")
("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Shared memory: throw fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).") ("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Shared memory: throw fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).")
("shm-metadata-msg-size", po::value<std::size_t >()->default_value(0), "Shared memory: size of the zmq metadata message (values smaller than minimum are clamped to the minimum).")
("bad-alloc-max-attempts", po::value<int >(), "Maximum number of allocation attempts before throwing fair::mq::MessageBadAlloc. -1 is infinite. There is always at least one attempt, so 0 has safe effect as 1.") ("bad-alloc-max-attempts", po::value<int >(), "Maximum number of allocation attempts before throwing fair::mq::MessageBadAlloc. -1 is infinite. There is always at least one attempt, so 0 has safe effect as 1.")
("bad-alloc-attempt-interval", po::value<int >()->default_value(50), "Interval between attempts if cannot allocate a message (in ms).") ("bad-alloc-attempt-interval", po::value<int >()->default_value(50), "Interval between attempts if cannot allocate a message (in ms).")
("shm-monitor", po::value<bool >()->default_value(false), "Shared memory: run monitor daemon.") ("shm-monitor", po::value<bool >()->default_value(false), "Shared memory: run monitor daemon.")

View File

@@ -29,7 +29,7 @@
#include <algorithm> // max #include <algorithm> // max
#include <chrono> #include <chrono>
#include <condition_variable> #include <condition_variable>
#include <cstddef> // max_align_t #include <cstddef> // max_align_t, std::size_t
#include <cstdlib> // getenv #include <cstdlib> // getenv
#include <cstring> // memcpy #include <cstring> // memcpy
#include <memory> // make_unique #include <memory> // make_unique
@@ -151,6 +151,7 @@ class Manager
, fBadAllocMaxAttempts(1) , fBadAllocMaxAttempts(1)
, fBadAllocAttemptIntervalInMs(config ? config->GetProperty<int>("bad-alloc-attempt-interval", 50) : 50) , fBadAllocAttemptIntervalInMs(config ? config->GetProperty<int>("bad-alloc-attempt-interval", 50) : 50)
, fNoCleanup(config ? config->GetProperty<bool>("shm-no-cleanup", false) : false) , fNoCleanup(config ? config->GetProperty<bool>("shm-no-cleanup", false) : false)
, fMetadataMsgSize(config ? config->GetProperty<std::size_t>("shm-metadata-msg-size", 0) : 0)
{ {
using namespace boost::interprocess; using namespace boost::interprocess;
@@ -828,6 +829,8 @@ class Manager
} }
} }
auto GetMetadataMsgSize() const noexcept { return fMetadataMsgSize; }
~Manager() ~Manager()
{ {
fRegionsGen += 1; // signal TL cache invalidation fRegionsGen += 1; // signal TL cache invalidation
@@ -884,6 +887,8 @@ class Manager
int fBadAllocMaxAttempts; int fBadAllocMaxAttempts;
int fBadAllocAttemptIntervalInMs; int fBadAllocAttemptIntervalInMs;
bool fNoCleanup; bool fNoCleanup;
std::size_t fMetadataMsgSize;
}; };
} // namespace fair::mq::shmem } // namespace fair::mq::shmem

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2014-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -11,18 +11,23 @@
#include "Common.h" #include "Common.h"
#include "Manager.h" #include "Manager.h"
#include "Message.h" #include "Message.h"
#include <fairmq/Error.h> #include <fairmq/Error.h> // for assertm
#include <fairmq/Message.h> #include <fairmq/Message.h>
#include <fairmq/Socket.h> #include <fairmq/Socket.h>
#include <fairmq/tools/Strings.h> #include <fairmq/tools/Strings.h>
#include <fairmq/zeromq/Common.h> #include <fairmq/zeromq/Common.h> // for zmq::HandleErrors, zmq::ShouldRetry
#include <fairmq/zeromq/ZMsg.h> // for zmq::ZMsg
#include <fairlogger/Logger.h> #include <fairlogger/Logger.h>
#include <zmq.h> #include <zmq.h>
#include <algorithm> // for std::max
#include <atomic> #include <atomic>
#include <memory> // make_unique #include <cstddef> // for std::size_t
#include <cstring> // for std::memcpy
#include <exception> // for std::terminate
#include <memory> // for std::make_unique
namespace fair::mq { namespace fair::mq {
class TransportFactory; class TransportFactory;
@@ -31,24 +36,6 @@ namespace fair::mq {
namespace fair::mq::shmem namespace fair::mq::shmem
{ {
struct ZMsg
{
ZMsg() { int rc __attribute__((unused)) = zmq_msg_init(&fMsg); assert(rc == 0); }
explicit ZMsg(size_t size) { int rc __attribute__((unused)) = zmq_msg_init_size(&fMsg, size); assert(rc == 0); }
~ZMsg() { int rc __attribute__((unused)) = zmq_msg_close(&fMsg); assert(rc == 0); }
ZMsg(const ZMsg&) = delete;
ZMsg(ZMsg&&) = delete;
ZMsg& operator=(const ZMsg&) = delete;
ZMsg& operator=(ZMsg&&) = delete;
void* Data() { return zmq_msg_data(&fMsg); }
size_t Size() { return zmq_msg_size(&fMsg); }
zmq_msg_t* Msg() { return &fMsg; }
zmq_msg_t fMsg;
};
class Socket final : public fair::mq::Socket class Socket final : public fair::mq::Socket
{ {
public: public:
@@ -64,6 +51,7 @@ class Socket final : public fair::mq::Socket
, fMessagesRx(0) , fMessagesRx(0)
, fTimeout(100) , fTimeout(100)
, fConnectedPeersCount(0) , fConnectedPeersCount(0)
, fMetadataMsgSize(manager.GetMetadataMsgSize())
{ {
assert(context); assert(context);
@@ -141,8 +129,12 @@ class Socket final : public fair::mq::Socket
} }
int elapsed = 0; int elapsed = 0;
// meta msg format: | MetaHeader | padded to fMetadataMsgSize |
zmq::ZMsg zmqMsg(std::max(fMetadataMsgSize, sizeof(MetaHeader)));
std::memcpy(zmqMsg.Data(), &(shmMsg->fMeta), sizeof(MetaHeader));
while (true) { while (true) {
int nbytes = zmq_send(fSocket, &(shmMsg->fMeta), sizeof(MetaHeader), flags); int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) { if (nbytes > 0) {
shmMsg->fQueued = true; shmMsg->fQueued = true;
++fMessagesTx; ++fMessagesTx;
@@ -178,11 +170,11 @@ class Socket final : public fair::mq::Socket
int nbytes = zmq_recv(fSocket, &(shmMsg->fMeta), sizeof(MetaHeader), flags); int nbytes = zmq_recv(fSocket, &(shmMsg->fMeta), sizeof(MetaHeader), flags);
if (nbytes > 0) { if (nbytes > 0) {
// check for number of received messages. must be 1 // check for number of received messages. must be 1
if (nbytes != sizeof(MetaHeader)) { if (static_cast<std::size_t>(nbytes) < sizeof(MetaHeader)) {
throw SocketError( throw SocketError(
tools::ToString("Received message is not a valid FairMQ shared memory message. ", tools::ToString("Received message is not a valid FairMQ shared memory message. ",
"Possibly due to a misconfigured transport on the sender side. ", "Possibly due to a misconfigured transport on the sender side. ",
"Expected size of ", sizeof(MetaHeader), " bytes, received ", nbytes)); "Expected minimum size of ", sizeof(MetaHeader), " bytes, received ", nbytes));
} }
size_t size = shmMsg->GetSize(); size_t size = shmMsg->GetSize();
@@ -211,13 +203,14 @@ class Socket final : public fair::mq::Socket
} }
int elapsed = 0; int elapsed = 0;
// put it into zmq message // meta msg format: | n | MetaHeader 1 | ... | MetaHeader n | padded to fMetadataMsgSize |
const unsigned int vecSize = msgVec.size(); auto const n = msgVec.size();
ZMsg zmqMsg(vecSize * sizeof(MetaHeader)); zmq::ZMsg zmqMsg(std::max(fMetadataMsgSize, sizeof(std::size_t) + n * sizeof(MetaHeader)));
// prepare the message with shm metas
MetaHeader* metas = static_cast<MetaHeader*>(zmqMsg.Data());
auto meta_n = static_cast<std::size_t*>(zmqMsg.Data());
*meta_n = n;
++meta_n;
auto metas = static_cast<MetaHeader*>(static_cast<void*>(meta_n));
for (auto& msg : msgVec) { for (auto& msg : msgVec) {
auto msgPtr = msg.get(); auto msgPtr = msg.get();
if (!msgPtr) { if (!msgPtr) {
@@ -232,7 +225,7 @@ class Socket final : public fair::mq::Socket
int64_t totalSize = 0; int64_t totalSize = 0;
int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags); int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) { if (nbytes > 0) {
assert(static_cast<unsigned int>(nbytes) == (vecSize * sizeof(MetaHeader))); // all or nothing assert(static_cast<unsigned int>(nbytes) >= sizeof(std::size_t) + (n * sizeof(MetaHeader)));
for (auto& msg : msgVec) { for (auto& msg : msgVec) {
Message* shmMsg = static_cast<Message*>(msg.get()); Message* shmMsg = static_cast<Message*>(msg.get());
@@ -269,29 +262,26 @@ class Socket final : public fair::mq::Socket
} }
int elapsed = 0; int elapsed = 0;
ZMsg zmqMsg; zmq::ZMsg zmqMsg;
while (true) { while (true) {
int64_t totalSize = 0; std::size_t totalSize = 0;
int nbytes = zmq_msg_recv(zmqMsg.Msg(), fSocket, flags); int nbytes = zmq_msg_recv(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) { if (nbytes > 0) {
MetaHeader* hdrVec = static_cast<MetaHeader*>(zmqMsg.Data()); [[maybe_unused]] auto const size = zmqMsg.Size();
const auto hdrVecSize = zmqMsg.Size(); assert(size > sizeof(std::size_t));
auto meta_n = static_cast<std::size_t*>(zmqMsg.Data());
auto const n = *meta_n;
assert(size >= sizeof(std::size_t) + n * sizeof(MetaHeader));
++meta_n;
auto metas = static_cast<MetaHeader*>(static_cast<void*>(meta_n));
msgVec.reserve(msgVec.size() + n);
auto const transport = GetTransport();
assert(hdrVecSize > 0); for (std::size_t i = 0; i < n; ++i) {
if (hdrVecSize % sizeof(MetaHeader) != 0) { msgVec.push_back(std::make_unique<Message>(fManager, *metas, transport));
throw SocketError( ++metas;
tools::ToString("Received message is not a valid FairMQ shared memory message. ", // NOLINTNEXTLINE(cppcoreguidelines-pro-type-static-cast-downcast)
"Possibly due to a misconfigured transport on the sender side. ",
"Expected size of ", sizeof(MetaHeader), " bytes, received ", nbytes));
}
const auto numMessages = hdrVecSize / sizeof(MetaHeader);
msgVec.reserve(numMessages);
for (size_t m = 0; m < numMessages; m++) {
// create new message (part)
msgVec.emplace_back(std::make_unique<Message>(fManager, hdrVec[m], GetTransport()));
Message* shmMsg = static_cast<Message*>(msgVec.back().get()); Message* shmMsg = static_cast<Message*>(msgVec.back().get());
totalSize += shmMsg->GetSize(); totalSize += shmMsg->GetSize();
} }
@@ -469,6 +459,7 @@ class Socket final : public fair::mq::Socket
int fTimeout; int fTimeout;
mutable unsigned long fConnectedPeersCount; mutable unsigned long fConnectedPeersCount;
std::size_t fMetadataMsgSize;
}; };
} // namespace fair::mq::shmem } // namespace fair::mq::shmem

92
fairmq/zeromq/ZMsg.h Normal file
View File

@@ -0,0 +1,92 @@
/********************************************************************************
* Copyright (C) 2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_ZMQ_ZMSG_H
#define FAIR_MQ_ZMQ_ZMSG_H
#include <cstddef> // for std::size_t
#include <fairmq/Error.h> // for assertm
#include <new> // for std::bad_alloc
#include <zmq.h> // for zmq_*
namespace fair::mq::zmq {
// Wraps a `zmq_msg_t` C object as a C++ type:
// * `zmq_msg_init` -> C++ default ctor
// * `zmq_msg_init_size` -> C++ ctor
// * `zmq_msg_init_data` -> C++ ctor
// * `zmq_msg_init_size` + `memcpy` -> C++ copy ctor
// * `zmq_msg_close` + `zmq_msg_init_size` + `memcpy` -> C++ copy assignment
// * `zmq_msg_init` + `zmq_msg_move`` -> C++ move ctor
// * `zmq_msg_move` -> C++ move assignment
// * `zmq_msg_close` -> C++ dtor
// * access the underlying `zmq_msg_t` via `Msg() [const] -> zmq_msg_t*`
// the const overload does a `const_cast<zmq_msg_t*>`, because the
// C interfaces do not model constness
// * `zmq_msg_data` -> `Data() -> void*`
// * `zmq_msg_size` -> `Size() -> std::size_t`
struct ZMsg
{
ZMsg() noexcept
{
[[maybe_unused]] auto const rc = zmq_msg_init(Msg());
assertm(rc == 0, "msg init successful"); // NOLINT
}
explicit ZMsg(std::size_t size)
{
auto const rc = zmq_msg_init_size(Msg(), size);
if (rc == -1) {
throw std::bad_alloc{};
}
}
explicit ZMsg(void* data,
std::size_t size,
zmq_free_fn* freefn = nullptr,
void* hint = nullptr)
{
auto const rc = zmq_msg_init_data(Msg(), data, size, freefn, hint);
if (rc == -1) {
throw std::bad_alloc{};
}
}
~ZMsg() noexcept
{
[[maybe_unused]] auto const rc = zmq_msg_close(Msg());
assertm(rc == 0, "msg close successful"); // NOLINT
}
ZMsg(const ZMsg& other) = delete;
ZMsg(ZMsg&& other) noexcept
{
[[maybe_unused]] auto rc = zmq_msg_init(Msg());
assertm(rc == 0, "msg init successful"); // NOLINT
rc = zmq_msg_move(Msg(), other.Msg());
assertm(rc == 0, "msg move successful"); // NOLINT
}
ZMsg& operator=(const ZMsg& rhs) = delete;
ZMsg& operator=(ZMsg&& rhs) noexcept
{
[[maybe_unused]] auto const rc = zmq_msg_move(Msg(), rhs.Msg());
assertm(rc == 0, "msg move successful"); // NOLINT
return *this;
}
zmq_msg_t* Msg() noexcept { return &fMsg; }
zmq_msg_t* Msg() const noexcept
{
return const_cast<zmq_msg_t*>(&fMsg); // NOLINT(cppcoreguidelines-pro-type-const-cast)
}
void* Data() const noexcept { return zmq_msg_data(Msg()); }
std::size_t Size() const noexcept { return zmq_msg_size(Msg()); }
private:
zmq_msg_t fMsg{};
};
} // namespace fair::mq::zmq
#endif /* FAIR_MQ_ZMQ_ZMSG_H */

10
spack.yaml Normal file
View File

@@ -0,0 +1,10 @@
spack:
specs:
- boost+container+program_options+filesystem+date_time+regex
- faircmakemodules
- fairlogger+pretty
- fmt
- libzmq
view: true
concretizer:
unify: true

View File

@@ -28,10 +28,26 @@ class Pull : public Device
auto Run() -> void override auto Run() -> void override
{ {
auto msg = NewMessage(); int counter = 0;
if (Receive(msg, "data") >= 0) auto msg1 = NewMessageFor("data", 0);
{
if (Receive(msg1, "data") >= 0) {
++counter;
}
auto msg2 = NewMessageFor("data", 0);
auto ret = Receive(msg2, "data");
if (ret >= 0) {
auto content = std::string{static_cast<char*>(msg2->GetData()), msg2->GetSize()};
LOG(info) << "Transferred " << static_cast<std::size_t>(ret) << " bytes, msg size: " << msg2->GetSize() << ", content: " << content;
if (msg2->GetSize() == static_cast<std::size_t>(ret) && content == "testdata1234") {
++counter;
}
}
if (counter == 2) {
LOG(info) << "PUSH-PULL test successfull"; LOG(info) << "PUSH-PULL test successfull";
} }
}; };

View File

@@ -25,8 +25,12 @@ class Push : public Device
auto Run() -> void override auto Run() -> void override
{ {
auto msg = NewMessage(); // empty message
Send(msg, "data"); auto msg1 = NewMessageFor("data", 0);
Send(msg1, "data");
// message with short text data
auto msg2(NewSimpleMessageFor("data", 0, "testdata1234"));
Send(msg2, "data");
}; };
}; };

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2015-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2015-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -24,22 +24,40 @@ class Rep : public Device
std::this_thread::sleep_for(std::chrono::milliseconds(200)); std::this_thread::sleep_for(std::chrono::milliseconds(200));
} }
bool check(signed long ret, const fair::mq::MessagePtr& msg)
{
auto content = std::string{static_cast<char*>(msg->GetData()), msg->GetSize()};
LOG(info) << "Transferred " << static_cast<std::size_t>(ret) << " bytes, msg size: " << msg->GetSize() << ", content: " << content;
return msg->GetSize() == static_cast<std::size_t>(ret) && content == "request";
}
auto Run() -> void override auto Run() -> void override
{ {
auto request1 = NewMessage(); int counter = 0;
if (Receive(request1, "data") >= 0) { auto req1 = NewMessage();
auto ret1 = Receive(req1, "data");
if (ret1 >= 0) {
LOG(info) << "Received request 1"; LOG(info) << "Received request 1";
auto reply = NewMessage(); if (check(ret1, req1)) {
++counter;
}
auto reply = NewSimpleMessageFor("data", 0, "reply");
Send(reply, "data"); Send(reply, "data");
} }
auto request2 = NewMessage(); auto req2 = NewMessage();
if (Receive(request2, "data") >= 0) { auto ret2 = Receive(req2, "data");
if (ret2 >= 0) {
LOG(info) << "Received request 2"; LOG(info) << "Received request 2";
auto reply = NewMessage(); if (check(ret2, req2)) {
++counter;
}
auto reply = NewSimpleMessageFor("data", 0, "reply");
Send(reply, "data"); Send(reply, "data");
} }
LOG(info) << "REQ-REP test successfull"; if (counter == 2) {
LOG(info) << "REQ-REP test successfull";
}
}; };
}; };

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2015-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2015-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -26,12 +26,17 @@ class Req : public Device
auto Run() -> void override auto Run() -> void override
{ {
auto request = NewMessage(); auto request = NewSimpleMessageFor("data", 0, "request");
Send(request, "data"); Send(request, "data");
auto reply = NewMessage(); auto reply = NewMessage();
if (Receive(reply, "data") >= 0) { if (Receive(reply, "data") >= 0) {
LOG(info) << "received reply"; LOG(info) << "received reply";
auto content = std::string{static_cast<char*>(reply->GetData()), reply->GetSize()};
LOG(info) << "Transferred reply of size: " << reply->GetSize() << ", content: " << content;
if (content != "reply") {
ChangeStateOrThrow(Transition::ErrorFound);
}
} }
}; };
}; };

View File

@@ -37,12 +37,15 @@ auto AsStringView(Message const& msg) -> string_view
return {static_cast<char const*>(msg.GetData()), msg.GetSize()}; return {static_cast<char const*>(msg.GetData()), msg.GetSize()};
} }
auto RunPushPullWithMsgResize(string const & transport, string const & _address) -> void auto RunPushPullWithMsgResize(string const & transport, string const & _address, bool expandedShmMetadata = false) -> void
{ {
ProgOptions config; ProgOptions config;
config.SetProperty<string>("session", tools::Uuid()); config.SetProperty<string>("session", tools::Uuid());
config.SetProperty<size_t>("shm-segment-size", 100000000); config.SetProperty<size_t>("shm-segment-size", 100000000);
config.SetProperty<bool>("shm-monitor", true); config.SetProperty<bool>("shm-monitor", true);
if (expandedShmMetadata) {
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config)); auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));
Channel push{"Push", "push", factory}; Channel push{"Push", "push", factory};
@@ -100,12 +103,15 @@ auto RunPushPullWithMsgResize(string const & transport, string const & _address)
} }
} }
auto RunMsgRebuild(const string& transport) -> void auto RunMsgRebuild(const string& transport, bool expandedShmMetadata = false) -> void
{ {
ProgOptions config; ProgOptions config;
config.SetProperty<string>("session", tools::Uuid()); config.SetProperty<string>("session", tools::Uuid());
config.SetProperty<size_t>("shm-segment-size", 100000000); config.SetProperty<size_t>("shm-segment-size", 100000000);
config.SetProperty<bool>("shm-monitor", true); config.SetProperty<bool>("shm-monitor", true);
if (expandedShmMetadata) {
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config)); auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));
size_t const msgSize{100}; size_t const msgSize{100};
@@ -134,12 +140,15 @@ auto CheckMsgAlignment(Message const& msg, fair::mq::Alignment alignment) -> boo
return (reinterpret_cast<uintptr_t>(msg.GetData()) % static_cast<size_t>(alignment)) == 0; // NOLINT return (reinterpret_cast<uintptr_t>(msg.GetData()) % static_cast<size_t>(alignment)) == 0; // NOLINT
} }
auto RunPushPullWithAlignment(string const& transport, string const& _address) -> void auto RunPushPullWithAlignment(string const& transport, string const& _address, bool expandedShmMetadata = false) -> void
{ {
ProgOptions config; ProgOptions config;
config.SetProperty<string>("session", tools::Uuid()); config.SetProperty<string>("session", tools::Uuid());
config.SetProperty<size_t>("shm-segment-size", 100000000); config.SetProperty<size_t>("shm-segment-size", 100000000);
config.SetProperty<bool>("shm-monitor", true); config.SetProperty<bool>("shm-monitor", true);
if (expandedShmMetadata) {
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config)); auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));
Channel push{"Push", "push", factory}; Channel push{"Push", "push", factory};
@@ -189,12 +198,15 @@ auto RunPushPullWithAlignment(string const& transport, string const& _address) -
ASSERT_TRUE(CheckMsgAlignment(*msgCopy, align32)); ASSERT_TRUE(CheckMsgAlignment(*msgCopy, align32));
} }
auto EmptyMessage(string const& transport, string const& _address) -> void auto EmptyMessage(string const& transport, string const& _address, bool expandedShmMetadata = false) -> void
{ {
ProgOptions config; ProgOptions config;
config.SetProperty<string>("session", tools::Uuid()); config.SetProperty<string>("session", tools::Uuid());
config.SetProperty<size_t>("shm-segment-size", 100000000); config.SetProperty<size_t>("shm-segment-size", 100000000);
config.SetProperty<bool>("shm-monitor", true); config.SetProperty<bool>("shm-monitor", true);
if (expandedShmMetadata) {
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config)); auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));
Channel push{"Push", "push", factory}; Channel push{"Push", "push", factory};
@@ -241,12 +253,15 @@ auto EmptyMessage(string const& transport, string const& _address) -> void
// The "zero copy" property of the Copy() method is an implementation detail and is not guaranteed. // The "zero copy" property of the Copy() method is an implementation detail and is not guaranteed.
// Currently it holds true for the shmem (across devices) and for zeromq (within same device) transports. // Currently it holds true for the shmem (across devices) and for zeromq (within same device) transports.
auto ZeroCopy() -> void auto ZeroCopy(bool expandedShmMetadata = false) -> void
{ {
ProgOptions config; ProgOptions config;
config.SetProperty<string>("session", tools::Uuid()); config.SetProperty<string>("session", tools::Uuid());
config.SetProperty<size_t>("shm-segment-size", 100000000); config.SetProperty<size_t>("shm-segment-size", 100000000);
config.SetProperty<bool>("shm-monitor", true); config.SetProperty<bool>("shm-monitor", true);
if (expandedShmMetadata) {
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}
auto factory(TransportFactory::CreateTransportFactory("shmem", tools::Uuid(), &config)); auto factory(TransportFactory::CreateTransportFactory("shmem", tools::Uuid(), &config));
unique_ptr<string> str(make_unique<string>("asdf")); unique_ptr<string> str(make_unique<string>("asdf"));
@@ -272,7 +287,7 @@ auto ZeroCopy() -> void
// The "zero copy" property of the Copy() method is an implementation detail and is not guaranteed. // The "zero copy" property of the Copy() method is an implementation detail and is not guaranteed.
// Currently it holds true for the shmem (across devices) and for zeromq (within same device) transports. // Currently it holds true for the shmem (across devices) and for zeromq (within same device) transports.
auto ZeroCopyFromUnmanaged(string const& address) -> void auto ZeroCopyFromUnmanaged(string const& address, bool expandedShmMetadata = false) -> void
{ {
ProgOptions config1; ProgOptions config1;
ProgOptions config2; ProgOptions config2;
@@ -285,6 +300,12 @@ auto ZeroCopyFromUnmanaged(string const& address) -> void
config2.SetProperty<bool>("shm-monitor", true); config2.SetProperty<bool>("shm-monitor", true);
// ref counts should be accessible accross different segments // ref counts should be accessible accross different segments
config2.SetProperty<uint16_t>("shm-segment-id", 2); config2.SetProperty<uint16_t>("shm-segment-id", 2);
if (expandedShmMetadata) {
config1.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}
if (expandedShmMetadata) {
config2.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}
auto factory1(TransportFactory::CreateTransportFactory("shmem", tools::Uuid(), &config1)); auto factory1(TransportFactory::CreateTransportFactory("shmem", tools::Uuid(), &config1));
auto factory2(TransportFactory::CreateTransportFactory("shmem", tools::Uuid(), &config2)); auto factory2(TransportFactory::CreateTransportFactory("shmem", tools::Uuid(), &config2));
@@ -378,6 +399,11 @@ TEST(Resize, shmem) // NOLINT
RunPushPullWithMsgResize("shmem", "ipc://test_message_resize"); RunPushPullWithMsgResize("shmem", "ipc://test_message_resize");
} }
TEST(Resize, shmem_expanded_metadata) // NOLINT
{
RunPushPullWithMsgResize("shmem", "ipc://test_message_resize", true);
}
TEST(Rebuild, zeromq) // NOLINT TEST(Rebuild, zeromq) // NOLINT
{ {
RunMsgRebuild("zeromq"); RunMsgRebuild("zeromq");
@@ -388,11 +414,21 @@ TEST(Rebuild, shmem) // NOLINT
RunMsgRebuild("shmem"); RunMsgRebuild("shmem");
} }
TEST(Rebuild, shmem_expanded_metadata) // NOLINT
{
RunMsgRebuild("shmem", true);
}
TEST(Alignment, shmem) // NOLINT TEST(Alignment, shmem) // NOLINT
{ {
RunPushPullWithAlignment("shmem", "ipc://test_message_alignment"); RunPushPullWithAlignment("shmem", "ipc://test_message_alignment");
} }
TEST(Alignment, shmem_expanded_metadata) // NOLINT
{
RunPushPullWithAlignment("shmem", "ipc://test_message_alignment", true);
}
TEST(Alignment, zeromq) // NOLINT TEST(Alignment, zeromq) // NOLINT
{ {
RunPushPullWithAlignment("zeromq", "ipc://test_message_alignment"); RunPushPullWithAlignment("zeromq", "ipc://test_message_alignment");
@@ -408,14 +444,29 @@ TEST(EmptyMessage, shmem) // NOLINT
EmptyMessage("shmem", "ipc://test_empty_message"); EmptyMessage("shmem", "ipc://test_empty_message");
} }
TEST(EmptyMessage, shmem_expanded_metadata) // NOLINT
{
EmptyMessage("shmem", "ipc://test_empty_message", true);
}
TEST(ZeroCopy, shmem) // NOLINT TEST(ZeroCopy, shmem) // NOLINT
{ {
ZeroCopy(); ZeroCopy();
} }
TEST(ZeroCopy, shmem_expanded_metadata) // NOLINT
{
ZeroCopy(true);
}
TEST(ZeroCopyFromUnmanaged, shmem) // NOLINT TEST(ZeroCopyFromUnmanaged, shmem) // NOLINT
{ {
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged"); ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged");
} }
TEST(ZeroCopyFromUnmanaged, shmem_expanded_metadata) // NOLINT
{
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged", true);
}
} // namespace } // namespace

View File

@@ -22,7 +22,7 @@ using namespace std;
using namespace fair::mq::test; using namespace fair::mq::test;
using namespace fair::mq::tools; using namespace fair::mq::tools;
auto RunPair(string transport) -> void auto RunPair(const string& transport, const string& extraDeviceCmdArgs) -> void
{ {
size_t session{fair::mq::tools::UuidHash()}; size_t session{fair::mq::tools::UuidHash()};
string ipcFile("/tmp/fmq_" + to_string(session) + "_data_" + transport); string ipcFile("/tmp/fmq_" + to_string(session) + "_data_" + transport);
@@ -38,6 +38,7 @@ auto RunPair(string transport) -> void
<< " --shm-segment-size 100000000" << " --shm-segment-size 100000000"
<< " --session " << session << " --session " << session
<< " --color false" << " --color false"
<< extraDeviceCmdArgs
<< " --channel-config name=data,type=pair,method=bind,address=" << address; << " --channel-config name=data,type=pair,method=bind,address=" << address;
pairleft = execute(cmd.str(), "[PAIR L]"); pairleft = execute(cmd.str(), "[PAIR L]");
}); });
@@ -52,6 +53,7 @@ auto RunPair(string transport) -> void
<< " --shm-segment-size 100000000" << " --shm-segment-size 100000000"
<< " --session " << session << " --session " << session
<< " --color false" << " --color false"
<< extraDeviceCmdArgs
<< " --channel-config name=data,type=pair,method=connect,address=" << address; << " --channel-config name=data,type=pair,method=connect,address=" << address;
pairright = execute(cmd.str(), "[PAIR R]"); pairright = execute(cmd.str(), "[PAIR R]");
}); });
@@ -65,14 +67,19 @@ auto RunPair(string transport) -> void
exit(pairleft.exit_code + pairright.exit_code); exit(pairleft.exit_code + pairright.exit_code);
} }
TEST(Pair, SingleMsg_MP_tcp_zeromq) TEST(Pair, SingleMsg_MultiThreaded_tcp_zeromq)
{ {
EXPECT_EXIT(RunPair("zeromq"), ::testing::ExitedWithCode(0), "PAIR test successfull"); EXPECT_EXIT(RunPair("zeromq", ""), ::testing::ExitedWithCode(0), "PAIR test successfull");
} }
TEST(Pair, SingleMsg_MP_tcp_shmem) TEST(Pair, SingleMsg_MultiThreaded_tcp_shmem)
{ {
EXPECT_EXIT(RunPair("shmem"), ::testing::ExitedWithCode(0), "PAIR test successfull"); EXPECT_EXIT(RunPair("shmem", ""), ::testing::ExitedWithCode(0), "PAIR test successfull");
}
TEST(Pair, SingleMsg_MultiThreaded_tcp_shmem_expanded_metadata)
{
EXPECT_EXIT(RunPair("shmem", " --shm-metadata-msg-size 2048"), ::testing::ExitedWithCode(0), "PAIR test successfull");
} }
} // namespace } // namespace

View File

@@ -22,7 +22,7 @@ using namespace std;
using namespace fair::mq::test; using namespace fair::mq::test;
using namespace fair::mq::tools; using namespace fair::mq::tools;
auto RunPushPull(string transport) -> void auto RunPushPull(string transport, const string& extraDeviceCmdArgs) -> void
{ {
size_t session(fair::mq::tools::UuidHash()); size_t session(fair::mq::tools::UuidHash());
string ipcFile("/tmp/fmq_" + to_string(session) + "_data_" + transport); string ipcFile("/tmp/fmq_" + to_string(session) + "_data_" + transport);
@@ -38,6 +38,7 @@ auto RunPushPull(string transport) -> void
<< " --shm-segment-size 100000000" << " --shm-segment-size 100000000"
<< " --session " << session << " --session " << session
<< " --color false" << " --color false"
<< extraDeviceCmdArgs
<< " --channel-config name=data,type=push,method=bind,address=" << address; << " --channel-config name=data,type=push,method=bind,address=" << address;
push = execute(cmd.str(), "[PUSH]"); push = execute(cmd.str(), "[PUSH]");
}); });
@@ -52,6 +53,7 @@ auto RunPushPull(string transport) -> void
<< " --shm-segment-size 100000000" << " --shm-segment-size 100000000"
<< " --session " << session << " --session " << session
<< " --color false" << " --color false"
<< extraDeviceCmdArgs
<< " --channel-config name=data,type=pull,method=connect,address=" << address; << " --channel-config name=data,type=pull,method=connect,address=" << address;
pull = execute(cmd.str(), "[PULL]"); pull = execute(cmd.str(), "[PULL]");
}); });
@@ -65,14 +67,19 @@ auto RunPushPull(string transport) -> void
exit(push.exit_code + pull.exit_code); exit(push.exit_code + pull.exit_code);
} }
TEST(PushPull, SingleMsg_MP_ipc_zeromq) TEST(PushPull, SingleMsg_MultiThreaded_ipc_zeromq)
{ {
EXPECT_EXIT(RunPushPull("zeromq"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull"); EXPECT_EXIT(RunPushPull("zeromq", ""), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
} }
TEST(PushPull, SingleMsg_MP_ipc_shmem) TEST(PushPull, SingleMsg_MultiThreaded_ipc_shmem)
{ {
EXPECT_EXIT(RunPushPull("shmem"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull"); EXPECT_EXIT(RunPushPull("shmem", ""), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
}
TEST(PushPull, SingleMsg_MultiThreaded_ipc_shmem_expanded_metadata)
{
EXPECT_EXIT(RunPushPull("shmem", " --shm-metadata-msg-size 2048"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
} }
} // namespace } // namespace

View File

@@ -25,12 +25,15 @@ namespace
using namespace std; using namespace std;
using namespace fair::mq; using namespace fair::mq;
auto RunSingleThreadedMultipart(string transport, string address1, string address2) -> void { auto RunSingleThreadedMultipart(string transport, string address1, string address2, bool expandedShmMetadata) -> void {
fair::mq::ProgOptions config; fair::mq::ProgOptions config;
config.SetProperty<string>("session", tools::Uuid()); config.SetProperty<string>("session", tools::Uuid());
config.SetProperty<size_t>("shm-segment-size", 100000000); config.SetProperty<size_t>("shm-segment-size", 100000000);
config.SetProperty<bool>("shm-monitor", true); config.SetProperty<bool>("shm-monitor", true);
if (expandedShmMetadata) {
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config); auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
@@ -104,13 +107,16 @@ auto RunSingleThreadedMultipart(string transport, string address1, string addres
} }
} }
auto RunMultiThreadedMultipart(string transport, string address1) -> void auto RunMultiThreadedMultipart(string transport, string address1, bool expandedShmMetadata) -> void
{ {
ProgOptions config; ProgOptions config;
config.SetProperty<string>("session", tools::Uuid()); config.SetProperty<string>("session", tools::Uuid());
config.SetProperty<int>("io-threads", 1); config.SetProperty<int>("io-threads", 1);
config.SetProperty<size_t>("shm-segment-size", 20000000); // NOLINT config.SetProperty<size_t>("shm-segment-size", 20000000); // NOLINT
config.SetProperty<bool>("shm-monitor", true); config.SetProperty<bool>("shm-monitor", true);
if (expandedShmMetadata) {
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config); auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
@@ -147,44 +153,64 @@ auto RunMultiThreadedMultipart(string transport, string address1) -> void
puller.join(); puller.join();
} }
TEST(PushPull, Multipart_ST_inproc_zeromq) // NOLINT TEST(PushPull, Multipart_SingleThreaded_inproc_zeromq) // NOLINT
{ {
RunSingleThreadedMultipart("zeromq", "inproc://test1", "inproc://test2"); RunSingleThreadedMultipart("zeromq", "inproc://test1", "inproc://test2", false);
} }
TEST(PushPull, Multipart_ST_inproc_shmem) // NOLINT TEST(PushPull, Multipart_SingleThreaded_inproc_shmem) // NOLINT
{ {
RunSingleThreadedMultipart("shmem", "inproc://test1", "inproc://test2"); RunSingleThreadedMultipart("shmem", "inproc://test1", "inproc://test2", false);
} }
TEST(PushPull, Multipart_ST_ipc_zeromq) // NOLINT TEST(PushPull, Multipart_SingleThreaded_inproc_shmem_expanded_metadata) // NOLINT
{ {
RunSingleThreadedMultipart("zeromq", "ipc://test_Multipart_ST_ipc_zeromq_1", "ipc://test_Multipart_ST_ipc_zeromq_2"); RunSingleThreadedMultipart("shmem", "inproc://test1", "inproc://test2", true);
} }
TEST(PushPull, Multipart_ST_ipc_shmem) // NOLINT TEST(PushPull, Multipart_SingleThreaded_ipc_zeromq) // NOLINT
{ {
RunSingleThreadedMultipart("shmem", "ipc://test_Multipart_ST_ipc_shmem_1", "ipc://test_Multipart_ST_ipc_shmem_2"); RunSingleThreadedMultipart("zeromq", "ipc://test_Multipart_SingleThreaded_ipc_zeromq_1", "ipc://test_Multipart_SingleThreaded_ipc_zeromq_2", false);
} }
TEST(PushPull, Multipart_MT_inproc_zeromq) // NOLINT TEST(PushPull, Multipart_SingleThreaded_ipc_shmem) // NOLINT
{ {
RunMultiThreadedMultipart("zeromq", "inproc://test_1"); RunSingleThreadedMultipart("shmem", "ipc://test_Multipart_SingleThreaded_ipc_shmem_1", "ipc://test_Multipart_SingleThreaded_ipc_shmem_2", false);
} }
TEST(PushPull, Multipart_MT_inproc_shmem) // NOLINT TEST(PushPull, Multipart_SingleThreaded_ipc_shmem_expanded_metadata) // NOLINT
{ {
RunMultiThreadedMultipart("shmem", "inproc://test_1"); RunSingleThreadedMultipart("shmem", "ipc://test_Multipart_SingleThreaded_ipc_shmem_1", "ipc://test_Multipart_SingleThreaded_ipc_shmem_2", true);
} }
TEST(PushPull, Multipart_MT_ipc_zeromq) // NOLINT TEST(PushPull, Multipart_MultiThreaded_inproc_zeromq) // NOLINT
{ {
RunMultiThreadedMultipart("zeromq", "ipc://test_Multipart_MT_ipc_zeromq_1"); RunMultiThreadedMultipart("zeromq", "inproc://test_1", false);
} }
TEST(PushPull, Multipart_MT_ipc_shmem) // NOLINT TEST(PushPull, Multipart_MultiThreaded_inproc_shmem) // NOLINT
{ {
RunMultiThreadedMultipart("shmem", "ipc://test_Multipart_MT_ipc_shmem_1"); RunMultiThreadedMultipart("shmem", "inproc://test_1", false);
}
TEST(PushPull, Multipart_MultiThreaded_inproc_shmem_expanded_metadata) // NOLINT
{
RunMultiThreadedMultipart("shmem", "inproc://test_1", true);
}
TEST(PushPull, Multipart_MultiThreaded_ipc_zeromq) // NOLINT
{
RunMultiThreadedMultipart("zeromq", "ipc://test_Multipart_MultiThreaded_ipc_zeromq_1", false);
}
TEST(PushPull, Multipart_MultiThreaded_ipc_shmem) // NOLINT
{
RunMultiThreadedMultipart("shmem", "ipc://test_Multipart_MultiThreaded_ipc_shmem_1", false);
}
TEST(PushPull, Multipart_MultiThreaded_ipc_shmem_expanded_metadata) // NOLINT
{
RunMultiThreadedMultipart("shmem", "ipc://test_Multipart_MultiThreaded_ipc_shmem_1", true);
} }
} // namespace } // namespace

View File

@@ -22,7 +22,7 @@ using namespace std;
using namespace fair::mq::test; using namespace fair::mq::test;
using namespace fair::mq::tools; using namespace fair::mq::tools;
auto RunReqRep(string transport) -> void auto RunReqRep(string transport, const string& extraDeviceCmdArgs) -> void
{ {
size_t session{fair::mq::tools::UuidHash()}; size_t session{fair::mq::tools::UuidHash()};
string ipcFile("/tmp/fmq_" + to_string(session) + "_data_" + transport); string ipcFile("/tmp/fmq_" + to_string(session) + "_data_" + transport);
@@ -38,6 +38,7 @@ auto RunReqRep(string transport) -> void
<< " --shm-segment-size 100000000" << " --shm-segment-size 100000000"
<< " --session " << session << " --session " << session
<< " --color false" << " --color false"
<< extraDeviceCmdArgs
<< " --channel-config name=data,type=rep,method=bind,address=" << address; << " --channel-config name=data,type=rep,method=bind,address=" << address;
rep = execute(cmd.str(), "[REP]"); rep = execute(cmd.str(), "[REP]");
}); });
@@ -52,6 +53,7 @@ auto RunReqRep(string transport) -> void
<< " --shm-segment-size 100000000" << " --shm-segment-size 100000000"
<< " --session " << session << " --session " << session
<< " --color false" << " --color false"
<< extraDeviceCmdArgs
<< " --channel-config name=data,type=req,method=connect,address=" << address; << " --channel-config name=data,type=req,method=connect,address=" << address;
req1 = execute(cmd.str(), "[REQ1]"); req1 = execute(cmd.str(), "[REQ1]");
}); });
@@ -66,6 +68,7 @@ auto RunReqRep(string transport) -> void
<< " --shm-segment-size 100000000" << " --shm-segment-size 100000000"
<< " --session " << session << " --session " << session
<< " --color false" << " --color false"
<< extraDeviceCmdArgs
<< " --channel-config name=data,type=req,method=connect,address=" << address; << " --channel-config name=data,type=req,method=connect,address=" << address;
req2 = execute(cmd.str(), "[REQ2]"); req2 = execute(cmd.str(), "[REQ2]");
}); });
@@ -82,12 +85,17 @@ auto RunReqRep(string transport) -> void
TEST(ReqRep, zeromq) TEST(ReqRep, zeromq)
{ {
EXPECT_EXIT(RunReqRep("zeromq"), ::testing::ExitedWithCode(0), "REQ-REP test successfull"); EXPECT_EXIT(RunReqRep("zeromq", ""), ::testing::ExitedWithCode(0), "REQ-REP test successfull");
} }
TEST(ReqRep, shmem) TEST(ReqRep, shmem)
{ {
EXPECT_EXIT(RunReqRep("shmem"), ::testing::ExitedWithCode(0), "REQ-REP test successfull"); EXPECT_EXIT(RunReqRep("shmem", ""), ::testing::ExitedWithCode(0), "REQ-REP test successfull");
}
TEST(ReqRep, shmem_expanded_metadata)
{
EXPECT_EXIT(RunReqRep("shmem", " --shm-metadata-msg-size 2048"), ::testing::ExitedWithCode(0), "REQ-REP test successfull");
} }
} // namespace } // namespace