Compare commits

...

50 Commits

Author SHA1 Message Date
Dennis Klein
fa64faf3f7 fix(boost): add compatibility for Boost.Process v1 API in Boost 1.89+
Boost 1.88 replaced Boost.Process with v2, breaking the v1 API.
Boost 1.89 restores v1 compatibility via <boost/process/v1.hpp>.

- Fail configuration if Boost 1.88 is detected
- Define FAIRMQ_BOOST_PROCESS_V1_HEADER for Boost >= 1.89
- Use conditional includes to select v1.hpp or process.hpp
- Add namespace aliases (bp, bp_this) for portable API access
2026-01-05 14:11:19 +01:00
Dennis Klein
25abd605f3 ci: trigger buildcache on setup-deps changes 2025-12-01 09:29:24 +01:00
Dennis Klein
695ed89b6c ci: fix gcc version disambiguation in setup-deps
Use JSON output and jq to select the newest installed gcc version
when multiple versions match, avoiding conflicts between system and
spack-installed compilers.
2025-12-01 09:03:12 +01:00
Dennis Klein
cd074a3f1e ci: add boost187 variant to CI build matrix
Add boost187 environment variant to match the buildcache workflow,
using gcc-15 with the boost187 spack environment.
2025-12-01 08:58:20 +01:00
Dennis Klein
00c343858e ci: name setup-deps steps consistently 2025-11-30 21:03:27 +01:00
Dennis Klein
5dfeebba95 ci: add log grouping to setup-deps action 2025-11-30 20:59:46 +01:00
Dennis Klein
642a4e06f0 ci: force generic x86_64_v3 target for all packages 2025-11-30 20:48:13 +01:00
Dennis Klein
89f9f09c82 ci: deduplicate buildcache workflow using setup-deps action 2025-11-30 20:34:20 +01:00
Dennis Klein
a422361ee9 ci: set x86_64_v3 target for consistent buildcache 2025-11-30 20:10:50 +01:00
Dennis Klein
d1fbe4e89a ci: add named spack environments with boost187 variant 2025-11-30 19:58:19 +01:00
Dennis Klein
ef98c9c7ec ci: add --fresh flag to gcc installation
Force fresh concretization to avoid pulling gcc-runtime from public
buildcache which may be built with an incompatible gcc version.
2025-11-30 16:03:39 +01:00
Dennis Klein
399170879c ci: improve buildcache workflow
- rename mirror to ghcr-buildcache
- find system compiler before building gcc
- separate update-index job to avoid race condition
- always attempt push even on partial failure
2025-11-30 15:51:13 +01:00
Dennis Klein
1392a31250 ci: fix OCI registry authentication for buildcache push 2025-11-30 14:50:02 +01:00
Dennis Klein
060a5f2599 ci: add OCI registry login for buildcache push 2025-11-29 10:50:40 +01:00
Dennis Klein
7a8ccb8df6 ci: migrate from Jenkins to GitHub Actions 2025-11-28 21:13:30 +01:00
Dennis Klein
dcea48fcee fix: parse errors
```
/test/memory_resources/_memory_resources.cxx: In member function ‘virtual void {anonymous}::MemoryResources_allocator_Test::TestBody()’:
/test/memory_resources/_memory_resources.cxx:104:12: error: parse error in template argument list
  104 |     config.SetProperty<string>("session", to_string(session));
      |            ^~~~~~~~~~~~~~~~~~~
/test/memory_resources/_memory_resources.cxx:104:31: error: no matching function for call to ‘fair::mq::ProgOptions::SetProperty<<expression error> >(const char [8], std::string)’
  104 |     config.SetProperty<string>("session", to_string(session));
      |     ~~~~~~~~~~~~~~~~~~~~~~~~~~^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
In file included from /home/dklein/projects/FairMQ2/test/memory_resources/_memory_resources.cxx:11:
/fairmq/ProgOptions.h:269:6: note: candidate: ‘template<class T> void fair::mq::ProgOptions::SetProperty(const std::string&, T)’
  269 | void fair::mq::ProgOptions::SetProperty(const std::string& key, T val)
      |      ^~~~
/fairmq/ProgOptions.h:269:6: note:   template argument deduction/substitution failed:
/test/memory_resources/_memory_resources.cxx:104:31: error: template argument 1 is invalid
  104 |     config.SetProperty<string>("session", to_string(session));
      |     ~~~~~~~~~~~~~~~~~~~~~~~~~~^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/test/memory_resources/_memory_resources.cxx: In member function ‘virtual void {anonymous}::MemoryResources_getMessage_Test::TestBody()’:
/test/memory_resources/_memory_resources.cxx:132:12: error: parse error in template argument list
  132 |     config.SetProperty<string>("session", to_string(session));
      |            ^~~~~~~~~~~~~~~~~~~
/test/memory_resources/_memory_resources.cxx:132:31: error: no matching function for call to ‘fair::mq::ProgOptions::SetProperty<<expression error> >(const char [8], std::string)’
  132 |     config.SetProperty<string>("session", to_string(session));
      |     ~~~~~~~~~~~~~~~~~~~~~~~~~~^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/fairmq/ProgOptions.h:269:6: note: candidate: ‘template<class T> void fair::mq::ProgOptions::SetProperty(const std::string&, T)’
  269 | void fair::mq::ProgOptions::SetProperty(const std::string& key, T val)
      |      ^~~~
/fairmq/ProgOptions.h:269:6: note:   template argument deduction/substitution failed:
/test/memory_resources/_memory_resources.cxx:132:31: error: template argument 1 is invalid
  132 |     config.SetProperty<string>("session", to_string(session));
      |     ~~~~~~~~~~~~~~~~~~~~~~~~~~^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
```
2025-06-13 08:17:53 +02:00
Giulio Eulisse
67dcf77a7f De-boostify: use std::pmr from C++17 2025-06-13 08:02:06 +02:00
Alexey Rybalchenko
24e7a5b8d0 Make shmem headers public 2025-03-17 15:16:46 +01:00
Giulio Eulisse
c11506e958 feat(EventManager): Out of line some methods 2025-01-23 15:35:26 +01:00
Dennis Klein
e4f258c9ea fix: Update copyright 2025-01-09 17:09:57 +01:00
Dennis Klein
324a27a2e1 fix(tools): No longer use removed alias io_service
Deprecated via d3bbf3756d
and removed via 49fcd03434
in Boost 1.87 or Asio 1.33.
2025-01-09 17:09:57 +01:00
Dennis Klein
c80f97b338 fix(tools): No longer use removed query API
Deprecated via 74fe2b8e14
and removed via e916bdfb1a
in Boost 1.87 or Asio 1.33.
2025-01-09 17:09:57 +01:00
Dennis Klein
76824fee36 build(googletest): Update metadata 2025-01-09 17:09:57 +01:00
dependabot[bot]
d2e4679dc8 build(deps): bump extern/googletest from 530d5c8 to 7d76a23
Bumps [extern/googletest](https://github.com/google/googletest) from `530d5c8` to `7d76a23`.
- [Release notes](https://github.com/google/googletest/releases)
- [Commits](530d5c8c84...7d76a231b0)

---
updated-dependencies:
- dependency-name: extern/googletest
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-01-07 18:24:47 +01:00
Alexey Rybalchenko
6bb70bd519 Update Mac CI nodes 2025-01-07 17:42:16 +01:00
Giulio Eulisse
fe2127e12f Reduce bloat due to statics
Avoid disseminating every compile unit including Message.h with TransportNames and
TransportTypes and the associated unordered_map helper methods (e.g.
murmur_hash).
2025-01-07 17:41:30 +01:00
Giulio Eulisse
41165cf16b Out of line ProgOption::SetProperty for int and std::string
The specializations are common enough to show up in O2 compilation profiles
and they are not time critical (once per run at max).
2025-01-07 17:34:22 +01:00
Dennis Klein
8fe95e644e ci: Update 2024-08-20 15:56:21 +02:00
Dennis Klein
6628a231e2 build: Adopt all CMake policies up to 3.30
Modernizing to the policy range syntax supported by
[`cmake_minimum_required`](https://cmake.org/cmake/help/latest/command/cmake_minimum_required.html)
since CMake 3.12.
2024-08-20 15:56:21 +02:00
Giulio Eulisse
91b31f0799 Hide actual container from the API 2024-05-23 15:54:24 +02:00
Alexey Rybalchenko
39cb021827 Add 'no control' controller 2024-02-19 22:09:54 +01:00
Alexey Rybalchenko
36b48f5594 Update MacOS CI entires 2024-02-16 13:12:40 +01:00
Alexey Rybalchenko
0e221b28b8 shm: use node_allocator for ref counts 2024-01-25 10:45:34 +01:00
Alexey Rybalchenko
1ee0977df4 shm: use (de)allocate_one() for ref counts 2024-01-25 10:45:34 +01:00
Alexey Rybalchenko
24d578a4ba shm: extend monitor output for refCount region 2024-01-25 10:45:34 +01:00
Christian Tacke
ce1a4499cc ci: Check codemeta/zenodo with AUTHORS/CONTRIBUTORS
If AUTHORS or CONTRIBUTORS are changed,
check that the changes are merged into codemeta.json,
and .zenodo.json.
2023-12-20 16:51:13 +01:00
Dennis Klein
7d009f0915 docs: Update installation section 2023-12-14 13:15:18 +00:00
Dennis Klein
b70b181c38 ci: Create devcontainer.json 2023-12-14 13:40:47 +01:00
dependabot[bot]
94602d23b3 build(deps): bump extern/googletest from a1cc8c5 to 530d5c8
Bumps [extern/googletest](https://github.com/google/googletest) from `a1cc8c5` to `530d5c8`.
- [Release notes](https://github.com/google/googletest/releases)
- [Commits](a1cc8c5519...530d5c8c84)

---
updated-dependencies:
- dependency-name: extern/googletest
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-12-14 13:35:01 +01:00
Dennis Klein
41ac755c57 ci: Fix dependabot gitsubmodule directory 2023-12-13 15:48:21 +01:00
Dennis Klein
6d4a82427b Update dependabot.yml 2023-12-13 15:31:14 +01:00
Dennis Klein
0966dee55d build: Enable dependabot 2023-12-13 15:26:44 +01:00
Christian Tacke
b649356c5a chore: upgrade checkout step to v4 2023-12-13 13:34:35 +01:00
Alexey Rybalchenko
2df3d909fa shm: when refCount segment size is zero, fallback to old behaviour
, which is to store reference counts inside the main data segment
2023-11-29 19:21:42 +01:00
Alexey Rybalchenko
05a2ae6a31 example: configure new script too 2023-11-29 19:21:42 +01:00
Alexey Rybalchenko
58ffdfd1f4 Remove unused ctor and constant 2023-11-29 19:21:42 +01:00
Alexey Rybalchenko
addfd071bb Fix incorrect parameters in region example scripts 2023-11-24 14:19:21 +01:00
Alexey Rybalchenko
2d27abc533 Examples: add a script for externally created region 2023-11-24 14:19:21 +01:00
Alexey Rybalchenko
faf577086a shm: fix initialization of rc segment when region is created externally 2023-11-24 14:19:21 +01:00
Alexey Rybalchenko
ff1f9b94ef shm: include rcCountSegment free memory in the monitor output 2023-11-24 14:19:21 +01:00
62 changed files with 819 additions and 514 deletions

View File

@@ -0,0 +1,5 @@
{
"image": "ghcr.io/fairrootgroup/fairmq-dev/fedora-38:latest",
"features": {
}
}

52
.github/actions/setup-deps/action.yml vendored Normal file
View File

@@ -0,0 +1,52 @@
name: Install dependencies
description: Setup spack and install dependencies
inputs:
gcc:
description: 'GCC version to use'
required: true
env:
description: 'Spack environment name (latest, boost187)'
default: 'latest'
fresh:
description: 'Use fresh concretization'
default: 'false'
runs:
using: composite
# Composite action step names are not shown in the UI (https://github.com/actions/runner/issues/1877),
# so we use ::group:: to make them visible.
steps:
- name: Setup spack
uses: spack/setup-spack@v2
with:
ref: v1.1.0
color: true
buildcache: true
- name: Find system compiler
shell: spack-bash {0}
run: |
echo "::group::Find system compiler"
spack compiler find
echo "::endgroup::"
- name: Install GCC
shell: spack-bash {0}
run: |
echo "::group::Install GCC"
spack install ${{ inputs.fresh == 'true' && '--fresh' || '' }} gcc@${{ inputs.gcc }} target=x86_64_v3
gcc_hash=$(spack find --json gcc@${{ inputs.gcc }} target=x86_64_v3 | jq -r 'sort_by(.version | split(".") | map(tonumber)) | last | .hash')
spack compiler find "$(spack location -i /$gcc_hash)"
echo "::endgroup::"
- name: Install dependencies
shell: spack-bash {0}
run: |
echo "::group::Install dependencies"
spack env create fairmq test/ci/spack-${{ inputs.env }}.yaml
spack -e fairmq add gcc@${{ inputs.gcc }}
spack -e fairmq config add "packages:all:require:'%gcc@${{ inputs.gcc }}'"
spack -e fairmq install --fail-fast ${{ inputs.fresh == 'true' && '--fresh' || '' }}
spack env activate --sh fairmq | grep '^export ' | sed 's/^export //;s/;$//' >> $GITHUB_ENV
echo "::endgroup::"

12
.github/dependabot.yml vendored Normal file
View File

@@ -0,0 +1,12 @@
version: 2
updates:
- package-ecosystem: "github-actions"
directory: "/"
target-branch: "dev"
schedule:
interval: "monthly"
- package-ecosystem: "gitsubmodule"
directory: "/"
target-branch: "dev"
schedule:
interval: "monthly"

74
.github/workflows/buildcache.yml vendored Normal file
View File

@@ -0,0 +1,74 @@
name: Spack Buildcache
on:
workflow_dispatch:
schedule:
- cron: '0 3 * * 0' # Weekly on Sunday at 3am UTC
push:
paths:
- 'test/ci/spack-*.yaml'
- '.github/workflows/buildcache.yml'
- '.github/actions/setup-deps/**'
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
jobs:
build:
if: github.repository == 'FairRootGroup/FairMQ'
name: ${{ matrix.env }}-gcc-${{ matrix.gcc }}
runs-on: ubuntu-latest
permissions:
packages: write
strategy:
fail-fast: false
matrix:
gcc: ['12', '13', '14', '15']
env: ['latest']
include:
- gcc: '15'
env: 'boost187'
steps:
- uses: actions/checkout@v4
- name: Setup spack environment
uses: ./.github/actions/setup-deps
with:
gcc: ${{ matrix.gcc }}
env: ${{ matrix.env }}
fresh: 'true'
- name: Push to buildcache
if: ${{ !cancelled() }}
shell: spack-bash {0}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
spack -e fairmq mirror set --oci-username ${{ github.actor }} --oci-password-variable GITHUB_TOKEN ghcr-buildcache
spack -e fairmq buildcache push --unsigned ghcr-buildcache
update-index:
if: github.repository == 'FairRootGroup/FairMQ' && !cancelled()
needs: build
runs-on: ubuntu-latest
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
permissions:
packages: write
steps:
- uses: actions/checkout@v4
- name: Setup spack
uses: spack/setup-spack@v2
with:
ref: v1.1.0
color: true
- name: Update buildcache index
shell: spack-bash {0}
run: |
spack env create fairmq test/ci/spack-latest.yaml
spack -e fairmq mirror set --oci-username ${{ github.actor }} --oci-password-variable GITHUB_TOKEN ghcr-buildcache
spack -e fairmq buildcache update-index ghcr-buildcache

29
.github/workflows/check_metadata.yaml vendored Normal file
View File

@@ -0,0 +1,29 @@
# SPDX-FileCopyrightText: 2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH, Darmstadt, Germany
#
# SPDX-License-Identifier: CC0-1.0
name: Check AUTHORS and CONTRIBUTORS in metadata
on:
push:
paths:
- AUTHORS
- CONTRIBUTORS
- codemeta.json
- .zenodo.json
pull_request:
paths:
- AUTHORS
- CONTRIBUTORS
- codemeta.json
- .zenodo.json
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Try updating metadata
run: python meta_update.py
- name: Check for Updates
run: git diff --exit-code

134
.github/workflows/ci.yml vendored Normal file
View File

@@ -0,0 +1,134 @@
name: CI
on:
push:
branches: [master, dev]
pull_request:
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
jobs:
build:
if: github.repository == 'FairRootGroup/FairMQ'
name: ${{ matrix.env }}-gcc-${{ matrix.gcc }}
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
gcc: ['12', '13', '14', '15']
env: ['latest']
include:
- gcc: '15'
env: 'boost187'
steps:
- uses: actions/checkout@v4
with:
submodules: true
fetch-tags: true
fetch-depth: 0
- name: Setup spack environment
uses: ./.github/actions/setup-deps
with:
gcc: ${{ matrix.gcc }}
env: ${{ matrix.env }}
- name: Configure and Build
uses: threeal/cmake-action@v2
with:
generator: Ninja
options: |
CMAKE_BUILD_TYPE=RelWithDebInfo
CMAKE_INSTALL_PREFIX=${{ github.workspace }}/install
BUILD_TESTING=ON
- name: Test
uses: threeal/ctest-action@v1
with:
test-dir: build
- name: Install
run: cmake --install build
sanitizers:
if: github.repository == 'FairRootGroup/FairMQ'
name: ${{ matrix.sanitizer.name }}
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
sanitizer:
- name: asan+lsan+ubsan
options: |
ENABLE_SANITIZER_ADDRESS=ON
ENABLE_SANITIZER_LEAK=ON
ENABLE_SANITIZER_UNDEFINED_BEHAVIOUR=ON
cxx-flags: -O1 -fno-omit-frame-pointer
- name: tsan
options: ENABLE_SANITIZER_THREAD=ON
cxx-compiler: clang++
steps:
- uses: actions/checkout@v4
with:
submodules: true
fetch-tags: true
fetch-depth: 0
- name: Setup spack environment
uses: ./.github/actions/setup-deps
with:
gcc: '14'
- name: Configure and Build
uses: threeal/cmake-action@v2
with:
generator: Ninja
cxx-compiler: ${{ matrix.sanitizer.cxx-compiler }}
cxx-flags: ${{ matrix.sanitizer.cxx-flags }}
options: |
CMAKE_BUILD_TYPE=Debug
BUILD_TESTING=ON
${{ matrix.sanitizer.options }}
- name: Test
uses: threeal/ctest-action@v1
with:
test-dir: build
static-analysis:
if: github.repository == 'FairRootGroup/FairMQ'
name: static-analysis
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
submodules: true
fetch-tags: true
fetch-depth: 0
- name: Setup spack environment
uses: ./.github/actions/setup-deps
with:
gcc: '14'
- name: Configure and Build
uses: threeal/cmake-action@v2
with:
generator: Ninja
options: |
CMAKE_BUILD_TYPE=Debug
BUILD_TESTING=ON
RUN_STATIC_ANALYSIS=ON
- name: Check for warnings
run: |
if grep -q "warning:" build.log; then
echo "::warning::Static analysis found warnings"
grep "warning:" build.log
exit 1
fi

View File

@@ -16,6 +16,6 @@ jobs:
container: container:
image: gitlab-registry.in2p3.fr/escape2020/wp3/eossr:v1.0 image: gitlab-registry.in2p3.fr/escape2020/wp3/eossr:v1.0
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v4
- name: validate codemeta - name: validate codemeta
run: eossr-metadata-validator codemeta.json run: eossr-metadata-validator codemeta.json

View File

@@ -1,5 +1,5 @@
################################################################################ ################################################################################
# Copyright (C) 2018-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # Copyright (C) 2018-2024 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# # # #
# This software is distributed under the terms of the # # This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, # # GNU Lesser General Public Licence (LGPL) version 3, #
@@ -8,8 +8,7 @@
# Project ###################################################################### # Project ######################################################################
cmake_minimum_required(VERSION 3.15 FATAL_ERROR) cmake_minimum_required(VERSION 3.15...3.30 FATAL_ERROR)
cmake_policy(VERSION 3.15...3.26)
list(PREPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake) list(PREPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake)
include(GitHelper) include(GitHelper)

View File

@@ -1,93 +0,0 @@
################################################################################
# Copyright (C) 2021-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
cmake_host_system_information(RESULT fqdn QUERY FQDN)
set(CTEST_SOURCE_DIRECTORY .)
set(CTEST_BINARY_DIRECTORY build)
set(CTEST_CMAKE_GENERATOR "Ninja")
set(CTEST_USE_LAUNCHERS ON)
set(CTEST_CONFIGURATION_TYPE "RelWithDebInfo")
set(CTEST_CUSTOM_MAXIMUM_PASSED_TEST_OUTPUT_SIZE 102400)
if(NOT NCPUS)
if(ENV{SLURM_CPUS_PER_TASK})
set(NCPUS $ENV{SLURM_CPUS_PER_TASK})
else()
include(ProcessorCount)
ProcessorCount(NCPUS)
if(NCPUS EQUAL 0)
set(NCPUS 1)
endif()
endif()
endif()
if ("$ENV{CTEST_SITE}" STREQUAL "")
set(CTEST_SITE "${fqdn}")
else()
set(CTEST_SITE $ENV{CTEST_SITE})
endif()
if ("$ENV{LABEL}" STREQUAL "")
set(CTEST_BUILD_NAME "build")
else()
set(CTEST_BUILD_NAME $ENV{LABEL})
endif()
ctest_start(Continuous)
list(APPEND options "-DDISABLE_COLOR=ON" "-DBUILD_EXAMPLES=ON" "-DBUILD_TESTING=ON")
if(RUN_STATIC_ANALYSIS)
list(APPEND options "-DRUN_STATIC_ANALYSIS=ON")
endif()
if(CMAKE_BUILD_TYPE)
set(CTEST_CONFIGURATION_TYPE ${CMAKE_BUILD_TYPE})
endif()
if(ENABLE_SANITIZER_ADDRESS)
list(APPEND options "-DENABLE_SANITIZER_ADDRESS=ON")
endif()
if(ENABLE_SANITIZER_LEAK)
list(APPEND options "-DENABLE_SANITIZER_LEAK=ON")
endif()
if(ENABLE_SANITIZER_UNDEFINED_BEHAVIOR)
list(APPEND options "-DENABLE_SANITIZER_UNDEFINED_BEHAVIOR=ON")
endif()
if(ENABLE_SANITIZER_MEMORY)
list(APPEND options "-DENABLE_SANITIZER_MEMORY=ON")
endif()
if(ENABLE_SANITIZER_THREAD)
list(APPEND options "-DENABLE_SANITIZER_THREAD=ON")
endif()
if(CMAKE_CXX_COMPILER)
list(APPEND options "-DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}")
endif()
if(CMAKE_CXX_FLAGS)
list(APPEND options "-DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS}")
endif()
list(REMOVE_DUPLICATES options)
list(JOIN options ";" optionsstr)
ctest_configure(OPTIONS "${optionsstr}")
ctest_submit()
ctest_build(FLAGS "-j${NCPUS}")
ctest_submit()
if(NOT RUN_STATIC_ANALYSIS)
ctest_test(BUILD "${CTEST_BINARY_DIRECTORY}"
PARALLEL_LEVEL ${NCPUS}
SCHEDULE_RANDOM ON
RETURN_VALUE _ctest_test_ret_val)
ctest_submit()
endif()
if(_ctest_test_ret_val)
Message(FATAL_ERROR "Some tests failed.")
endif()

114
Jenkinsfile vendored
View File

@@ -1,114 +0,0 @@
#!groovy
def jobMatrix(String type, List specs) {
def nodes = [:]
for (spec in specs) {
def job = ""
def selector = "slurm"
def os = ""
def ver = ""
if (type == 'build') {
job = "${spec.os}-${spec.ver}-${spec.arch}-${spec.compiler}"
if (spec.os =~ /^macos/) {
selector = "${spec.os}-${spec.ver}-${spec.arch}"
}
os = spec.os
ver = spec.ver
} else { // == 'check'
job = "${spec.name}"
os = 'fedora'
ver = '36'
}
def label = "${job}"
def extra = spec.extra
nodes[label] = {
node(selector) {
githubNotify(context: "${label}", description: 'Building ...', status: 'PENDING')
try {
deleteDir()
checkout scm
def jobscript = 'job.sh'
def ctestcmd = "ctest ${extra} -S FairMQTest.cmake -V --output-on-failure"
sh "echo \"set -e\" >> ${jobscript}"
sh "echo \"export LABEL=\\\"\${JOB_BASE_NAME} ${label}\\\"\" >> ${jobscript}"
if (selector =~ /^macos/) {
sh """\
echo \"${ctestcmd}\" >> ${jobscript}
"""
sh "cat ${jobscript}"
sh "bash ${jobscript}"
} else { // selector == "slurm"
def imageurl = "oras://ghcr.io/fairrootgroup/fairmq-dev/${os}-${ver}-sif:latest"
def execopts = "--net --ipc --uts --pid -B/shared"
def containercmd = "singularity exec ${execopts} ${imageurl} bash -l -c \\\"${ctestcmd} ${extra}\\\""
sh """\
echo \"echo \\\"*** Job started at .......: \\\$(date -R)\\\"\" >> ${jobscript}
echo \"echo \\\"*** Job ID ...............: \\\${SLURM_JOB_ID}\\\"\" >> ${jobscript}
echo \"echo \\\"*** Compute node .........: \\\$(hostname -f)\\\"\" >> ${jobscript}
echo \"unset http_proxy\" >> ${jobscript}
echo \"unset HTTP_PROXY\" >> ${jobscript}
echo \"${containercmd}\" >> ${jobscript}
"""
sh "cat ${jobscript}"
sh "test/ci/slurm-submit.sh \"FairMQ \${JOB_BASE_NAME} ${label}\" ${jobscript}"
if (job == "static-analyzers") {
recordIssues(enabledForFailure: true,
tools: [gcc(pattern: 'build/Testing/Temporary/*.log')],
filters: [excludeFile('extern/*'), excludeFile('usr/*')],
skipBlames: true,
skipPublishingChecks: true)
}
}
deleteDir()
githubNotify(context: "${label}", description: 'Success', status: 'SUCCESS')
} catch (e) {
deleteDir()
githubNotify(context: "${label}", description: 'Error', status: 'ERROR')
throw e
}
}
}
}
return nodes
}
pipeline{
agent none
stages {
stage("CI") {
steps{
script {
def builds = jobMatrix('build', [
[os: 'ubuntu', ver: '20.04', arch: 'x86_64', compiler: 'gcc-9'],
[os: 'ubuntu', ver: '22.04', arch: 'x86_64', compiler: 'gcc-11'],
[os: 'fedora', ver: '33', arch: 'x86_64', compiler: 'gcc-10'],
[os: 'fedora', ver: '34', arch: 'x86_64', compiler: 'gcc-11'],
[os: 'fedora', ver: '35', arch: 'x86_64', compiler: 'gcc-11'],
[os: 'fedora', ver: '36', arch: 'x86_64', compiler: 'gcc-12'],
[os: 'fedora', ver: '37', arch: 'x86_64', compiler: 'gcc-12'],
[os: 'fedora', ver: '38', arch: 'x86_64', compiler: 'gcc-13'],
[os: 'macos', ver: '12', arch: 'x86_64', compiler: 'apple-clang-14'],
[os: 'macos', ver: '13', arch: 'arm64', compiler: 'apple-clang-14'],
])
def all_debug = "-DCMAKE_BUILD_TYPE=Debug"
def checks = jobMatrix('check', [
[name: 'static-analyzers', extra: "${all_debug} -DRUN_STATIC_ANALYSIS=ON"],
[name: '{address,leak,ub}-sanitizers',
extra: "${all_debug} -DENABLE_SANITIZER_ADDRESS=ON -DENABLE_SANITIZER_LEAK=ON -DENABLE_SANITIZER_UNDEFINED_BEHAVIOUR=ON -DCMAKE_CXX_FLAGS='-O1 -fno-omit-frame-pointer'"],
[name: 'thread-sanitizer', extra: "${all_debug} -DENABLE_SANITIZER_THREAD=ON -DCMAKE_CXX_COMPILER=clang++"],
])
parallel(builds + checks)
}
}
}
}
}

View File

@@ -45,9 +45,9 @@ Recommended:
```bash ```bash
git clone https://github.com/FairRootGroup/FairMQ fairmq_source git clone https://github.com/FairRootGroup/FairMQ fairmq_source
cmake -S fairmq_source -B fairmq_build -GNinja -DCMAKE_BUILD_TYPE=Release cmake -S fairmq_source -B fairmq_build -GNinja -DCMAKE_BUILD_TYPE=Release [-DBUILD_TESTING=ON]
cmake --build fairmq_build cmake --build fairmq_build
ctest --test-dir fairmq_build --output-on-failure --schedule-random -j<ncpus> [ctest --test-dir fairmq_build --output-on-failure --schedule-random -j<ncpus>] # needs -DBUILD_TESTING=ON
cmake --install fairmq_build --prefix $(pwd)/fairmq_install cmake --install fairmq_build --prefix $(pwd)/fairmq_install
``` ```
@@ -56,6 +56,24 @@ Please consult the [manpages of your CMake version](https://cmake.org/cmake/help
If dependencies are not installed in standard system directories, you can hint the installation location via If dependencies are not installed in standard system directories, you can hint the installation location via
`-DCMAKE_PREFIX_PATH=...` or per dependency via `-D{DEPENDENCY}_ROOT=...` (`*_ROOT` variables can also be environment variables). `-DCMAKE_PREFIX_PATH=...` or per dependency via `-D{DEPENDENCY}_ROOT=...` (`*_ROOT` variables can also be environment variables).
## Installation via Spack
Prerequisite: [Spack](https://spack.readthedocs.io/en/latest/getting_started.html)
```bash
spack info fairmq # inspect build options
spack install fairmq # build latest packaged version with default options
```
Build FairMQ's dependencies via Spack for development:
```bash
git clone -b dev https://github.com/FairRootGroup/FairMQ fairmq_source
spack --env fairmq_source install # installs deps declared in fairmq_source/spack.yaml
spack env activate fairmq_source # sets $CMAKE_PREFIX_PATH which is used by CMake to find FairMQ's deps
cmake -S fairmq_source -B fairmq_build -GNinja -DCMAKE_BUILD_TYPE=Debug -DBUILD_TESTING=ON
# develop, compile, test
spack env deactivate # at end of dev session, or simply close the shell
```
## Usage ## Usage

View File

@@ -1,5 +1,5 @@
################################################################################ ################################################################################
# Copyright (C) 2018-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # Copyright (C) 2018-2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# # # #
# This software is distributed under the terms of the # # This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, # # GNU Lesser General Public Licence (LGPL) version 3, #
@@ -23,6 +23,18 @@ if(BUILD_FAIRMQ OR BUILD_TIDY_TOOL)
find_package2(PUBLIC Boost REQUIRED VERSION 1.66 find_package2(PUBLIC Boost REQUIRED VERSION 1.66
COMPONENTS container program_options filesystem date_time regex COMPONENTS container program_options filesystem date_time regex
) )
# Check Boost.Process compatibility
# Boost 1.88 has broken Boost.Process v2 without v1 compatibility headers
# Boost 1.89+ provides <boost/process/v1.hpp> for the old API
if(Boost_VERSION VERSION_EQUAL "1.88.0")
message(FATAL_ERROR "Boost version 1.88 is not supported due to Boost.Process API changes. "
"Please use Boost < 1.88 or >= 1.89")
endif()
if(Boost_VERSION VERSION_GREATER_EQUAL "1.89")
set(FAIRMQ_BOOST_PROCESS_V1_HEADER ON CACHE INTERNAL "Use boost/process/v1.hpp for Boost >= 1.89")
endif()
endif() endif()
if(BUILD_FAIRMQ) if(BUILD_FAIRMQ)
@@ -41,7 +53,7 @@ if(BUILD_TESTING)
endif() endif()
find_package2(BUNDLED GTest REQUIRED) find_package2(BUNDLED GTest REQUIRED)
if(GTest_BUNDLED) if(GTest_BUNDLED)
set(GTest_VERSION "Apr 8 2022 @a1cc8c55") set(GTest_VERSION "Dec 26 2024 @7d76a23")
set(GTest_PREFIX "<bundled>") set(GTest_PREFIX "<bundled>")
endif() endif()
endif() endif()

View File

@@ -61,7 +61,7 @@ function(add_example)
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq) set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
foreach(script IN LISTS scripts) foreach(script IN LISTS scripts)
set(script_file "${script_prefix}-${script}.sh") set(script_file "${script_prefix}-${script}.sh")
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/${script_file}.in" "${CMAKE_CURRENT_BINARY_DIR}/${script_file}") configure_file("${CMAKE_CURRENT_SOURCE_DIR}/${script_file}.in" "${CMAKE_CURRENT_BINARY_DIR}/${script_file}" @ONLY)
endforeach() endforeach()
if(ARG_CONFIG) if(ARG_CONFIG)
@@ -119,7 +119,7 @@ function(add_example)
set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq) set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
foreach(script IN LISTS scripts) foreach(script IN LISTS scripts)
set(script_file "${script_prefix}-${script}.sh") set(script_file "${script_prefix}-${script}.sh")
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/${script_file}.in" "${CMAKE_CURRENT_BINARY_DIR}/${script_file}_install") configure_file("${CMAKE_CURRENT_SOURCE_DIR}/${script_file}.in" "${CMAKE_CURRENT_BINARY_DIR}/${script_file}_install" @ONLY)
install( install(
PROGRAMS "${CMAKE_CURRENT_BINARY_DIR}/${script_file}_install" PROGRAMS "${CMAKE_CURRENT_BINARY_DIR}/${script_file}_install"
DESTINATION ${PROJECT_INSTALL_BINDIR} DESTINATION ${PROJECT_INSTALL_BINDIR}

View File

@@ -8,5 +8,5 @@
add_example(NAME region add_example(NAME region
DEVICE sampler processor sink keep-alive DEVICE sampler processor sink keep-alive
SCRIPT region region-advanced SCRIPT region region-advanced region-advanced-external
) )

View File

@@ -0,0 +1,95 @@
#!/bin/bash
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport=${1:-shmem}
msgSize=${2:-1000000}
SAMPLER="fairmq-ex-region-sampler"
SAMPLER+=" --id sampler1"
# SAMPLER+=" --sampling-rate 10"
SAMPLER+=" --severity debug"
SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --transport $transport"
SAMPLER+=" --shmid 1"
SAMPLER+=" --shm-monitor false"
SAMPLER+=" --rc-segment-size 200000000"
SAMPLER+=" --external-region true"
SAMPLER+=" --shm-no-cleanup true"
SAMPLER+=" --chan-name data1"
SAMPLER+=" --channel-config name=data1,type=push,method=bind,address=tcp://127.0.0.1:7777"
xterm -geometry 90x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
PROCESSOR1="fairmq-ex-region-processor"
PROCESSOR1+=" --id processor1"
PROCESSOR1+=" --severity debug"
PROCESSOR1+=" --transport $transport"
PROCESSOR1+=" --shmid 1"
PROCESSOR1+=" --shm-segment-id 1"
PROCESSOR1+=" --shm-monitor false"
PROCESSOR1+=" --shm-no-cleanup true"
PROCESSOR1+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777"
PROCESSOR1+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7778"
PROCESSOR1+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7779"
xterm -geometry 90x40+550+40 -hold -e @EX_BIN_DIR@/$PROCESSOR1 &
PROCESSOR2="fairmq-ex-region-processor"
PROCESSOR2+=" --id processor2"
PROCESSOR2+=" --severity debug"
PROCESSOR2+=" --transport $transport"
PROCESSOR2+=" --shmid 1"
PROCESSOR2+=" --shm-segment-id 2"
PROCESSOR2+=" --shm-monitor false"
PROCESSOR2+=" --shm-no-cleanup true"
PROCESSOR2+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777"
PROCESSOR2+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7788"
PROCESSOR2+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7789"
xterm -geometry 90x40+550+600 -hold -e @EX_BIN_DIR@/$PROCESSOR2 &
SINK1_1="fairmq-ex-region-sink"
SINK1_1+=" --id sink1_1"
SINK1_1+=" --severity debug"
SINK1_1+=" --chan-name data2"
SINK1_1+=" --transport $transport"
SINK1_1+=" --shmid 1"
SINK1_1+=" --shm-segment-id 1"
SINK1_1+=" --shm-monitor false"
SINK1_1+=" --shm-no-cleanup true"
SINK1_1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7778"
xterm -geometry 90x20+1100+0 -hold -e @EX_BIN_DIR@/$SINK1_1 &
SINK1_2="fairmq-ex-region-sink"
SINK1_2+=" --id sink1_2"
SINK1_2+=" --severity debug"
SINK1_2+=" --chan-name data3"
SINK1_2+=" --transport $transport"
SINK1_2+=" --shmid 1"
SINK1_2+=" --shm-segment-id 1"
SINK1_2+=" --shm-monitor false"
SINK1_2+=" --shm-no-cleanup true"
SINK1_2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7779"
xterm -geometry 90x20+1100+300 -hold -e @EX_BIN_DIR@/$SINK1_2 &
SINK2_1="fairmq-ex-region-sink"
SINK2_1+=" --id sink2_1"
SINK2_1+=" --severity debug"
SINK2_1+=" --chan-name data2"
SINK2_1+=" --transport $transport"
SINK2_1+=" --shmid 1"
SINK2_1+=" --shm-segment-id 2"
SINK2_1+=" --shm-monitor false"
SINK2_1+=" --shm-no-cleanup true"
SINK2_1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7788"
xterm -geometry 90x20+1100+600 -hold -e @EX_BIN_DIR@/$SINK2_1 &
SINK2_2="fairmq-ex-region-sink"
SINK2_2+=" --id sink2_2"
SINK2_2+=" --severity debug"
SINK2_2+=" --chan-name data3"
SINK2_2+=" --transport $transport"
SINK2_2+=" --shmid 1"
SINK2_2+=" --shm-segment-id 2"
SINK2_2+=" --shm-monitor false"
SINK2_2+=" --shm-no-cleanup true"
SINK2_2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7789"
xterm -geometry 90x20+1100+900 -hold -e @EX_BIN_DIR@/$SINK2_2 &

View File

@@ -2,16 +2,8 @@
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@ export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport="shmem" transport=${1:-shmem}
msgSize="1000000" msgSize=${2:-1000000}
if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
if [[ $2 =~ ^[0-9]+$ ]]; then
msgSize=$1
fi
SAMPLER="fairmq-ex-region-sampler" SAMPLER="fairmq-ex-region-sampler"
SAMPLER+=" --id sampler1" SAMPLER+=" --id sampler1"
@@ -19,6 +11,7 @@ SAMPLER+=" --id sampler1"
SAMPLER+=" --severity debug" SAMPLER+=" --severity debug"
SAMPLER+=" --msg-size $msgSize" SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --transport $transport" SAMPLER+=" --transport $transport"
#SAMPLER+=" --rc-segment-size 0"
SAMPLER+=" --shm-monitor true" SAMPLER+=" --shm-monitor true"
SAMPLER+=" --chan-name data1" SAMPLER+=" --chan-name data1"
SAMPLER+=" --channel-config name=data1,type=push,method=bind,address=tcp://127.0.0.1:7777" SAMPLER+=" --channel-config name=data1,type=push,method=bind,address=tcp://127.0.0.1:7777"

View File

@@ -2,16 +2,8 @@
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@ export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport="shmem" transport=${1:-shmem}
msgSize="1000000" msgSize=${2:-1000000}
if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
if [[ $2 =~ ^[0-9]+$ ]]; then
msgSize=$1
fi
SAMPLER="fairmq-ex-region-sampler" SAMPLER="fairmq-ex-region-sampler"
SAMPLER+=" --id sampler1" SAMPLER+=" --id sampler1"

View File

@@ -95,10 +95,11 @@ struct ShmManager
uint64_t size = stoull(conf.at(1)); uint64_t size = stoull(conf.at(1));
fair::mq::RegionConfig cfg; fair::mq::RegionConfig cfg;
cfg.id = id; cfg.id = id;
cfg.rcSegmentSize = 0;
cfg.size = size; cfg.size = size;
regionCfgs.push_back(cfg); regionCfgs.push_back(cfg);
auto ret = regions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(shmId, id, size)); auto ret = regions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(shmId, cfg));
fair::mq::shmem::UnmanagedRegion& region = *(ret.first->second); fair::mq::shmem::UnmanagedRegion& region = *(ret.first->second);
LOG(info) << "Created unamanged region " << id << " of size " << region.GetSize() LOG(info) << "Created unamanged region " << id << " of size " << region.GetSize()
<< ", starting at " << region.GetData() << ". Locking..."; << ", starting at " << region.GetData() << ". Locking...";

View File

@@ -1,5 +1,5 @@
################################################################################ ################################################################################
# Copyright (C) 2012-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # Copyright (C) 2012-2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# # # #
# This software is distributed under the terms of the # # This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, # # GNU Lesser General Public Licence (LGPL) version 3, #
@@ -63,14 +63,21 @@ if(BUILD_FAIRMQ)
Tools.h Tools.h
TransportFactory.h TransportFactory.h
Transports.h Transports.h
TransportEnum.h
UnmanagedRegion.h UnmanagedRegion.h
options/FairMQProgOptions.h options/FairMQProgOptions.h
runDevice.h runDevice.h
runFairMQDevice.h runFairMQDevice.h
shmem/Common.h shmem/Common.h
shmem/Manager.h
shmem/Message.h
shmem/Monitor.h shmem/Monitor.h
shmem/Poller.h
shmem/Segment.h shmem/Segment.h
shmem/Socket.h
shmem/TransportFactory.h
shmem/UnmanagedRegion.h shmem/UnmanagedRegion.h
shmem/UnmanagedRegionImpl.h
tools/Compiler.h tools/Compiler.h
tools/CppSTL.h tools/CppSTL.h
tools/Exceptions.h tools/Exceptions.h
@@ -95,12 +102,6 @@ if(BUILD_FAIRMQ)
plugins/Builtin.h plugins/Builtin.h
plugins/config/Config.h plugins/config/Config.h
plugins/control/Control.h plugins/control/Control.h
shmem/Message.h
shmem/Poller.h
shmem/UnmanagedRegionImpl.h
shmem/Socket.h
shmem/TransportFactory.h
shmem/Manager.h
zeromq/Common.h zeromq/Common.h
zeromq/Context.h zeromq/Context.h
zeromq/Message.h zeromq/Message.h
@@ -118,6 +119,7 @@ if(BUILD_FAIRMQ)
Channel.cxx Channel.cxx
Device.cxx Device.cxx
DeviceRunner.cxx DeviceRunner.cxx
EventManager.cxx
JSONParser.cxx JSONParser.cxx
MemoryResources.cxx MemoryResources.cxx
Plugin.cxx Plugin.cxx
@@ -174,6 +176,9 @@ if(BUILD_FAIRMQ)
FAIRMQ_HAS_STD_FILESYSTEM=${FAIRMQ_HAS_STD_FILESYSTEM} FAIRMQ_HAS_STD_FILESYSTEM=${FAIRMQ_HAS_STD_FILESYSTEM}
FAIRMQ_HAS_STD_PMR=${FAIRMQ_HAS_STD_PMR} FAIRMQ_HAS_STD_PMR=${FAIRMQ_HAS_STD_PMR}
) )
if(FAIRMQ_BOOST_PROCESS_V1_HEADER)
target_compile_definitions(${target} PRIVATE FAIRMQ_BOOST_PROCESS_V1_HEADER)
endif()
if(DEFINED FAIRMQ_CHANNEL_DEFAULT_AUTOBIND) if(DEFINED FAIRMQ_CHANNEL_DEFAULT_AUTOBIND)
# translate CMake boolean (TRUE, FALSE, 0, 1, OFF, ON) into C++ boolean literal (true, false) # translate CMake boolean (TRUE, FALSE, 0, 1, OFF, ON) into C++ boolean literal (true, false)
if(FAIRMQ_CHANNEL_DEFAULT_AUTOBIND) if(FAIRMQ_CHANNEL_DEFAULT_AUTOBIND)

View File

@@ -12,6 +12,7 @@
#include <fairmq/Channel.h> #include <fairmq/Channel.h>
#include <fairmq/Properties.h> #include <fairmq/Properties.h>
#include <fairmq/Tools.h> #include <fairmq/Tools.h>
#include <fairmq/Transports.h>
#include <random> #include <random>
#include <regex> #include <regex>
#include <set> #include <set>
@@ -383,4 +384,10 @@ bool Channel::BindEndpoint(string& endpoint)
} }
} }
std::string Channel::GetTransportName() const { return TransportName(fTransportType); }
Transport Channel::GetTransportType() const { return fTransportType; }
void Channel::UpdateTransport(const std::string& transport) { fTransportType = TransportType(transport); Invalidate(); }
} // namespace fair::mq } // namespace fair::mq

View File

@@ -14,7 +14,7 @@
#include <fairmq/Properties.h> #include <fairmq/Properties.h>
#include <fairmq/Socket.h> #include <fairmq/Socket.h>
#include <fairmq/TransportFactory.h> #include <fairmq/TransportFactory.h>
#include <fairmq/Transports.h> #include <fairmq/TransportEnum.h>
#include <fairmq/UnmanagedRegion.h> #include <fairmq/UnmanagedRegion.h>
#include <cstdint> // int64_t #include <cstdint> // int64_t
@@ -145,11 +145,11 @@ class Channel
/// Get channel transport name ("default", "zeromq" or "shmem") /// Get channel transport name ("default", "zeromq" or "shmem")
/// @return Returns channel transport name (e.g. "default", "zeromq" or "shmem") /// @return Returns channel transport name (e.g. "default", "zeromq" or "shmem")
std::string GetTransportName() const { return TransportName(fTransportType); } std::string GetTransportName() const;
/// Get channel transport type /// Get channel transport type
/// @return Returns channel transport type /// @return Returns channel transport type
mq::Transport GetTransportType() const { return fTransportType; } mq::Transport GetTransportType() const;
/// Get socket send buffer size (in number of messages) /// Get socket send buffer size (in number of messages)
/// @return Returns socket send buffer size (in number of messages) /// @return Returns socket send buffer size (in number of messages)
@@ -221,7 +221,7 @@ class Channel
/// Set channel transport /// Set channel transport
/// @param transport transport string ("default", "zeromq" or "shmem") /// @param transport transport string ("default", "zeromq" or "shmem")
void UpdateTransport(const std::string& transport) { fTransportType = TransportType(transport); Invalidate(); } void UpdateTransport(const std::string& transport);
/// Set socket send buffer size /// Set socket send buffer size
/// @param sndBufSize Socket send buffer size (in number of messages) /// @param sndBufSize Socket send buffer size (in number of messages)
@@ -438,7 +438,7 @@ class Channel
} }
void CheckSendCompatibility(Parts& parts) { CheckSendCompatibility(parts.fParts); } void CheckSendCompatibility(Parts& parts) { CheckSendCompatibility(parts.fParts); }
void CheckSendCompatibility(std::vector<MessagePtr>& msgVec) void CheckSendCompatibility(Parts::container & msgVec)
{ {
for (auto& msg : msgVec) { for (auto& msg : msgVec) {
if (fTransportType != msg->GetType()) { if (fTransportType != msg->GetType()) {
@@ -468,7 +468,7 @@ class Channel
} }
void CheckReceiveCompatibility(Parts& parts) { CheckReceiveCompatibility(parts.fParts); } void CheckReceiveCompatibility(Parts& parts) { CheckReceiveCompatibility(parts.fParts); }
void CheckReceiveCompatibility(std::vector<MessagePtr>& msgVec) void CheckReceiveCompatibility(Parts::container& msgVec)
{ {
for (auto& msg : msgVec) { for (auto& msg : msgVec) {
if (fTransportType != msg->GetType()) { if (fTransportType != msg->GetType()) {

View File

@@ -9,6 +9,7 @@
// FairMQ // FairMQ
#include <fairmq/Device.h> #include <fairmq/Device.h>
#include <fairmq/Tools.h> #include <fairmq/Tools.h>
#include <fairmq/Transports.h>
// boost // boost
#include <boost/algorithm/string.hpp> // join/split #include <boost/algorithm/string.hpp> // join/split

View File

@@ -19,7 +19,7 @@
#include <fairmq/StateQueue.h> #include <fairmq/StateQueue.h>
#include <fairmq/Tools.h> #include <fairmq/Tools.h>
#include <fairmq/TransportFactory.h> #include <fairmq/TransportFactory.h>
#include <fairmq/Transports.h> #include <fairmq/TransportEnum.h>
#include <fairmq/UnmanagedRegion.h> #include <fairmq/UnmanagedRegion.h>
// logger // logger

20
fairmq/EventManager.cxx Normal file
View File

@@ -0,0 +1,20 @@
/********************************************************************************
* Copyright (C) 2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "EventManager.h"
#include <string>
#include <typeindex>
template std::shared_ptr<
fair::mq::EventManager::Signal<fair::mq::PropertyChangeAsString, std::string>>
fair::mq::EventManager::GetSignal<fair::mq::PropertyChangeAsString, std::string>(
const std::pair<std::type_index, std::type_index>& key) const;
template void fair::mq::EventManager::Subscribe<fair::mq::PropertyChangeAsString, std::string>(
const std::string& subscriber,
std::function<void(typename fair::mq::PropertyChangeAsString::KeyType, std::string)>);

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2014-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2014-2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -57,27 +57,8 @@ class EventManager
template<typename E, typename ...Args> template<typename E, typename ...Args>
using Signal = boost::signals2::signal<void(typename E::KeyType, Args...)>; using Signal = boost::signals2::signal<void(typename E::KeyType, Args...)>;
template<typename E, typename ...Args> template<typename E, typename... Args>
auto Subscribe(const std::string& subscriber, std::function<void(typename E::KeyType, Args...)> callback) -> void auto Subscribe(const std::string& subscriber, std::function<void(typename E::KeyType, Args...)> callback) -> void;
{
const std::type_index event_type_index{typeid(E)};
const std::type_index callback_type_index{typeid(std::function<void(typename E::KeyType, Args...)>)};
const auto signalsKey = std::make_pair(event_type_index, callback_type_index);
const auto connectionsKey = std::make_pair(subscriber, signalsKey);
const auto connection = GetSignal<E, Args...>(signalsKey)->connect(callback);
{
std::lock_guard<std::mutex> lock{fMutex};
if (fConnections.find(connectionsKey) != fConnections.end())
{
fConnections.at(connectionsKey).disconnect();
fConnections.erase(connectionsKey);
}
fConnections.insert({connectionsKey, connection});
}
}
template<typename E, typename ...Args> template<typename E, typename ...Args>
auto Unsubscribe(const std::string& subscriber) -> void auto Unsubscribe(const std::string& subscriber) -> void
@@ -119,21 +100,58 @@ class EventManager
mutable std::mutex fMutex; mutable std::mutex fMutex;
template<typename E, typename ...Args> template<typename E, typename ...Args>
auto GetSignal(const SignalsKey& key) const -> std::shared_ptr<Signal<E, Args...>> auto GetSignal(const SignalsKey& key) const -> std::shared_ptr<Signal<E, Args...>>;
}; /* class EventManager */
struct PropertyChangeAsString : Event<std::string> {};
template<typename E, typename... Args>
auto EventManager::GetSignal(const SignalsKey& key) const -> std::shared_ptr<Signal<E, Args...>>
{
std::lock_guard<std::mutex> lock{fMutex};
if (fSignals.find(key) == fSignals.end()) {
// wrapper is needed because boost::signals2::signal is neither copyable nor movable
// and I don't know how else to insert it into the map
auto signal = std::make_shared<Signal<E, Args...>>();
fSignals.insert(std::make_pair(key, signal));
}
return boost::any_cast<std::shared_ptr<Signal<E, Args...>>>(fSignals.at(key));
}
template<typename E, typename... Args>
auto EventManager::Subscribe(const std::string& subscriber,
std::function<void(typename E::KeyType, Args...)> callback) -> void
{
const std::type_index event_type_index{typeid(E)};
const std::type_index callback_type_index{
typeid(std::function<void(typename E::KeyType, Args...)>)};
const auto signalsKey = std::make_pair(event_type_index, callback_type_index);
const auto connectionsKey = std::make_pair(subscriber, signalsKey);
const auto connection = GetSignal<E, Args...>(signalsKey)->connect(callback);
{ {
std::lock_guard<std::mutex> lock{fMutex}; std::lock_guard<std::mutex> lock{fMutex};
if (fSignals.find(key) == fSignals.end()) if (fConnections.find(connectionsKey) != fConnections.end()) {
{ fConnections.at(connectionsKey).disconnect();
// wrapper is needed because boost::signals2::signal is neither copyable nor movable fConnections.erase(connectionsKey);
// and I don't know how else to insert it into the map
auto signal = std::make_shared<Signal<E, Args...>>();
fSignals.insert(std::make_pair(key, signal));
} }
fConnections.insert({connectionsKey, connection});
return boost::any_cast<std::shared_ptr<Signal<E, Args...>>>(fSignals.at(key));
} }
}; /* class EventManager */ }
extern template std::shared_ptr<
fair::mq::EventManager::Signal<fair::mq::PropertyChangeAsString, std::string>>
fair::mq::EventManager::GetSignal<fair::mq::PropertyChangeAsString, std::string>(
const std::pair<std::type_index, std::type_index>& key) const;
extern template void
fair::mq::EventManager::Subscribe<fair::mq::PropertyChangeAsString, std::string>(
const std::string& subscriber,
std::function<void(typename fair::mq::PropertyChangeAsString::KeyType, std::string)>);
} // namespace fair::mq } // namespace fair::mq

View File

@@ -17,7 +17,7 @@
#include <boost/container/container_fwd.hpp> #include <boost/container/container_fwd.hpp>
#include <boost/container/flat_map.hpp> #include <boost/container/flat_map.hpp>
#include <boost/container/pmr/memory_resource.hpp> #include <memory_resource>
#include <cstring> #include <cstring>
#include <fairmq/Message.h> #include <fairmq/Message.h>
#include <stdexcept> #include <stdexcept>
@@ -27,7 +27,7 @@ namespace fair::mq {
class TransportFactory; class TransportFactory;
using byte = unsigned char; using byte = unsigned char;
namespace pmr = boost::container::pmr; namespace pmr = std::pmr;
/// All FairMQ related memory resources need to inherit from this interface /// All FairMQ related memory resources need to inherit from this interface
/// class for the /// class for the

View File

@@ -10,7 +10,7 @@
#define FAIR_MQ_MESSAGE_H #define FAIR_MQ_MESSAGE_H
#include <cstddef> // for size_t #include <cstddef> // for size_t
#include <fairmq/Transports.h> #include <fairmq/TransportEnum.h>
#include <memory> // unique_ptr #include <memory> // unique_ptr
#include <stdexcept> #include <stdexcept>

View File

@@ -448,3 +448,6 @@ void ProgOptions::PrintOptionsRaw() const
} }
} // namespace fair::mq } // namespace fair::mq
template void fair::mq::ProgOptions::SetProperty<std::string>(const std::string& key, std::string val);
template void fair::mq::ProgOptions::SetProperty<int>(const std::string& key, int val);

View File

@@ -129,17 +129,7 @@ class ProgOptions
/// @param key /// @param key
/// @param val /// @param val
template<typename T> template<typename T>
void SetProperty(const std::string& key, T val) void SetProperty(const std::string& key, T val);
{
std::unique_lock<std::mutex> lock(fMtx);
SetVarMapValue<typename std::decay<T>::type>(key, val);
lock.unlock();
fEvents.Emit<fair::mq::PropertyChange, typename std::decay<T>::type>(key, val);
fEvents.Emit<fair::mq::PropertyChangeAsString, std::string>(key, GetPropertyAsString(key));
}
/// @brief Updates an existing config property (or fails if it doesn't exist) /// @brief Updates an existing config property (or fails if it doesn't exist)
/// @param key /// @param key
@@ -275,5 +265,20 @@ class ProgOptions
}; };
} // namespace fair::mq } // namespace fair::mq
template <typename T>
void fair::mq::ProgOptions::SetProperty(const std::string& key, T val)
{
std::unique_lock<std::mutex> lock(fMtx);
SetVarMapValue<typename std::decay<T>::type>(key, val);
lock.unlock();
fEvents.Emit<fair::mq::PropertyChange, typename std::decay<T>::type>(key, val);
fEvents.Emit<fair::mq::PropertyChangeAsString, std::string>(key, GetPropertyAsString(key));
}
extern template void fair::mq::ProgOptions::SetProperty<int>(const std::string& key, int val);
extern template void fair::mq::ProgOptions::SetProperty<std::string>(const std::string& key, std::string val);
#endif /* FAIR_MQ_PROGOPTIONS_H */ #endif /* FAIR_MQ_PROGOPTIONS_H */

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2014-2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -29,7 +29,6 @@ using Property = boost::any;
using Properties = std::map<std::string, Property>; using Properties = std::map<std::string, Property>;
struct PropertyChange : Event<std::string> {}; struct PropertyChange : Event<std::string> {};
struct PropertyChangeAsString : Event<std::string> {};
class PropertyHelper class PropertyHelper
{ {

View File

@@ -52,8 +52,8 @@ struct Socket
virtual int64_t Send(MessagePtr& msg, int timeout = -1) = 0; virtual int64_t Send(MessagePtr& msg, int timeout = -1) = 0;
virtual int64_t Receive(MessagePtr& msg, int timeout = -1) = 0; virtual int64_t Receive(MessagePtr& msg, int timeout = -1) = 0;
virtual int64_t Send(std::vector<std::unique_ptr<Message>>& msgVec, int timeout = -1) = 0; virtual int64_t Send(Parts::container& msgVec, int timeout = -1) = 0;
virtual int64_t Receive(std::vector<std::unique_ptr<Message>>& msgVec, int timeout = -1) = 0; virtual int64_t Receive(Parts::container & msgVec, int timeout = -1) = 0;
virtual int64_t Send(Parts& parts, int timeout = -1) { return Send(parts.fParts, timeout); } virtual int64_t Send(Parts& parts, int timeout = -1) { return Send(parts.fParts, timeout); }
virtual int64_t Receive(Parts& parts, int timeout = -1) { return Receive(parts.fParts, timeout); } virtual int64_t Receive(Parts& parts, int timeout = -1) { return Receive(parts.fParts, timeout); }

22
fairmq/TransportEnum.h Normal file
View File

@@ -0,0 +1,22 @@
/********************************************************************************
* Copyright (C) 2014-2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_TRANSPORTENUMS_H
#define FAIR_MQ_TRANSPORTENUMS_H
namespace fair::mq {
enum class Transport {
DEFAULT,
ZMQ,
SHM
};
}
#endif // FAIR_MQ_TRANSPORTENUMS_H

View File

@@ -14,7 +14,7 @@
#include <fairmq/Message.h> #include <fairmq/Message.h>
#include <fairmq/Poller.h> #include <fairmq/Poller.h>
#include <fairmq/Socket.h> #include <fairmq/Socket.h>
#include <fairmq/Transports.h> #include <fairmq/TransportEnum.h>
#include <fairmq/UnmanagedRegion.h> #include <fairmq/UnmanagedRegion.h>
#include <memory> // shared_ptr #include <memory> // shared_ptr
#include <stdexcept> #include <stdexcept>

View File

@@ -10,6 +10,7 @@
#define FAIR_MQ_TRANSPORTS_H #define FAIR_MQ_TRANSPORTS_H
#include <fairmq/tools/Strings.h> #include <fairmq/tools/Strings.h>
#include <fairmq/TransportEnum.h>
#include <memory> #include <memory>
#include <ostream> #include <ostream>
#include <stdexcept> #include <stdexcept>
@@ -18,13 +19,6 @@
namespace fair::mq { namespace fair::mq {
enum class Transport
{
DEFAULT,
ZMQ,
SHM
};
struct TransportError : std::runtime_error struct TransportError : std::runtime_error
{ {
using std::runtime_error::runtime_error; using std::runtime_error::runtime_error;

View File

@@ -9,7 +9,7 @@
#ifndef FAIR_MQ_UNMANAGEDREGION_H #ifndef FAIR_MQ_UNMANAGEDREGION_H
#define FAIR_MQ_UNMANAGEDREGION_H #define FAIR_MQ_UNMANAGEDREGION_H
#include <fairmq/Transports.h> #include <fairmq/TransportEnum.h>
#include <cstddef> // size_t #include <cstddef> // size_t
#include <cstdint> // uint32_t #include <cstdint> // uint32_t

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2018-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2018-2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -19,7 +19,7 @@
#define FAIRMQ_GIT_DATE "@PROJECT_GIT_DATE@" #define FAIRMQ_GIT_DATE "@PROJECT_GIT_DATE@"
#define FAIRMQ_REPO_URL "https://github.com/FairRootGroup/FairMQ" #define FAIRMQ_REPO_URL "https://github.com/FairRootGroup/FairMQ"
#define FAIRMQ_LICENSE "LGPL-3.0" #define FAIRMQ_LICENSE "LGPL-3.0"
#define FAIRMQ_COPYRIGHT "2012-2023 GSI" #define FAIRMQ_COPYRIGHT "2012-2025 GSI"
#define FAIRMQ_BUILD_TYPE "@CMAKE_BUILD_TYPE@" #define FAIRMQ_BUILD_TYPE "@CMAKE_BUILD_TYPE@"
#endif // FAIR_MQ_VERSION_H #endif // FAIR_MQ_VERSION_H

View File

@@ -65,13 +65,17 @@ Control::Control(const string& name, Plugin::Version version, const string& main
}); });
try { try {
TakeDeviceControl();
auto control = GetProperty<string>("control"); auto control = GetProperty<string>("control");
if (control != "none") {
TakeDeviceControl();
}
if (control == "static") { if (control == "static") {
LOG(debug) << "Running builtin controller: static"; LOG(debug) << "Running builtin controller: static";
fControllerThread = thread(&Control::StaticMode, this); fControllerThread = thread(&Control::StaticMode, this);
} else if (control == "none") {
LOG(debug) << "Builtin controller: disabled";
} else if (control == "gui") { } else if (control == "gui") {
LOG(debug) << "Running builtin controller: gui"; LOG(debug) << "Running builtin controller: gui";
fControllerThread = thread(&Control::GUIMode, this); fControllerThread = thread(&Control::GUIMode, this);
@@ -142,7 +146,7 @@ auto ControlPluginProgramOptions() -> Plugin::ProgOptions
namespace po = boost::program_options; namespace po = boost::program_options;
auto pluginOptions = po::options_description{"Control (builtin) Plugin"}; auto pluginOptions = po::options_description{"Control (builtin) Plugin"};
pluginOptions.add_options() pluginOptions.add_options()
("control", po::value<string>()->default_value("dynamic"), "Control mode, 'static' or 'dynamic' (aliases for dynamic are external and interactive)") ("control", po::value<string>()->default_value("dynamic"), "Control mode, 'static' or 'dynamic' (aliases for dynamic are external and interactive), 'none', 'gui'")
("catch-signals", po::value<int >()->default_value(1), "Enable signal handling (1/0)."); ("catch-signals", po::value<int >()->default_value(1), "Enable signal handling (1/0).");
return pluginOptions; return pluginOptions;
} }
@@ -271,11 +275,11 @@ auto Control::InteractiveMode() -> void
try { try {
RunStartupSequence(); RunStartupSequence();
if(!fDeviceShutdownRequested) { if (!fDeviceShutdownRequested) {
RunREPL(); RunREPL();
} }
if(!fDeviceShutdownRequested) { if (!fDeviceShutdownRequested) {
RunShutdownSequence(); RunShutdownSequence();
} }
} catch (PluginServices::DeviceControlError& e) { } catch (PluginServices::DeviceControlError& e) {
@@ -404,7 +408,7 @@ try {
// or for device shutdown request (Ctrl-C) // or for device shutdown request (Ctrl-C)
fStateQueue.WaitForNextOrCustom([this]{ return fDeviceShutdownRequested.load(); }); fStateQueue.WaitForNextOrCustom([this]{ return fDeviceShutdownRequested.load(); });
if(!fDeviceShutdownRequested) { if (!fDeviceShutdownRequested) {
RunShutdownSequence(); RunShutdownSequence();
} }
} catch (PluginServices::DeviceControlError& e) { } catch (PluginServices::DeviceControlError& e) {
@@ -421,7 +425,7 @@ try {
// Wait for device shutdown request (Ctrl-C) // Wait for device shutdown request (Ctrl-C)
fStateQueue.WaitForCustom([this]{ return fDeviceShutdownRequested.load(); }); fStateQueue.WaitForCustom([this]{ return fDeviceShutdownRequested.load(); });
if(!fDeviceShutdownRequested) { if (!fDeviceShutdownRequested) {
RunShutdownSequence(); RunShutdownSequence();
} }
} catch (PluginServices::DeviceControlError& e) { } catch (PluginServices::DeviceControlError& e) {

View File

@@ -13,7 +13,8 @@
#include <functional> // std::equal_to #include <functional> // std::equal_to
#include <boost/functional/hash.hpp> #include <boost/functional/hash.hpp>
#include <boost/interprocess/allocators/adaptive_pool.hpp> // #include <boost/interprocess/allocators/adaptive_pool.hpp>
#include <boost/interprocess/allocators/node_allocator.hpp>
#include <boost/interprocess/allocators/allocator.hpp> #include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/map.hpp> #include <boost/interprocess/containers/map.hpp>
#include <boost/interprocess/containers/string.hpp> #include <boost/interprocess/containers/string.hpp>
@@ -69,9 +70,10 @@ struct RefCount
static constexpr size_t numNodesPerBlock = 4096; static constexpr size_t numNodesPerBlock = 4096;
// Maximum number of totally free blocks that the adaptive node pool will hold. // Maximum number of totally free blocks that the adaptive node pool will hold.
// The rest of the totally free blocks will be deallocated with the segment manager. // The rest of the totally free blocks will be deallocated with the segment manager.
static constexpr size_t maxFreeBlocks = 2; // static constexpr size_t maxFreeBlocks = 2;
using RefCountPool = boost::interprocess::adaptive_pool<RefCount, boost::interprocess::managed_shared_memory::segment_manager, numNodesPerBlock, maxFreeBlocks>; using RefCountPool = boost::interprocess::node_allocator<RefCount, boost::interprocess::managed_shared_memory::segment_manager, numNodesPerBlock>;
// using RefCountPool = boost::interprocess::adaptive_pool<RefCount, boost::interprocess::managed_shared_memory::segment_manager, numNodesPerBlock, maxFreeBlocks>;
using SegmentManager = boost::interprocess::managed_shared_memory::segment_manager; using SegmentManager = boost::interprocess::managed_shared_memory::segment_manager;
using VoidAlloc = boost::interprocess::allocator<void, SegmentManager>; using VoidAlloc = boost::interprocess::allocator<void, SegmentManager>;
@@ -172,8 +174,6 @@ enum class AllocationAlgorithm : int
struct RegionInfo struct RegionInfo
{ {
static constexpr uint64_t DefaultRcSegmentSize = 10000000;
RegionInfo(const char* path, int flags, uint64_t userFlags, uint64_t size, uint64_t rcSegmentSize, const VoidAlloc& alloc) RegionInfo(const char* path, int flags, uint64_t userFlags, uint64_t size, uint64_t rcSegmentSize, const VoidAlloc& alloc)
: fPath(path, alloc) : fPath(path, alloc)
, fCreationFlags(flags) , fCreationFlags(flags)
@@ -183,14 +183,6 @@ struct RegionInfo
, fDestroyed(false) , fDestroyed(false)
{} {}
RegionInfo(const VoidAlloc& alloc)
: RegionInfo("", 0, 0, 0, DefaultRcSegmentSize, alloc)
{}
RegionInfo(const char* path, int flags, uint64_t userFlags, uint64_t size, const VoidAlloc& alloc)
: RegionInfo(path, flags, userFlags, size, DefaultRcSegmentSize, alloc)
{}
Str fPath; Str fPath;
int fCreationFlags; int fCreationFlags;
uint64_t fUserFlags; uint64_t fUserFlags;

View File

@@ -11,7 +11,13 @@
// Needed to compile-firewall the <boost/process/async.hpp> header because it // Needed to compile-firewall the <boost/process/async.hpp> header because it
// interferes with the <asio/buffer.hpp> header. So, let's factor // interferes with the <asio/buffer.hpp> header. So, let's factor
// the whole dependency to Boost.Process out of the header. // the whole dependency to Boost.Process out of the header.
#ifdef FAIRMQ_BOOST_PROCESS_V1_HEADER
#include <boost/process/v1.hpp>
namespace bp = boost::process::v1;
#else
#include <boost/process.hpp> #include <boost/process.hpp>
namespace bp = boost::process;
#endif
#include <fairlogger/Logger.h> #include <fairlogger/Logger.h>
namespace fair::mq::shmem { namespace fair::mq::shmem {
@@ -28,7 +34,7 @@ bool Manager::SpawnShmMonitor(const std::string& id)
path.emplace(path.begin(), env.at(fairmq_path_key).to_string()); path.emplace(path.begin(), env.at(fairmq_path_key).to_string());
} }
auto exe(boost::process::search_path(shmmonitor_exe_name, path)); auto exe(bp::search_path(shmmonitor_exe_name, path));
if (exe.empty()) { if (exe.empty()) {
LOG(warn) << "could not find " << shmmonitor_exe_name << " in \"$" << fairmq_path_key LOG(warn) << "could not find " << shmmonitor_exe_name << " in \"$" << fairmq_path_key
<< ":$PATH\""; << ":$PATH\"";
@@ -39,7 +45,7 @@ bool Manager::SpawnShmMonitor(const std::string& id)
bool verbose(env.count(shmmonitor_verbose_key) bool verbose(env.count(shmmonitor_verbose_key)
&& env.at(shmmonitor_verbose_key).to_string() == "true"); && env.at(shmmonitor_verbose_key).to_string() == "true");
boost::process::spawn( bp::spawn(
exe, "-x", "-m", "--shmid", id, "-d", "-t", "2000", (verbose ? "--verbose" : ""), env); exe, "-x", "-m", "--shmid", id, "-d", "-t", "2000", (verbose ? "--verbose" : ""), env);
return true; return true;

View File

@@ -323,7 +323,6 @@ class Manager
} }
const uint16_t id = cfg.id.value(); const uint16_t id = cfg.id.value();
const uint64_t rcSegmentSize = cfg.rcSegmentSize;
std::lock_guard<std::mutex> lock(fLocalRegionsMtx); std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
@@ -340,6 +339,12 @@ class Manager
LOG(debug) << "Unmanaged region (view) already present, promoting to controller"; LOG(debug) << "Unmanaged region (view) already present, promoting to controller";
region->BecomeController(cfg); region->BecomeController(cfg);
} else { } else {
// we need to update local config, if the region information already exists
auto info = fShmRegions->find(id);
if (info != fShmRegions->end()) {
cfg.rcSegmentSize = info->second.fRCSegmentSize;
}
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, true, cfg)); auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, true, cfg));
region = res.first->second.get(); region = res.first->second.get();
} }
@@ -348,7 +353,6 @@ class Manager
// start ack receiver only if a callback has been provided. // start ack receiver only if a callback has been provided.
if (callback || bulkCallback) { if (callback || bulkCallback) {
region->SetCallbacks(callback, bulkCallback); region->SetCallbacks(callback, bulkCallback);
region->InitializeRefCountSegment(rcSegmentSize);
region->InitializeQueues(); region->InitializeQueues();
region->StartAckSender(); region->StartAckSender();
region->StartAckReceiver(); region->StartAckReceiver();
@@ -401,19 +405,18 @@ class Manager
try { try {
RegionConfig cfg; RegionConfig cfg;
const uint64_t rcSegmentSize = cfg.rcSegmentSize;
// get region info // get region info
{ {
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx); boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
RegionInfo regionInfo = fShmRegions->at(id); RegionInfo regionInfo = fShmRegions->at(id);
cfg.id = id; cfg.id = id;
cfg.creationFlags = regionInfo.fCreationFlags; cfg.creationFlags = regionInfo.fCreationFlags;
cfg.rcSegmentSize = regionInfo.fRCSegmentSize;
cfg.path = regionInfo.fPath.c_str(); cfg.path = regionInfo.fPath.c_str();
} }
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'"; // LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, false, std::move(cfg))); auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, false, std::move(cfg)));
r.first->second->InitializeRefCountSegment(rcSegmentSize);
r.first->second->InitializeQueues(); r.first->second->InitializeQueues();
r.first->second->StartAckSender(); r.first->second->StartAckSender();
return r.first->second.get(); return r.first->second.get();
@@ -482,6 +485,7 @@ class Manager
cfg.id = info.id; cfg.id = info.id;
cfg.creationFlags = regionInfo.fCreationFlags; cfg.creationFlags = regionInfo.fCreationFlags;
cfg.path = regionInfo.fPath.c_str(); cfg.path = regionInfo.fPath.c_str();
cfg.rcSegmentSize = regionInfo.fRCSegmentSize;
regionCfgs.emplace(info.id, cfg); regionCfgs.emplace(info.id, cfg);
// fill the ptr+size info after shmLock is released, to avoid constructing local region under it // fill the ptr+size info after shmLock is released, to avoid constructing local region under it
} else { } else {
@@ -503,10 +507,8 @@ class Manager
if (it != fRegions.end()) { if (it != fRegions.end()) {
region = it->second.get(); region = it->second.get();
} else { } else {
const uint64_t rcSegmentSize = cfgIt->second.rcSegmentSize;
auto r = fRegions.emplace(cfgIt->first, std::make_unique<UnmanagedRegion>(fShmId, 0, false, cfgIt->second)); auto r = fRegions.emplace(cfgIt->first, std::make_unique<UnmanagedRegion>(fShmId, 0, false, cfgIt->second));
region = r.first->second.get(); region = r.first->second.get();
region->InitializeRefCountSegment(rcSegmentSize);
region->InitializeQueues(); region->InitializeQueues();
region->StartAckSender(); region->StartAckSender();
} }

View File

@@ -14,6 +14,7 @@
#include "UnmanagedRegionImpl.h" #include "UnmanagedRegionImpl.h"
#include <fairmq/Message.h> #include <fairmq/Message.h>
#include <fairmq/UnmanagedRegion.h> #include <fairmq/UnmanagedRegion.h>
#include <fairmq/Transports.h>
#include <fairlogger/Logger.h> #include <fairlogger/Logger.h>
@@ -251,7 +252,12 @@ class Message final : public fair::mq::Message
if (!fRegionPtr) { if (!fRegionPtr) {
throw TransportError(tools::ToString("Cannot get unmanaged region with id ", fRegionId)); throw TransportError(tools::ToString("Cannot get unmanaged region with id ", fRegionId));
} }
return fRegionPtr->GetRefCountAddressFromHandle(fShared)->Get(); if (fRegionPtr->fRcSegmentSize > 0) {
return fRegionPtr->GetRefCountAddressFromHandle(fShared)->Get();
} else {
fManager.GetSegment(fSegmentId);
return ShmHeader::RefCount(fManager.GetAddressFromHandle(fShared, fSegmentId));
}
} }
void Copy(const fair::mq::Message& other) override void Copy(const fair::mq::Message& other) override
@@ -277,19 +283,29 @@ class Message final : public fair::mq::Message
if (!fRegionPtr) { if (!fRegionPtr) {
throw TransportError(tools::ToString("Cannot get unmanaged region with id ", otherMsg.fRegionId)); throw TransportError(tools::ToString("Cannot get unmanaged region with id ", otherMsg.fRegionId));
} }
if (otherMsg.fShared < 0) { if (fRegionPtr->fRcSegmentSize > 0) {
// UR msg not yet shared, create the reference counting object with count 2 if (otherMsg.fShared < 0) {
try { // UR msg not yet shared, create the reference counting object with count 2
otherMsg.fShared = fRegionPtr->HandleFromAddress(&(fRegionPtr->MakeRefCount(2))); try {
} catch (boost::interprocess::bad_alloc& ba) { otherMsg.fShared = fRegionPtr->HandleFromAddress(&(fRegionPtr->MakeRefCount(2)));
throw RefCountBadAlloc(tools::ToString( } catch (boost::interprocess::bad_alloc& ba) {
"Insufficient space in the reference count segment ", throw RefCountBadAlloc(tools::ToString("Insufficient space in the reference count segment ", otherMsg.fRegionId, ", original exception: bad_alloc: ", ba.what()));
otherMsg.fRegionId, }
", original exception: bad_alloc: ", } else {
ba.what())); fRegionPtr->GetRefCountAddressFromHandle(otherMsg.fShared)->Increment();
}
} else { // if RefCount segment size is 0, store the ref count in the managed segment
if (otherMsg.fShared < 0) { // if UR msg is not yet shared
char* ptr = fManager.Allocate(2, 0);
// point the fShared in the unmanaged region message to the refCount holder
otherMsg.fShared = fManager.GetHandleFromAddress(ptr, fSegmentId);
// the message needs to be able to locate in which segment the refCount is stored
otherMsg.fSegmentId = fSegmentId;
ShmHeader::IncrementRefCount(ptr);
} else { // if the UR msg is already shared
fManager.GetSegment(otherMsg.fSegmentId);
ShmHeader::IncrementRefCount(fManager.GetAddressFromHandle(otherMsg.fShared, otherMsg.fSegmentId));
} }
} else {
fRegionPtr->GetRefCountAddressFromHandle(otherMsg.fShared)->Increment();
} }
} }
@@ -357,10 +373,21 @@ class Message final : public fair::mq::Message
if (!fRegionPtr) { if (!fRegionPtr) {
throw TransportError(tools::ToString("Cannot get unmanaged region with id ", fRegionId)); throw TransportError(tools::ToString("Cannot get unmanaged region with id ", fRegionId));
} }
uint16_t refCount = fRegionPtr->GetRefCountAddressFromHandle(fShared)->Decrement(); if (fRegionPtr->fRcSegmentSize > 0) {
if (refCount == 1) { uint16_t refCount = fRegionPtr->GetRefCountAddressFromHandle(fShared)->Decrement();
fRegionPtr->RemoveRefCount(*(fRegionPtr->GetRefCountAddressFromHandle(fShared))); if (refCount == 1) {
ReleaseUnmanagedRegionBlock(); fRegionPtr->RemoveRefCount(*(fRegionPtr->GetRefCountAddressFromHandle(fShared)));
ReleaseUnmanagedRegionBlock();
}
} else { // if RefCount segment size is 0, get the ref count from the managed segment
// make sure segment is initialized in this transport
fManager.GetSegment(fSegmentId);
// release unmanaged region block if ref count is one
uint16_t refCount = ShmHeader::DecrementRefCount(fManager.GetAddressFromHandle(fShared, fSegmentId));
if (refCount == 1) {
fManager.Deallocate(fShared, fSegmentId);
ReleaseUnmanagedRegionBlock();
}
} }
} else { } else {
ReleaseUnmanagedRegionBlock(); ReleaseUnmanagedRegionBlock();

View File

@@ -274,7 +274,9 @@ bool Monitor::PrintShm(const ShmId& shmId)
try { try {
managed_shared_memory rcCountSegment(open_read_only, MakeShmName(shmId.shmId, "rrc", id).c_str()); managed_shared_memory rcCountSegment(open_read_only, MakeShmName(shmId.shmId, "rrc", id).c_str());
ss << ", rcCountSegment size: " << rcCountSegment.get_size(); auto size = rcCountSegment.get_size();
auto free = rcCountSegment.get_free_memory();
ss << ", rcCountSegment size: " << size << ", free: " << free << ", used: " << size - free;
} catch (bie&) { } catch (bie&) {
ss << ", rcCountSegment: not found"; ss << ", rcCountSegment: not found";
} }

View File

@@ -10,6 +10,7 @@
#include <fairmq/shmem/Common.h> #include <fairmq/shmem/Common.h>
#include <fairmq/shmem/Monitor.h> #include <fairmq/shmem/Monitor.h>
#include <fairmq/Transports.h>
#include <cstdint> #include <cstdint>
#include <string> #include <string>

View File

@@ -200,7 +200,7 @@ class Socket final : public fair::mq::Socket
} }
} }
int64_t Send(std::vector<MessagePtr>& msgVec, int timeout = -1) override int64_t Send(Parts::container& msgVec, int timeout = -1) override
{ {
int flags = 0; int flags = 0;
if (timeout == 0) { if (timeout == 0) {
@@ -260,7 +260,7 @@ class Socket final : public fair::mq::Socket
return static_cast<int>(TransferCode::error); return static_cast<int>(TransferCode::error);
} }
int64_t Receive(std::vector<MessagePtr>& msgVec, int timeout = -1) override int64_t Receive(Parts::container& msgVec, int timeout = -1) override
{ {
int flags = 0; int flags = 0;
if (timeout == 0) { if (timeout == 0) {

View File

@@ -13,6 +13,7 @@
#include <fairmq/shmem/Monitor.h> #include <fairmq/shmem/Monitor.h>
#include <fairmq/tools/Strings.h> #include <fairmq/tools/Strings.h>
#include <fairmq/UnmanagedRegion.h> #include <fairmq/UnmanagedRegion.h>
#include <fairmq/Transports.h>
#include <fairlogger/Logger.h> #include <fairlogger/Logger.h>
@@ -65,6 +66,7 @@ struct UnmanagedRegion
, fShmemObject() , fShmemObject()
, fFile(nullptr) , fFile(nullptr)
, fFileMapping() , fFileMapping()
, fRcSegmentSize(cfg.rcSegmentSize)
, fQueue(nullptr) , fQueue(nullptr)
, fCallback(nullptr) , fCallback(nullptr)
, fBulkCallback(nullptr) , fBulkCallback(nullptr)
@@ -146,11 +148,13 @@ struct UnmanagedRegion
LOG(debug) << "Successfully zeroed free memory of region " << id << "."; LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
} }
InitializeRefCountSegment(fRcSegmentSize);
if (fControlling && created) { if (fControlling && created) {
Register(shmId, cfg); Register(shmId, cfg);
} }
LOG(debug) << (created ? "Created" : "Opened") << " unmanaged shared memory region: " << fName << " (" << (fControlling ? "controller" : "viewer") << ")"; LOG(debug) << (created ? "Created" : "Opened") << " unmanaged shared memory region: " << fName << " (" << (fControlling ? "controller" : "viewer") << "), refCount segment size: " << fRcSegmentSize;
} }
UnmanagedRegion() = delete; UnmanagedRegion() = delete;
@@ -189,7 +193,7 @@ struct UnmanagedRegion
RefCount& MakeRefCount(uint16_t initialCount = 1) RefCount& MakeRefCount(uint16_t initialCount = 1)
{ {
RefCount* refCount = fRefCountPool->allocate(1).get(); RefCount* refCount = fRefCountPool->allocate_one().get();
new (refCount) RefCount(initialCount); new (refCount) RefCount(initialCount);
return *refCount; return *refCount;
} }
@@ -197,7 +201,7 @@ struct UnmanagedRegion
void RemoveRefCount(RefCount& refCount) void RemoveRefCount(RefCount& refCount)
{ {
refCount.~RefCount(); refCount.~RefCount();
fRefCountPool->deallocate(&refCount, 1); fRefCountPool->deallocate_one(&refCount);
} }
~UnmanagedRegion() ~UnmanagedRegion()
@@ -264,6 +268,7 @@ struct UnmanagedRegion
std::condition_variable fBlockSendCV; std::condition_variable fBlockSendCV;
std::vector<RegionBlock> fBlocksToFree; std::vector<RegionBlock> fBlocksToFree;
const std::size_t fAckBunchSize = 256; const std::size_t fAckBunchSize = 256;
uint64_t fRcSegmentSize;
std::unique_ptr<boost::interprocess::message_queue> fQueue; std::unique_ptr<boost::interprocess::message_queue> fQueue;
std::unique_ptr<boost::interprocess::managed_shared_memory> fRefCountSegment; std::unique_ptr<boost::interprocess::managed_shared_memory> fRefCountSegment;
std::unique_ptr<RefCountPool> fRefCountPool; std::unique_ptr<RefCountPool> fRefCountPool;
@@ -319,7 +324,7 @@ struct UnmanagedRegion
void InitializeRefCountSegment(uint64_t size) void InitializeRefCountSegment(uint64_t size)
{ {
using namespace boost::interprocess; using namespace boost::interprocess;
if (!fRefCountSegment) { if (!fRefCountSegment && size > 0) {
fRefCountSegment = std::make_unique<managed_shared_memory>(open_or_create, fRefCountSegmentName.c_str(), size); fRefCountSegment = std::make_unique<managed_shared_memory>(open_or_create, fRefCountSegmentName.c_str(), size);
LOG(trace) << "shmem: initialized ref count segment: " << fRefCountSegmentName; LOG(trace) << "shmem: initialized ref count segment: " << fRefCountSegmentName;
fRefCountPool = std::make_unique<RefCountPool>(fRefCountSegment->get_segment_manager()); fRefCountPool = std::make_unique<RefCountPool>(fRefCountSegment->get_segment_manager());

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2017-2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -8,12 +8,12 @@
#include <fairlogger/Logger.h> #include <fairlogger/Logger.h>
#include <fairmq/tools/Network.h> #include <fairmq/tools/Network.h>
#include <fairmq/tools/Strings.h>
#ifndef _GNU_SOURCE #ifndef _GNU_SOURCE
#define _GNU_SOURCE // To get defns of NI_MAXSERV and NI_MAXHOST #define _GNU_SOURCE // To get defns of NI_MAXSERV and NI_MAXHOST
#endif #endif
#include <algorithm>
#include <array> #include <array>
#include <boost/algorithm/string.hpp> // trim #include <boost/algorithm/string.hpp> // trim
#include <boost/asio.hpp> #include <boost/asio.hpp>
@@ -158,33 +158,22 @@ string getDefaultRouteNetworkInterface()
} }
string getIpFromHostname(const string& hostname) string getIpFromHostname(const string& hostname)
{ try {
boost::asio::io_context ioc; boost::asio::io_context ioc;
boost::asio::ip::tcp::resolver resolver(ioc);
using namespace boost::asio::ip; auto const result = resolver.resolve(boost::asio::ip::tcp::v4(), hostname, "");
try {
tcp::resolver resolver(ioc);
tcp::resolver::query query(hostname, "");
tcp::resolver::iterator end;
auto it = find_if(static_cast<basic_resolver_iterator<tcp>>(resolver.resolve(query)),
end,
[](const tcp::endpoint& ep) { return ep.address().is_v4(); });
if (it != end) {
stringstream ss;
ss << static_cast<tcp::endpoint>(*it).address();
return ss.str();
}
if (result.empty()) {
LOG(warn) << "could not find ipv4 address for hostname '" << hostname << "'"; LOG(warn) << "could not find ipv4 address for hostname '" << hostname << "'";
return "";
} catch (exception& e) {
LOG(error) << "could not resolve hostname '" << hostname << "', reason: " << e.what();
return ""; return "";
} }
return ToString(result.begin()->endpoint().address());
}
catch (std::exception const& ex)
{
LOG(error) << "could not resolve hostname '" << hostname << "', reason: " << ex.what();
return "";
} }
} // namespace fair::mq::tools } // namespace fair::mq::tools

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2017-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2017-2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -10,7 +10,11 @@
#include <fairmq/tools/Strings.h> #include <fairmq/tools/Strings.h>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#ifdef FAIRMQ_BOOST_PROCESS_V1_HEADER
#include <boost/process/v1.hpp>
#else
#include <boost/process.hpp> #include <boost/process.hpp>
#endif
#include <chrono> #include <chrono>
#include <csignal> // kill, signals #include <csignal> // kill, signals
#include <iostream> #include <iostream>
@@ -20,7 +24,11 @@
#include <utility> #include <utility>
using namespace std; using namespace std;
#ifdef FAIRMQ_BOOST_PROCESS_V1_HEADER
namespace bp = boost::process::v1;
#else
namespace bp = boost::process; namespace bp = boost::process;
#endif
namespace ba = boost::asio; namespace ba = boost::asio;
namespace bs = boost::system; namespace bs = boost::system;
@@ -64,22 +72,22 @@ execute_result execute(const string& cmd, const string& prefix, const string& in
p.Print(cmd); p.Print(cmd);
ba::io_service ios; ba::io_context ioc;
// containers for std_in // containers for std_in
ba::const_buffer inputBuffer(ba::buffer(input)); ba::const_buffer inputBuffer(ba::buffer(input));
bp::async_pipe inputPipe(ios); bp::async_pipe inputPipe(ioc);
// containers for std_out // containers for std_out
ba::streambuf outputBuffer; ba::streambuf outputBuffer;
bp::async_pipe outputPipe(ios); bp::async_pipe outputPipe(ioc);
// containers for std_err // containers for std_err
ba::streambuf errorBuffer; ba::streambuf errorBuffer;
bp::async_pipe errorPipe(ios); bp::async_pipe errorPipe(ioc);
const string delimiter = "\n"; const string delimiter = "\n";
ba::steady_timer inputTimer(ios); ba::steady_timer inputTimer(ioc);
inputTimer.expires_after(std::chrono::milliseconds(1000)); // NOLINT inputTimer.expires_after(std::chrono::milliseconds(1000)); // NOLINT
ba::steady_timer signalTimer(ios); ba::steady_timer signalTimer(ioc);
signalTimer.expires_after(std::chrono::milliseconds(2000)); // NOLINT signalTimer.expires_after(std::chrono::milliseconds(2000)); // NOLINT
// child process // child process
@@ -154,7 +162,7 @@ execute_result execute(const string& cmd, const string& prefix, const string& in
}; };
ba::async_read_until(errorPipe, errorBuffer, delimiter, onStdErr); ba::async_read_until(errorPipe, errorBuffer, delimiter, onStdErr);
ios.run(); ioc.run();
c.wait(); c.wait();
result.exit_code = c.exit_code(); result.exit_code = c.exit_code();

View File

@@ -154,7 +154,7 @@ class Socket final : public fair::mq::Socket
} }
} }
int64_t Send(std::vector<std::unique_ptr<fair::mq::Message>>& msgVec, int timeout = -1) override int64_t Send(Parts::container& msgVec, int timeout = -1) override
{ {
int flags = 0; int flags = 0;
if (timeout == 0) { if (timeout == 0) {
@@ -206,7 +206,7 @@ class Socket final : public fair::mq::Socket
} }
} }
int64_t Receive(std::vector<std::unique_ptr<fair::mq::Message>>& msgVec, int timeout = -1) override int64_t Receive(Parts::container& msgVec, int timeout = -1) override
{ {
int flags = 0; int flags = 0;
if (timeout == 0) { if (timeout == 0) {

View File

@@ -16,6 +16,10 @@ if(FairLogger_VERSION VERSION_LESS 1.9.0 AND FairLogger_VERSION VERSION_GREATER_
LIST(APPEND definitions FAIR_MIN_SEVERITY=trace) LIST(APPEND definitions FAIR_MIN_SEVERITY=trace)
endif() endif()
if(FAIRMQ_BOOST_PROCESS_V1_HEADER)
LIST(APPEND definitions FAIRMQ_BOOST_PROCESS_V1_HEADER)
endif()
if(definitions) if(definitions)
set(definitions DEFINITIONS ${definitions}) set(definitions DEFINITIONS ${definitions})
endif() endif()
@@ -134,6 +138,7 @@ add_testsuite(Device
${CMAKE_CURRENT_SOURCE_DIR}/device ${CMAKE_CURRENT_SOURCE_DIR}/device
${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_BINARY_DIR}
TIMEOUT 20 TIMEOUT 20
${definitions}
${environment} ${environment}
) )
@@ -253,7 +258,7 @@ add_testsuite(Tools
LINKS FairMQ LINKS FairMQ
INCLUDES ${CMAKE_CURRENT_SOURCE_DIR} INCLUDES ${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_BINARY_DIR}
TIMEOUT 20 TIMEOUT 5
${environment} ${environment}
) )

View File

@@ -1,19 +0,0 @@
ARG VERSION=latest
FROM fedora:${VERSION}
ARG VERSION=latest
LABEL org.opencontainers.image.source "https://github.com/FairRootGroup/FairMQ"
LABEL org.opencontainers.image.description "FairMQ development environment"
RUN dnf -y update
# https://git.gsi.de/SDE/packages/builder
RUN dnf -y install https://alfa-ci.gsi.de/packages/rpm/fedora-$VERSION-x86_64/fairsoft-release-dev.rpm
RUN dnf -y install clang cli11-devel ninja-build 'dnf-command(builddep)' libasan liblsan libtsan libubsan clang-tools-extra
RUN dnf -y builddep fairmq
RUN dnf -y clean all
# buildah build --build-arg "VERSION=36" -t "ghcr.io/fairrootgroup/fairmq-dev/fedora-36:latest" -f test/ci/Containerfile.fedora .
# echo $GH_PAT | buildah login -u dennisklein --password-stdin ghcr.io
# buildah push ghcr.io/fairrootgroup/fairmq-dev/fedora-36:latest
# apptainer pull docker://ghcr.io/fairrootgroup/fairmq-dev/fedora-36:latest
# echo $GH_PAT | apptainer remote login -u dennisklein --password-stdin oras://ghcr.io
# apptainer push ./fedora-36_latest.sif oras://ghcr.io/fairrootgroup/fairmq-dev/fedora-36-sif:latest

View File

@@ -1,36 +0,0 @@
ARG VERSION=latest
FROM ubuntu:${VERSION}
ARG VERSION=latest
LABEL org.opencontainers.image.source "https://github.com/FairRootGroup/FairMQ"
LABEL org.opencontainers.image.description "FairMQ development environment"
RUN echo 'debconf debconf/frontend select Noninteractive' | debconf-set-selections
RUN apt-get update
RUN apt-get -y upgrade
RUN apt-get -y install ca-certificates patch cmake git libboost-dev libboost-log-dev libboost-system-dev libboost-regex-dev libboost-filesystem-dev libboost-container-dev libboost-thread-dev libboost-date-time-dev libboost-program-options-dev g++ libfmt-dev ninja-build wget libczmq-dev libxml2-utils libfabric-dev libfabric-bin pkg-config
RUN apt-get -y clean
RUN cd /tmp
RUN git clone -b v1.19.2 --recurse-submodules https://github.com/FairRootGroup/asio
RUN cmake -GNinja -S asio -B asio_build -DCMAKE_INSTALL_PREFIX=/usr -DCMAKE_BUILD_TYPE=Release
RUN cmake --build asio_build --target install
RUN rm -rf asio asio_build
RUN git clone -b v1.0.0 https://github.com/FairRootGroup/FairCMakeModules
RUN cmake -GNinja -S FairCMakeModules -B FairCMakeModules_build -DCMAKE_INSTALL_PREFIX=/usr -DCMAKE_BUILD_TYPE=Release
RUN cmake --build FairCMakeModules_build --target install
RUN rm -rf FairCMakeModules FairCMakeModules_build
RUN git clone -b v1.11.0 https://github.com/FairRootGroup/FairLogger
RUN cmake -GNinja -S FairLogger -B FairLogger_build -DCMAKE_INSTALL_PREFIX=/usr -DCMAKE_BUILD_TYPE=Release -DUSE_EXTERNAL_FMT=ON
RUN cmake --build FairLogger_build --target install
RUN rm -rf FairLogger FairLogger_build
# buildah build --build-arg "VERSION=22.04" -t "ghcr.io/fairrootgroup/fairmq-dev/ubuntu-22.04:latest" -f test/ci/Containerfile.ubuntu .
# echo $GH_PAT | buildah login -u dennisklein --password-stdin ghcr.io
# buildah push ghcr.io/fairrootgroup/fairmq-dev/ubuntu-22.04:latest
# apptainer pull docker://ghcr.io/fairrootgroup/fairmq-dev/ubuntu-22.04:latest
# echo $GH_PAT | apptainer remote login -u dennisklein --password-stdin oras://ghcr.io
# apptainer push ./ubuntu-22.04_latest.sif oras://ghcr.io/fairrootgroup/fairmq-dev/ubuntu-22.04-sif:latest

View File

@@ -1,42 +0,0 @@
#! /bin/bash
label="$1"
jobsh="$2"
if [ -z "$ALFACI_SLURM_CPUS" ]
then
ALFACI_SLURM_CPUS=20
fi
CPUS_PER_JOB=$(($ALFACI_SLURM_CPUS / 2))
if [ -z "$ALFACI_SLURM_EXTRA_OPTS" ]
then
ALFACI_SLURM_EXTRA_OPTS="--hint=compute_bound"
fi
if [ -z "$ALFACI_SLURM_TIMEOUT" ]
then
ALFACI_SLURM_TIMEOUT=30
fi
if [ -z "$ALFACI_SLURM_QUEUE" ]
then
ALFACI_SLURM_QUEUE=main
fi
echo "*** Slurm request options :"
echo "*** Working directory ..: $PWD"
echo "*** Queue ..............: $ALFACI_SLURM_QUEUE"
echo "*** CPUs ...............: $ALFACI_SLURM_CPUS"
echo "*** Wall Time ..........: $ALFACI_SLURM_TIMEOUT min"
echo "*** Job Name ...........: ${label}"
echo "*** Extra Options ......: ${ALFACI_SLURM_EXTRA_OPTS}"
echo "*** Submitting job at ....: $(date -R)"
(
set -x
srun -p $ALFACI_SLURM_QUEUE -c $CPUS_PER_JOB -n 1 \
-t $ALFACI_SLURM_TIMEOUT \
--job-name="${label}" \
${ALFACI_SLURM_EXTRA_OPTS} \
bash "${jobsh}"
)
retval=$?
echo "*** Exit Code ............: $retval"
exit "$retval"

View File

@@ -0,0 +1,21 @@
spack:
specs:
- "fairlogger@2.2.0 ^fmt@:11"
- "boost@1.87 +container +program_options +filesystem +date_time +regex"
- "libzmq@4.1.4:"
- ninja
- "cmake@3.15:"
concretizer:
targets:
granularity: generic
packages:
all:
require:
- target=x86_64_v3
mirrors:
ghcr-buildcache:
url: oci://ghcr.io/fairrootgroup/fairmq-spack-buildcache
signed: false

21
test/ci/spack-latest.yaml Normal file
View File

@@ -0,0 +1,21 @@
spack:
specs:
- "fairlogger@2.2.0 ^fmt@:11"
- "boost@1.66: +container +program_options +filesystem +date_time +regex"
- "libzmq@4.1.4:"
- ninja
- "cmake@3.15:"
concretizer:
targets:
granularity: generic
packages:
all:
require:
- target=x86_64_v3
mirrors:
ghcr-buildcache:
url: oci://ghcr.io/fairrootgroup/fairmq-spack-buildcache
signed: false

View File

@@ -27,7 +27,7 @@ class TestDevice : public Device
{ {
public: public:
TestDevice(const string& transport) TestDevice(const string& transport)
: fDeviceThread(&Device::RunStateMachine, this) : fDeviceThread(&Device::RunStateMachine, this)
{ {
SetTransport(transport); SetTransport(transport);
test::Control(*this, test::Cycle::ToRun); test::Control(*this, test::Cycle::ToRun);

View File

@@ -9,7 +9,11 @@
#include "runner.h" #include "runner.h"
#include <gtest/gtest.h> #include <gtest/gtest.h>
#ifdef FAIRMQ_BOOST_PROCESS_V1_HEADER
#include <boost/process/v1.hpp>
#else
#include <boost/process.hpp> #include <boost/process.hpp>
#endif
#include <fairmq/tools/Process.h> #include <fairmq/tools/Process.h>
#include <fairmq/tools/Unique.h> #include <fairmq/tools/Unique.h>
#include <fairmq/Device.h> #include <fairmq/Device.h>

View File

@@ -9,7 +9,11 @@
#include "runner.h" #include "runner.h"
#include <gtest/gtest.h> #include <gtest/gtest.h>
#ifdef FAIRMQ_BOOST_PROCESS_V1_HEADER
#include <boost/process/v1.hpp>
#else
#include <boost/process.hpp> #include <boost/process.hpp>
#endif
#include <fairmq/tools/Process.h> #include <fairmq/tools/Process.h>
#include <fairmq/tools/Unique.h> #include <fairmq/tools/Unique.h>

View File

@@ -9,7 +9,11 @@
#include "runner.h" #include "runner.h"
#include <gtest/gtest.h> #include <gtest/gtest.h>
#ifdef FAIRMQ_BOOST_PROCESS_V1_HEADER
#include <boost/process/v1.hpp>
#else
#include <boost/process.hpp> #include <boost/process.hpp>
#endif
#include <fairmq/tools/Unique.h> #include <fairmq/tools/Unique.h>
#include <fairmq/tools/Process.h> #include <fairmq/tools/Process.h>

View File

@@ -16,6 +16,7 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <cstring> #include <cstring>
#include <string>
#include <vector> #include <vector>
namespace namespace
@@ -101,7 +102,7 @@ TEST(MemoryResources, allocator)
size_t session{tools::UuidHash()}; size_t session{tools::UuidHash()};
ProgOptions config; ProgOptions config;
config.SetProperty<string>("session", to_string(session)); config.SetProperty<std::string>("session", to_string(session));
FactoryType factoryZMQ = TransportFactory::CreateTransportFactory("zeromq", fair::mq::tools::Uuid(), &config); FactoryType factoryZMQ = TransportFactory::CreateTransportFactory("zeromq", fair::mq::tools::Uuid(), &config);
@@ -129,7 +130,7 @@ TEST(MemoryResources, getMessage)
size_t session{tools::UuidHash()}; size_t session{tools::UuidHash()};
ProgOptions config; ProgOptions config;
config.SetProperty<string>("session", to_string(session)); config.SetProperty<std::string>("session", to_string(session));
config.SetProperty<bool>("shm-monitor", true); config.SetProperty<bool>("shm-monitor", true);
FactoryType factoryZMQ = TransportFactory::CreateTransportFactory("zeromq", fair::mq::tools::Uuid(), &config); FactoryType factoryZMQ = TransportFactory::CreateTransportFactory("zeromq", fair::mq::tools::Uuid(), &config);

View File

@@ -287,8 +287,10 @@ auto ZeroCopy(bool expandedShmMetadata = false) -> void
// The "zero copy" property of the Copy() method is an implementation detail and is not guaranteed. // The "zero copy" property of the Copy() method is an implementation detail and is not guaranteed.
// Currently it holds true for the shmem (across devices) and for zeromq (within same device) transports. // Currently it holds true for the shmem (across devices) and for zeromq (within same device) transports.
auto ZeroCopyFromUnmanaged(string const& address, bool expandedShmMetadata = false) -> void auto ZeroCopyFromUnmanaged(string const& address, bool expandedShmMetadata, uint64_t rcSegmentSize) -> void
{ {
fair::Logger::SetConsoleSeverity(fair::Severity::debug);
ProgOptions config1; ProgOptions config1;
ProgOptions config2; ProgOptions config2;
string session(tools::Uuid()); string session(tools::Uuid());
@@ -311,11 +313,13 @@ auto ZeroCopyFromUnmanaged(string const& address, bool expandedShmMetadata = fal
const size_t msgSize{100}; const size_t msgSize{100};
const size_t regionSize{1000000}; const size_t regionSize{1000000};
RegionConfig cfg;
cfg.rcSegmentSize = rcSegmentSize;
tools::Semaphore blocker; tools::Semaphore blocker;
auto region = factory1->CreateUnmanagedRegion(regionSize, [&blocker](void*, size_t, void*) { auto region = factory1->CreateUnmanagedRegion(regionSize, [&blocker](void*, size_t, void*) {
blocker.Signal(); blocker.Signal();
}); }, cfg);
{ {
Channel push("Push", "push", factory1); Channel push("Push", "push", factory1);
@@ -461,12 +465,22 @@ TEST(ZeroCopy, shmem_expanded_metadata) // NOLINT
TEST(ZeroCopyFromUnmanaged, shmem) // NOLINT TEST(ZeroCopyFromUnmanaged, shmem) // NOLINT
{ {
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged"); ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged", false, 10000000);
} }
TEST(ZeroCopyFromUnmanaged, shmem_expanded_metadata) // NOLINT TEST(ZeroCopyFromUnmanaged, shmem_expanded_metadata) // NOLINT
{ {
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged", true); ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged_expanded", true, 10000000);
}
TEST(ZeroCopyFromUnmanaged, shmem_no_rc_segment) // NOLINT
{
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged_no_rc_segment", false, 0);
}
TEST(ZeroCopyFromUnmanaged, shmem_expanded_metadata_no_rc_segment) // NOLINT
{
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged_expanded_no_rc_segment", true, 0);
} }
} // namespace } // namespace

View File

@@ -1,28 +1,37 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2018-2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" * * copied verbatim in the file "LICENSE" *
********************************************************************************/ ********************************************************************************/
#include <gtest/gtest.h>
#include <fairmq/tools/Network.h> #include <fairmq/tools/Network.h>
#include <gtest/gtest.h>
#include <string> #include <string>
namespace namespace
{ {
using namespace std; TEST(Tools, NetworkDefaultIP)
using namespace fair::mq;
TEST(Tools, Network)
{ {
string interface = fair::mq::tools::getDefaultRouteNetworkInterface(); auto const interface = fair::mq::tools::getDefaultRouteNetworkInterface();
EXPECT_NE(interface, ""); EXPECT_NE(interface, "");
string interfaceIP = fair::mq::tools::getInterfaceIP(interface); auto const interfaceIP = fair::mq::tools::getInterfaceIP(interface);
EXPECT_NE(interfaceIP, ""); EXPECT_NE(interfaceIP, "");
} }
TEST(Tools, NetworkIPv4Localhost)
{
auto const ip = fair::mq::tools::getIpFromHostname("localhost");
EXPECT_FALSE(ip.empty());
EXPECT_EQ(ip, "127.0.0.1");
}
TEST(Tools, NetworkInvalidHostname)
{
auto const ip = fair::mq::tools::getIpFromHostname("non.existent.domain.invalid");
EXPECT_TRUE(ip.empty());
}
} /* namespace */ } /* namespace */