Compare commits

...

13 Commits

Author SHA1 Message Date
Alexey Rybalchenko
2500771689 shm: ResetContent(): reset data after recreating the metadata 2022-05-28 14:46:21 +02:00
Alexey Rybalchenko
d2aa3b6bb0 shm: open managament data as read only during cleanup 2022-05-28 14:46:21 +02:00
Alexey Rybalchenko
00df117c7c Shm::Monitor: add nullptr check for segment info 2022-05-28 14:46:21 +02:00
Dennis Klein
69faa63c5b docs: Update README 2022-03-21 18:22:07 +01:00
Dennis Klein
b7474ae138 build: Deprecate components dds_plugin, sdk, sdk_commands 2022-03-21 18:22:07 +01:00
Dennis Klein
b426bf39d7 fix: Update metadata 2022-03-21 18:22:07 +01:00
Dennis Klein
6780b7452c fix(control): Honor SIGINT and SIGTERM in more places
* Queue next transition for long-running states (fix #421)
* Add *OrCustom/Push/Locked family of functions to StateQueue to enable
  composition with custom signals
2022-03-21 16:28:43 +01:00
Dennis Klein
27277b11b4 fix(Device): Warning about narrowing conversion 2022-03-21 16:28:43 +01:00
Dennis Klein
cb5029f826 fix(Device): Spawn rate logger thread only if needed 2022-03-21 16:28:43 +01:00
Dennis Klein
5d45d89269 feat: Remove --max-run-time option
BREAKING CHANGE: was introduced in 1.4.0 release but appears unused
2022-03-21 16:28:43 +01:00
Dennis Klein
eb9ddc81cf ci: Run thread sanitizer with clang++ 2022-03-21 16:28:43 +01:00
Dennis Klein
f5891d5ae3 ci: Add thread sanitizer check and bump all checks to Fedora 35 2022-03-21 16:28:43 +01:00
Dennis Klein
3b2ad1f6f4 ci: Add Fedora 35 build 2022-03-21 16:28:43 +01:00
18 changed files with 306 additions and 168 deletions

View File

@@ -1,5 +1,5 @@
################################################################################ ################################################################################
# Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # Copyright (C) 2018-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# # # #
# This software is distributed under the terms of the # # This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, # # GNU Lesser General Public Licence (LGPL) version 3, #
@@ -9,7 +9,7 @@
# Project ###################################################################### # Project ######################################################################
cmake_minimum_required(VERSION 3.15 FATAL_ERROR) cmake_minimum_required(VERSION 3.15 FATAL_ERROR)
cmake_policy(VERSION 3.15...3.20) cmake_policy(VERSION 3.15...3.22)
list(PREPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake) list(PREPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake)
include(GitHelper) include(GitHelper)

View File

@@ -4,19 +4,19 @@ Upstream-Contact: Mohammad Al-Turany <m.al-turany@gsi.de>
Source: https://github.com/FairRootGroup/FairMQ Source: https://github.com/FairRootGroup/FairMQ
Files: * Files: *
Copyright: 2012-2021, GSI Helmholtzzentrum fuer Schwerionenforschung GmbH Copyright: 2012-2022, GSI Helmholtzzentrum fuer Schwerionenforschung GmbH
Copyright: 2012-2021, [see AUTHORS file] Copyright: 2012-2022, [see AUTHORS file]
Copyright: 2012-2021, [see CONTRIBUTORS file] Copyright: 2012-2022, [see CONTRIBUTORS file]
Comment: The copyright of individual contributors is documented in the Comment: The copyright of individual contributors is documented in the
Git history. Git history.
License: LGPL-3.0-only License: LGPL-3.0-only
Files: extern/googletest Files: extern/googletest
Copyright: 2008-2021, Google Inc. Copyright: 2008-2022, Google Inc.
License: GOOGLE License: GOOGLE
Files: extern/asio Files: extern/asio
Copyright: 2003-2021, Christopher M. Kohlhoff (chris at kohlhoff dot com) Copyright: 2003-2022, Christopher M. Kohlhoff (chris at kohlhoff dot com)
License: BSL-1.0 License: BSL-1.0
Files: extern/PicoSHA2 Files: extern/PicoSHA2

View File

@@ -72,6 +72,9 @@ endif()
if(ENABLE_SANITIZER_THREAD) if(ENABLE_SANITIZER_THREAD)
list(APPEND options "-DENABLE_SANITIZER_THREAD=ON") list(APPEND options "-DENABLE_SANITIZER_THREAD=ON")
endif() endif()
if(CMAKE_CXX_COMPILER)
list(APPEND options "-DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}")
endif()
if(CMAKE_CXX_FLAGS) if(CMAKE_CXX_FLAGS)
list(APPEND options "-DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS}") list(APPEND options "-DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS}")
endif() endif()

6
Jenkinsfile vendored
View File

@@ -15,9 +15,9 @@ def jobMatrix(String type, List specs) {
ver = spec.ver ver = spec.ver
} else { // == 'check' } else { // == 'check'
job = "${spec.name}" job = "${spec.name}"
selector = 'fedora-34-x86_64' selector = 'fedora-35-x86_64'
os = 'fedora' os = 'fedora'
ver = '34' ver = '35'
} }
def label = "${job}" def label = "${job}"
@@ -96,6 +96,7 @@ pipeline{
[os: 'fedora', ver: '32', arch: 'x86_64', compiler: 'gcc-10', extra: all], [os: 'fedora', ver: '32', arch: 'x86_64', compiler: 'gcc-10', extra: all],
[os: 'fedora', ver: '33', arch: 'x86_64', compiler: 'gcc-10', extra: all], [os: 'fedora', ver: '33', arch: 'x86_64', compiler: 'gcc-10', extra: all],
[os: 'fedora', ver: '34', arch: 'x86_64', compiler: 'gcc-11', extra: all], [os: 'fedora', ver: '34', arch: 'x86_64', compiler: 'gcc-11', extra: all],
[os: 'fedora', ver: '35', arch: 'x86_64', compiler: 'gcc-11', extra: all],
[os: 'macos', ver: '11', arch: 'x86_64', compiler: 'apple-clang-12', extra: '-DHAS_ASIO=ON'], [os: 'macos', ver: '11', arch: 'x86_64', compiler: 'apple-clang-12', extra: '-DHAS_ASIO=ON'],
[os: 'macos', ver: '11', arch: 'arm64', compiler: 'apple-clang-13', extra: '-DHAS_ASIO=ON'], [os: 'macos', ver: '11', arch: 'arm64', compiler: 'apple-clang-13', extra: '-DHAS_ASIO=ON'],
]) ])
@@ -106,6 +107,7 @@ pipeline{
[name: 'static-analyzers', extra: "${all_debug} -DRUN_STATIC_ANALYSIS=ON"], [name: 'static-analyzers', extra: "${all_debug} -DRUN_STATIC_ANALYSIS=ON"],
[name: '{address,leak,ub}-sanitizers', [name: '{address,leak,ub}-sanitizers',
extra: "${all_debug} -DENABLE_SANITIZER_ADDRESS=ON -DENABLE_SANITIZER_LEAK=ON -DENABLE_SANITIZER_UNDEFINED_BEHAVIOUR=ON -DCMAKE_CXX_FLAGS='-O1 -fno-omit-frame-pointer'"], extra: "${all_debug} -DENABLE_SANITIZER_ADDRESS=ON -DENABLE_SANITIZER_LEAK=ON -DENABLE_SANITIZER_UNDEFINED_BEHAVIOUR=ON -DCMAKE_CXX_FLAGS='-O1 -fno-omit-frame-pointer'"],
[name: 'thread-sanitizer', extra: "${all_debug} -DENABLE_SANITIZER_THREAD=ON -DCMAKE_CXX_COMPILER=clang++"],
]) ])
parallel(builds + checks) parallel(builds + checks)

View File

@@ -1,12 +1,9 @@
<!-- {#mainpage} --> <!-- {#mainpage} -->
# FairMQ [![license](https://alfa-ci.gsi.de/shields/badge/license-LGPL--3.0-orange.svg)](COPYRIGHT) [![build status](https://alfa-ci.gsi.de/buildStatus/icon?job=FairRootGroup/FairMQ/dev)](https://alfa-ci.gsi.de/blue/organizations/jenkins/FairRootGroup%2FFairMQ/branches) [![Coverity Badge](https://alfa-ci.gsi.de/shields/coverity/scan/fairrootgroup-fairmq.svg)](https://scan.coverity.com/projects/fairrootgroup-fairmq) # FairMQ [![license](https://alfa-ci.gsi.de/shields/badge/license-LGPL--3.0-orange.svg)](COPYRIGHT)
C++ Message Queuing Library and Framework C++ Message Queuing Library and Framework
| Release | Version | Docs | Docs: [Book](https://github.com/FairRootGroup/FairMQ/blob/dev/README.md#documentation)
| :---: | :--- | :--- |
| `stable` | [![release](https://alfa-ci.gsi.de/shields/github/release/FairRootGroup/FairMQ.svg)](https://github.com/FairRootGroup/FairMQ/releases/latest) | [API](https://fairrootgroup.github.io/FairMQ/latest), [Book](https://github.com/FairRootGroup/FairMQ/blob/master/README.md#documentation) |
| `testing` | [![dev tag](https://alfa-ci.gsi.de/shields/github/tag/FairRootGroup/FairMQ.svg)](https://github.com/FairRootGroup/FairMQ/tags) | [Book](https://github.com/FairRootGroup/FairMQ/blob/dev/README.md#documentation) |
Find all FairMQ releases [here](https://github.com/FairRootGroup/FairMQ/releases). Find all FairMQ releases [here](https://github.com/FairRootGroup/FairMQ/releases).
@@ -24,11 +21,13 @@ FairMQ provides multiple implementations for its API (so-called "transports",
e.g. `zeromq`, `shmem` and `ofi` (in development)) to cover a variety of use cases e.g. `zeromq`, `shmem` and `ofi` (in development)) to cover a variety of use cases
(e.g. inter-thread, inter-process, inter-node communication) and machines (e.g. Ethernet, Infiniband). (e.g. inter-thread, inter-process, inter-node communication) and machines (e.g. Ethernet, Infiniband).
In addition to this core functionality FairMQ provides a framework for creating "devices" - actors which In addition to this core functionality FairMQ provides a framework for creating "devices" - actors which
are communicating through message passing. FairMQ does not only allow the user to use different transport but also to mix them; i.e: A Device can communicate using different transport on different channels at the same time. Device execution is modelled as a simple state machine that are communicating through message passing. FairMQ does not only allow the user to use different transport
shapes the integration points for the user task. Devices also incorporate a plugin system for runtime configuration and control. but also to mix them; i.e: A Device can communicate using different transport on different channels at the
Next to the provided devices and plugins (e.g. [DDS](https://github.com/FairRootGroup/DDS)) same time. Device execution is modelled as a simple state machine that shapes the integration points for
the user can extend FairMQ by developing his own plugins to integrate his devices with external the user task. Devices also incorporate a plugin system for runtime configuration and control.
configuration and control services. Next to the provided [devices](https://github.com/FairRootGroup/FairMQ/tree/master/fairmq/devices) and
[plugins](https://github.com/FairRootGroup/FairMQ/tree/master/fairmq/plugins) the user can extend FairMQ
by developing his own plugins to integrate his devices with external configuration and control services.
FairMQ has been developed in the context of its mother project [FairRoot](https://github.com/FairRootGroup/FairRoot) - FairMQ has been developed in the context of its mother project [FairRoot](https://github.com/FairRootGroup/FairRoot) -
a simulation, reconstruction and analysis framework. a simulation, reconstruction and analysis framework.
@@ -47,14 +46,15 @@ cmake --build fairmq_build --target install
Please consult the [manpages of your CMake version](https://cmake.org/cmake/help/latest/manual/cmake.1.html) for more options. Please consult the [manpages of your CMake version](https://cmake.org/cmake/help/latest/manual/cmake.1.html) for more options.
If dependencies are not installed in standard system directories, you can hint the installation location via `-DCMAKE_PREFIX_PATH=...` or per dependency via `-D{DEPENDENCY}_ROOT=...`. `{DEPENDENCY}` can be `GTEST`, `BOOST`, `FAIRLOGGER`, `ZEROMQ`, `OFI`, `PMIX`, `ASIO`, `ASIOFI` or `DDS` (`*_ROOT` variables can also be environment variables). If dependencies are not installed in standard system directories, you can hint the installation location via
`-DCMAKE_PREFIX_PATH=...` or per dependency via `-D{DEPENDENCY}_ROOT=...` (`*_ROOT` variables can also be environment variables).
## Usage ## Usage
FairMQ ships as a CMake package, so in your `CMakeLists.txt` you can discover it like this: FairMQ ships as a CMake package, so in your `CMakeLists.txt` you can discover it like this:
```cmake ```cmake
find_package(FairCMakeModules 0.2 REQUIRED) find_package(FairCMakeModules 1.0 REQUIRED)
include(FairFindPackage2) include(FairFindPackage2)
find_package2(FairMQ) find_package2(FairMQ)
find_package2_implicit_dependencies() find_package2_implicit_dependencies()
@@ -71,14 +71,14 @@ list(PREPEND CMAKE_PREFIX_PATH /path/to/fairmq_install)
Optionally, you can require certain FairMQ package components and a minimum version: Optionally, you can require certain FairMQ package components and a minimum version:
```cmake ```cmake
find_package(FairMQ 1.4.0 COMPONENTS dds_plugin) find_package(FairMQ 1.4.50 COMPONENTS ofi_transport)
``` ```
When building FairMQ, CMake will print a summary table of all available package components. When building FairMQ, CMake will print a summary table of all available package components.
## Dependencies ## Dependencies
* [asio](https://github.com/chriskohlhoff/asio) (optionally bundled) * [asio](https://github.com/chriskohlhoff/asio)
* [asiofi](https://github.com/FairRootGroup/asiofi) * [asiofi](https://github.com/FairRootGroup/asiofi)
* [Boost](https://www.boost.org/) * [Boost](https://www.boost.org/)
* [CMake](https://cmake.org/) * [CMake](https://cmake.org/)
@@ -86,13 +86,14 @@ When building FairMQ, CMake will print a summary table of all available package
* [Doxygen](http://www.doxygen.org/) * [Doxygen](http://www.doxygen.org/)
* [FairCMakeModules](https://github.com/FairRootGroup/FairCMakeModules) (optionally bundled) * [FairCMakeModules](https://github.com/FairRootGroup/FairCMakeModules) (optionally bundled)
* [FairLogger](https://github.com/FairRootGroup/FairLogger) * [FairLogger](https://github.com/FairRootGroup/FairLogger)
* [Flatbuffers](https://google.github.io/flatbuffers/)
* [GTest](https://github.com/google/googletest) (optionally bundled) * [GTest](https://github.com/google/googletest) (optionally bundled)
* [PMIx](https://pmix.org/) * [PMIx](https://pmix.org/)
* [ZeroMQ](http://zeromq.org/) * [ZeroMQ](http://zeromq.org/)
Which dependencies are required depends on which components are built. Which dependencies are required depends on which components are built.
Supported platforms: Linux and MacOS. Supported platform is Linux. macOS is supported on a best-effort basis.
## CMake options ## CMake options
@@ -102,7 +103,6 @@ On command line:
* `-DBUILD_TESTING=OFF` disables building of tests. * `-DBUILD_TESTING=OFF` disables building of tests.
* `-DBUILD_EXAMPLES=OFF` disables building of examples. * `-DBUILD_EXAMPLES=OFF` disables building of examples.
* `-DBUILD_OFI_TRANSPORT=ON` enables building of the experimental OFI transport. * `-DBUILD_OFI_TRANSPORT=ON` enables building of the experimental OFI transport.
* `-DBUILD_DDS_PLUGIN=ON` enables building of the DDS plugin.
* `-DBUILD_PMIX_PLUGIN=ON` enables building of the PMIx plugin. * `-DBUILD_PMIX_PLUGIN=ON` enables building of the PMIx plugin.
* `-DBUILD_DOCS=ON` enables building of API docs. * `-DBUILD_DOCS=ON` enables building of API docs.
* You can hint non-system installations for dependent packages, see the #installation-from-source section above * You can hint non-system installations for dependent packages, see the #installation-from-source section above

View File

@@ -34,9 +34,9 @@ macro(fairmq_summary_components)
endif() endif()
message(STATUS " ${BWhite}ofi_transport${CR} ${ofi_summary}") message(STATUS " ${BWhite}ofi_transport${CR} ${ofi_summary}")
if(BUILD_DDS_PLUGIN) if(BUILD_DDS_PLUGIN)
set(dds_summary "${BGreen}YES${CR} (disable with ${BMagenta}-DBUILD_DDS_PLUGIN=OFF${CR})") set(dds_summary "${BGreen}YES${CR} DEPRECATED (disable with ${BMagenta}-DBUILD_DDS_PLUGIN=OFF${CR})")
else() else()
set(dds_summary "${BRed} NO${CR} (default, enable with ${BMagenta}-DBUILD_DDS_PLUGIN=ON${CR})") set(dds_summary "${BRed} NO${CR} DEPRECATED (default, enable with ${BMagenta}-DBUILD_DDS_PLUGIN=ON${CR})")
endif() endif()
message(STATUS " ${BWhite}dds_plugin${CR} ${dds_summary}") message(STATUS " ${BWhite}dds_plugin${CR} ${dds_summary}")
if(BUILD_PMIX_PLUGIN) if(BUILD_PMIX_PLUGIN)
@@ -58,15 +58,15 @@ macro(fairmq_summary_components)
endif() endif()
message(STATUS " ${BWhite}docs${CR} ${docs_summary}") message(STATUS " ${BWhite}docs${CR} ${docs_summary}")
if(BUILD_SDK) if(BUILD_SDK)
set(sdk_summary "${BGreen}YES${CR} EXPERIMENTAL (disable with ${BMagenta}-DBUILD_SDK=OFF${CR})") set(sdk_summary "${BGreen}YES${CR} DEPRECATED (disable with ${BMagenta}-DBUILD_SDK=OFF${CR})")
else() else()
set(sdk_summary "${BRed} NO${CR} EXPERIMENTAL (default, enable with ${BMagenta}-DBUILD_SDK=ON${CR})") set(sdk_summary "${BRed} NO${CR} DEPRECATED (default, enable with ${BMagenta}-DBUILD_SDK=ON${CR})")
endif() endif()
message(STATUS " ${BWhite}sdk${CR} ${sdk_summary}") message(STATUS " ${BWhite}sdk${CR} ${sdk_summary}")
if(BUILD_SDK_COMMANDS) if(BUILD_SDK_COMMANDS)
set(sdk_commands_summary "${BGreen}YES${CR} (disable with ${BMagenta}-DBUILD_SDK_COMMANDS=OFF${CR})") set(sdk_commands_summary "${BGreen}YES${CR} DEPRECATED (disable with ${BMagenta}-DBUILD_SDK_COMMANDS=OFF${CR})")
else() else()
set(sdk_commands_summary "${BRed} NO${CR} (default, enable with ${BMagenta}-DBUILD_SDK_COMMANDS=ON${CR})") set(sdk_commands_summary "${BRed} NO${CR} DEPRECATED (default, enable with ${BMagenta}-DBUILD_SDK_COMMANDS=ON${CR})")
endif() endif()
message(STATUS " ${BWhite}sdk_commands${CR} ${sdk_commands_summary}") message(STATUS " ${BWhite}sdk_commands${CR} ${sdk_commands_summary}")
if(BUILD_TIDY_TOOL) if(BUILD_TIDY_TOOL)
@@ -75,6 +75,21 @@ macro(fairmq_summary_components)
set(sdk_tidy_summary "${BRed} NO${CR} (default, enable with ${BMagenta}-DBUILD_TIDY_TOOL=ON${CR})") set(sdk_tidy_summary "${BRed} NO${CR} (default, enable with ${BMagenta}-DBUILD_TIDY_TOOL=ON${CR})")
endif() endif()
message(STATUS " ${BWhite}tidy_tool${CR} ${sdk_tidy_summary}") message(STATUS " ${BWhite}tidy_tool${CR} ${sdk_tidy_summary}")
set(_deprecated)
if(BUILD_SDK)
list(APPEND _deprecated sdk)
endif()
if(BUILD_SDK_COMMANDS)
list(APPEND _deprecated sdk_commands)
endif()
if(BUILD_DDS_PLUGIN)
list(APPEND _deprecated dds_plugin)
endif()
list(JOIN _deprecated ", " _deprecated)
if(_deprecated)
message(DEPRECATION "You have selected to build deprecated components '${_deprecated}' which will be removed in a future release. See https://github.com/FairRootGroup/FairMQ/discussions/392 for more information. Use '-Wno-deprecated' to silence deprecation warnings.")
endif()
endmacro() endmacro()
macro(fairmq_summary_static_analysis) macro(fairmq_summary_static_analysis)

View File

@@ -26,7 +26,6 @@ Here is an overview of the device/channel options and when they are applied:
| `transport` | at the end of `fair::mq::State::InitializingDevice` | | `transport` | at the end of `fair::mq::State::InitializingDevice` |
| `network-interface` | at the end of `fair::mq::State::InitializingDevice` | | `network-interface` | at the end of `fair::mq::State::InitializingDevice` |
| `init-timeout` | at the end of `fair::mq::State::InitializingDevice` | | `init-timeout` | at the end of `fair::mq::State::InitializingDevice` |
| `max-run-time` | at the end of `fair::mq::State::InitializingDevice` |
| `shm-segment-size` | at the end of `fair::mq::State::InitializingDevice` | | `shm-segment-size` | at the end of `fair::mq::State::InitializingDevice` |
| `shm-monitor` | at the end of `fair::mq::State::InitializingDevice` | | `shm-monitor` | at the end of `fair::mq::State::InitializingDevice` |
| `ofi-size-hint` | at the end of `fair::mq::State::InitializingDevice` | | `ofi-size-hint` | at the end of `fair::mq::State::InitializingDevice` |

View File

@@ -1,19 +1,19 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2012-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2012-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" * * copied verbatim in the file "LICENSE" *
********************************************************************************/ ********************************************************************************/
#include <algorithm> // std::max #include <algorithm> // std::max, std::any_of
#include <boost/algorithm/string.hpp> // join/split #include <boost/algorithm/string.hpp> // join/split
#include <chrono> #include <chrono>
#include <fairmq/Device.h> #include <fairmq/Device.h>
#include <fairmq/Tools.h> #include <fairmq/Tools.h>
#include <future>
#include <iomanip> #include <iomanip>
#include <list> #include <list>
#include <memory> // std::make_unique
#include <mutex> #include <mutex>
#include <thread> #include <thread>
@@ -27,7 +27,6 @@ constexpr const char* Device::DefaultTransportName;
constexpr mq::Transport Device::DefaultTransportType; constexpr mq::Transport Device::DefaultTransportType;
constexpr const char* Device::DefaultNetworkInterface; constexpr const char* Device::DefaultNetworkInterface;
constexpr int Device::DefaultInitTimeout; constexpr int Device::DefaultInitTimeout;
constexpr uint64_t Device::DefaultMaxRunTime;
constexpr float Device::DefaultRate; constexpr float Device::DefaultRate;
constexpr const char* Device::DefaultSession; constexpr const char* Device::DefaultSession;
@@ -83,7 +82,6 @@ Device::Device(ProgOptions* config, tools::Version version)
, fMultitransportProceed(false) , fMultitransportProceed(false)
, fVersion(version) , fVersion(version)
, fRate(DefaultRate) , fRate(DefaultRate)
, fMaxRunRuntimeInS(DefaultMaxRunTime)
, fInitializationTimeoutInS(DefaultInitTimeout) , fInitializationTimeoutInS(DefaultInitTimeout)
, fTransitioning(false) , fTransitioning(false)
{ {
@@ -215,7 +213,6 @@ void Device::InitWrapper()
Init(); Init();
fRate = fConfig->GetProperty<float>("rate", DefaultRate); fRate = fConfig->GetProperty<float>("rate", DefaultRate);
fMaxRunRuntimeInS = fConfig->GetProperty<uint64_t>("max-run-time", DefaultMaxRunTime);
fInitializationTimeoutInS = fConfig->GetProperty<int>("init-timeout", DefaultInitTimeout); fInitializationTimeoutInS = fConfig->GetProperty<int>("init-timeout", DefaultInitTimeout);
try { try {
@@ -293,7 +290,9 @@ void Device::BindWrapper()
Bind(); Bind();
ChangeState(Transition::Auto); if (!NewStatePending()) {
ChangeState(Transition::Auto);
}
} }
void Device::ConnectWrapper() void Device::ConnectWrapper()
@@ -330,7 +329,9 @@ void Device::ConnectWrapper()
Connect(); Connect();
ChangeState(Transition::Auto); if (!NewStatePending()) {
ChangeState(Transition::Auto);
}
} }
void Device::AttachChannels(vector<Channel*>& chans) void Device::AttachChannels(vector<Channel*>& chans)
@@ -430,15 +431,28 @@ void Device::InitTaskWrapper()
{ {
InitTask(); InitTask();
ChangeState(Transition::Auto); if (!NewStatePending()) {
ChangeState(Transition::Auto);
}
} }
void Device::RunWrapper() void Device::RunWrapper()
{ {
LOG(info) << "fair::mq::Device running..."; LOG(info) << "fair::mq::Device running...";
// start the rate logger thread unique_ptr<thread> rateLogger;
future<void> rateLogger = async(launch::async, &Device::LogSocketRates, this); // Check if rate logging thread is needed
const bool rateLogging = any_of(fChannels.cbegin(), fChannels.cend(), [](auto ch) {
return any_of(ch.second.cbegin(), ch.second.cend(), [](auto sub) { return sub.fRateLogging > 0; });
});
if (rateLogging) {
rateLogger = make_unique<thread>(&Device::LogSocketRates, this);
}
tools::CallOnDestruction joinRateLogger([&](){
if (rateLogging && rateLogger->joinable()) { rateLogger->join(); }
});
// notify transports to resume transfers // notify transports to resume transfers
for (auto& t : fTransports) { for (auto& t : fTransports) {
@@ -481,8 +495,6 @@ void Device::RunWrapper()
PostRun(); PostRun();
cod.disable(); cod.disable();
rateLogger.get();
} }
void Device::HandleSingleChannelInput() void Device::HandleSingleChannelInput()
@@ -710,7 +722,6 @@ void Device::LogSocketRates()
chrono::time_point<chrono::high_resolution_clock> t0(chrono::high_resolution_clock::now()); chrono::time_point<chrono::high_resolution_clock> t0(chrono::high_resolution_clock::now());
chrono::time_point<chrono::high_resolution_clock> t1; chrono::time_point<chrono::high_resolution_clock> t1;
uint64_t secondsElapsed = 0;
while (!NewStatePending()) { while (!NewStatePending()) {
WaitFor(chrono::seconds(1)); WaitFor(chrono::seconds(1));
@@ -743,7 +754,7 @@ void Device::LogSocketRates()
bytesOut.at(i) = bytesOutNew.at(i); bytesOut.at(i) = bytesOutNew.at(i);
msgOut.at(i) = msgOutNew.at(i); msgOut.at(i) = msgOutNew.at(i);
LOG(info) << setw(chanNameLen) << filteredChannelNames.at(i) << ": " LOG(info) << setw(static_cast<int>(chanNameLen)) << filteredChannelNames.at(i) << ": "
<< "in: " << msgPerSecIn.at(i) << " (" << mbPerSecIn.at(i) << " MB) " << "in: " << msgPerSecIn.at(i) << " (" << mbPerSecIn.at(i) << " MB) "
<< "out: " << msgPerSecOut.at(i) << " (" << mbPerSecOut.at(i) << " MB)"; << "out: " << msgPerSecOut.at(i) << " (" << mbPerSecOut.at(i) << " MB)";
} }
@@ -753,9 +764,6 @@ void Device::LogSocketRates()
} }
t0 = t1; t0 = t1;
if (fMaxRunRuntimeInS > 0 && ++secondsElapsed >= fMaxRunRuntimeInS) {
ChangeState(Transition::Stop);
}
} }
} }
@@ -770,7 +778,9 @@ void Device::ResetTaskWrapper()
{ {
ResetTask(); ResetTask();
ChangeState(Transition::Auto); if (!NewStatePending()) {
ChangeState(Transition::Auto);
}
} }
void Device::ResetWrapper() void Device::ResetWrapper()
@@ -784,7 +794,9 @@ void Device::ResetWrapper()
fChannels.clear(); fChannels.clear();
fTransports.clear(); fTransports.clear();
fTransportFactory.reset(); fTransportFactory.reset();
ChangeState(Transition::Auto); if (!NewStatePending()) {
ChangeState(Transition::Auto);
}
} }
Device::~Device() Device::~Device()

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2021-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -565,7 +565,6 @@ class Device
static constexpr mq::Transport DefaultTransportType = mq::Transport::ZMQ; static constexpr mq::Transport DefaultTransportType = mq::Transport::ZMQ;
static constexpr const char* DefaultNetworkInterface = "default"; static constexpr const char* DefaultNetworkInterface = "default";
static constexpr int DefaultInitTimeout = 120; static constexpr int DefaultInitTimeout = 120;
static constexpr uint64_t DefaultMaxRunTime = 0;
static constexpr float DefaultRate = 0.; static constexpr float DefaultRate = 0.;
static constexpr const char* DefaultSession = "default"; static constexpr const char* DefaultSession = "default";

View File

@@ -187,9 +187,7 @@ struct Machine_ : public state_machine_def<Machine_>
{ {
unique_lock<mutex> lock(fStateMtx); unique_lock<mutex> lock(fStateMtx);
while (!fNewStatePending) { fNewStatePendingCV.wait(lock, [this]{ return fNewStatePending.load(); });
fNewStatePendingCV.wait_for(lock, chrono::milliseconds(100));
}
LOG(state) << fState << " ---> " << fNewState; LOG(state) << fState << " ---> " << fNewState;
fState = static_cast<State>(fNewState); fState = static_cast<State>(fNewState);

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2019-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -9,16 +9,14 @@
#ifndef FAIRMQSTATEQUEUE_H_ #ifndef FAIRMQSTATEQUEUE_H_
#define FAIRMQSTATEQUEUE_H_ #define FAIRMQSTATEQUEUE_H_
#include <fairmq/States.h>
#include <queue>
#include <mutex>
#include <chrono> #include <chrono>
#include <utility> // pair
#include <condition_variable> #include <condition_variable>
#include <fairmq/States.h>
#include <mutex>
#include <queue>
#include <utility> // pair
namespace fair::mq namespace fair::mq {
{
class StateQueue class StateQueue
{ {
@@ -33,41 +31,47 @@ class StateQueue
fair::mq::State WaitForNext() fair::mq::State WaitForNext()
{ {
std::unique_lock<std::mutex> lock(fMtx); std::unique_lock<std::mutex> lock(fMtx);
while (fStates.empty()) { fCV.wait(lock, [this] { return Predicate(); });
fCV.wait_for(lock, std::chrono::milliseconds(50)); return PopFrontUnsafe();
}
fair::mq::State state = fStates.front();
if (state == fair::mq::State::Error) {
throw DeviceErrorState("Controlled device transitioned to error state.");
}
fStates.pop();
return state;
} }
template<typename Rep, typename Period> template<typename Timeout>
std::pair<bool, fair::mq::State> WaitForNext(std::chrono::duration<Rep, Period> const& duration) std::pair<bool, fair::mq::State> WaitForNext(Timeout&& duration)
{ {
std::unique_lock<std::mutex> lock(fMtx); std::unique_lock<std::mutex> lock(fMtx);
fCV.wait_for(lock, duration); fCV.wait_for(lock, std::forward<Timeout>(duration), [this] { return Predicate(); });
return ReturnPairUnsafe();
if (fStates.empty()) {
return { false, fair::mq::State::Ok };
}
fair::mq::State state = fStates.front();
if (state == fair::mq::State::Error) {
throw DeviceErrorState("Controlled device transitioned to error state.");
}
fStates.pop();
return { true, state };
} }
void WaitForState(fair::mq::State state) { while (WaitForNext() != state) {} } template<typename CustomPredicate>
std::pair<bool, fair::mq::State> WaitForNextOrCustom(CustomPredicate&& customPredicate)
{
std::unique_lock<std::mutex> lock(fMtx);
fCV.wait(lock, [this, cp = std::move(customPredicate)] { return Predicate() || cp(); });
return ReturnPairUnsafe();
}
template<typename CustomPredicate>
std::pair<bool, fair::mq::State> WaitForCustom(CustomPredicate&& customPredicate)
{
std::unique_lock<std::mutex> lock(fMtx);
fCV.wait(lock, [cp = std::move(customPredicate)] { return cp(); });
return ReturnPairUnsafe();
}
void WaitForState(fair::mq::State state)
{
while (WaitForNext() != state) {}
}
template<typename CustomPredicate>
void WaitForStateOrCustom(fair::mq::State state, CustomPredicate customPredicate)
{
auto next = WaitForNextOrCustom(customPredicate);
while (!customPredicate() && (next.first && next.second != state)) {
next = WaitForNextOrCustom(customPredicate);
}
}
void Push(fair::mq::State state) void Push(fair::mq::State state)
{ {
@@ -75,7 +79,35 @@ class StateQueue
std::lock_guard<std::mutex> lock(fMtx); std::lock_guard<std::mutex> lock(fMtx);
fStates.push(state); fStates.push(state);
} }
fCV.notify_all(); fCV.notify_one();
}
template<typename CustomSignal>
void Push(fair::mq::State state, CustomSignal&& signal)
{
{
std::lock_guard<std::mutex> lock(fMtx);
fStates.push(state);
signal();
}
fCV.notify_one();
}
template<typename CustomSignal>
void Notify(CustomSignal&& signal)
{
{
std::lock_guard<std::mutex> lock(fMtx);
signal();
}
fCV.notify_one();
}
template<typename CustomSignal>
void Locked(CustomSignal&& signal)
{
std::lock_guard<std::mutex> lock(fMtx);
signal();
} }
void Clear() void Clear()
@@ -88,8 +120,29 @@ class StateQueue
std::queue<fair::mq::State> fStates; std::queue<fair::mq::State> fStates;
std::mutex fMtx; std::mutex fMtx;
std::condition_variable fCV; std::condition_variable fCV;
// must be called under locked fMtx
fair::mq::State PopFrontUnsafe()
{
fair::mq::State state = fStates.front();
if (state == fair::mq::State::Error) {
throw DeviceErrorState("Controlled device transitioned to error state.");
}
fStates.pop();
return state;
}
// must be called under locked fMtx
std::pair<bool, fair::mq::State> ReturnPairUnsafe()
{
auto const pred = Predicate();
return {pred, pred ? PopFrontUnsafe() : fair::mq::State::Ok};
}
// must be called under locked fMtx
bool Predicate() { return !fStates.empty(); }
}; };
} // namespace fair::mq } // namespace fair::mq
#endif /* FAIRMQSTATEQUEUE_H_ */ #endif /* FAIRMQSTATEQUEUE_H_ */

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2018-2020 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2018-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -21,7 +21,7 @@
#define FAIRMQ_GIT_DATE "@PROJECT_GIT_DATE@" #define FAIRMQ_GIT_DATE "@PROJECT_GIT_DATE@"
#define FAIRMQ_REPO_URL "https://github.com/FairRootGroup/FairMQ" #define FAIRMQ_REPO_URL "https://github.com/FairRootGroup/FairMQ"
#define FAIRMQ_LICENSE "LGPL-3.0" #define FAIRMQ_LICENSE "LGPL-3.0"
#define FAIRMQ_COPYRIGHT "2012-2021 GSI" #define FAIRMQ_COPYRIGHT "2012-2022 GSI"
#define FAIRMQ_BUILD_TYPE "@CMAKE_BUILD_TYPE@" #define FAIRMQ_BUILD_TYPE "@CMAKE_BUILD_TYPE@"
#endif // FAIR_MQ_VERSION_H #endif // FAIR_MQ_VERSION_H

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2017-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2017-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -62,7 +62,6 @@ Plugin::ProgOptions ConfigPluginProgramOptions()
("transport", po::value<string >()->default_value("zeromq"), "Transport ('zeromq'/'shmem').") ("transport", po::value<string >()->default_value("zeromq"), "Transport ('zeromq'/'shmem').")
("network-interface", po::value<string >()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).") ("network-interface", po::value<string >()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).")
("init-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).") ("init-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).")
("max-run-time", po::value<uint64_t >()->default_value(0), "Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit).")
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)") ("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t >()->default_value(2ULL << 30), "Shared memory: size of the shared memory segment (in bytes).") ("shm-segment-size", po::value<size_t >()->default_value(2ULL << 30), "Shared memory: size of the shared memory segment (in bytes).")
("shm-allocation", po::value<string >()->default_value("rbtree_best_fit"), "Shared memory allocation algorithm: rbtree_best_fit/simple_seq_fit.") ("shm-allocation", po::value<string >()->default_value("rbtree_best_fit"), "Shared memory allocation algorithm: rbtree_best_fit/simple_seq_fit.")

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2017-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -36,7 +36,7 @@ REGISTER_FAIRMQ_PLUGIN(
config, // Plugin name config, // Plugin name
(Plugin::Version{FAIRMQ_VERSION_MAJOR, FAIRMQ_VERSION_MINOR, FAIRMQ_VERSION_PATCH}), (Plugin::Version{FAIRMQ_VERSION_MAJOR, FAIRMQ_VERSION_MINOR, FAIRMQ_VERSION_PATCH}),
"FairRootGroup <fairroot@gsi.de>", "FairRootGroup <fairroot@gsi.de>",
"https://github.com/FairRootGroup/FairRoot", "https://github.com/FairRootGroup/FairMQ",
ConfigPluginProgramOptions ConfigPluginProgramOptions
) )

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2017-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2017-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -56,11 +56,11 @@ Control::Control(const string& name, Plugin::Version version, const string& main
SubscribeToDeviceStateChange([&](DeviceState newState) { SubscribeToDeviceStateChange([&](DeviceState newState) {
LOG(trace) << "control plugin notified on new state: " << newState; LOG(trace) << "control plugin notified on new state: " << newState;
fStateQueue.Push(newState);
if (newState == DeviceState::Error) { if (newState == DeviceState::Error) {
fPluginShutdownRequested = true; fPluginShutdownRequested = true;
fDeviceShutdownRequested = true; fStateQueue.Push(newState, [this]{ fDeviceShutdownRequested = true; });
} else {
fStateQueue.Push(newState);
} }
}); });
@@ -99,18 +99,42 @@ Control::Control(const string& name, Plugin::Version version, const string& main
auto Control::RunStartupSequence() -> void auto Control::RunStartupSequence() -> void
{ {
ChangeDeviceState(DeviceStateTransition::InitDevice); using Transition = DeviceStateTransition;
while (fStateQueue.WaitForNext() != DeviceState::InitializingDevice) {} using State = DeviceState;
ChangeDeviceState(DeviceStateTransition::CompleteInit); auto shutdownRequested = [this]{ return fDeviceShutdownRequested.load(); };
while (fStateQueue.WaitForNext() != DeviceState::Initialized) {}
ChangeDeviceState(DeviceStateTransition::Bind); ChangeDeviceState(Transition::InitDevice);
while (fStateQueue.WaitForNext() != DeviceState::Bound) {} fStateQueue.WaitForStateOrCustom(State::InitializingDevice, shutdownRequested);
ChangeDeviceState(DeviceStateTransition::Connect); if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
while (fStateQueue.WaitForNext() != DeviceState::DeviceReady) {}
ChangeDeviceState(DeviceStateTransition::InitTask); ChangeDeviceState(Transition::CompleteInit);
while (fStateQueue.WaitForNext() != DeviceState::Ready) {} fStateQueue.WaitForStateOrCustom(State::Initialized, shutdownRequested);
ChangeDeviceState(DeviceStateTransition::Run); if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
while (fStateQueue.WaitForNext() != DeviceState::Running) {}
ChangeDeviceState(Transition::Bind);
fStateQueue.WaitForStateOrCustom(State::Binding, shutdownRequested);
if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
fStateQueue.WaitForStateOrCustom(State::Bound, shutdownRequested);
if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
ChangeDeviceState(Transition::Connect);
fStateQueue.WaitForStateOrCustom(State::Connecting, shutdownRequested);
if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
fStateQueue.WaitForStateOrCustom(State::DeviceReady, shutdownRequested);
if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
ChangeDeviceState(Transition::InitTask);
fStateQueue.WaitForStateOrCustom(State::InitializingTask, shutdownRequested);
if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
fStateQueue.WaitForStateOrCustom(State::Ready, shutdownRequested);
if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
ChangeDeviceState(Transition::Run);
fStateQueue.WaitForStateOrCustom(State::Running, shutdownRequested);
if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
} }
auto ControlPluginProgramOptions() -> Plugin::ProgOptions auto ControlPluginProgramOptions() -> Plugin::ProgOptions
@@ -123,10 +147,8 @@ auto ControlPluginProgramOptions() -> Plugin::ProgOptions
return pluginOptions; return pluginOptions;
} }
auto Control::InteractiveMode() -> void auto Control::RunREPL() -> void
try { {
RunStartupSequence();
char input = 0; // hold the user console input char input = 0; // hold the user console input
pollfd cinfd[1]; pollfd cinfd[1];
cinfd[0].fd = fileno(stdin); cinfd[0].fd = fileno(stdin);
@@ -161,7 +183,7 @@ try {
case 'i': case 'i':
cout << "\n --> [i] init device\n\n" << flush; cout << "\n --> [i] init device\n\n" << flush;
if (ChangeDeviceState(DeviceStateTransition::InitDevice)) { if (ChangeDeviceState(DeviceStateTransition::InitDevice)) {
while (fStateQueue.WaitForNext() != DeviceState::InitializingDevice) {} fStateQueue.WaitForState(DeviceState::InitializingDevice);
ChangeDeviceState(DeviceStateTransition::CompleteInit); ChangeDeviceState(DeviceStateTransition::CompleteInit);
} }
break; break;
@@ -243,7 +265,19 @@ try {
} }
} }
RunShutdownSequence(); }
auto Control::InteractiveMode() -> void
try {
RunStartupSequence();
if(!fDeviceShutdownRequested) {
RunREPL();
}
if(!fDeviceShutdownRequested) {
RunShutdownSequence();
}
} catch (PluginServices::DeviceControlError& e) { } catch (PluginServices::DeviceControlError& e) {
// If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else. // If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else.
LOG(debug) << e.what(); LOG(debug) << e.what();
@@ -366,16 +400,13 @@ auto Control::StaticMode() -> void
try { try {
RunStartupSequence(); RunStartupSequence();
{ // Wait for next state, which is DeviceState::Ready,
// Wait for next state, which is DeviceState::Ready, // or for device shutdown request (Ctrl-C)
// or for device shutdown request (Ctrl-C) fStateQueue.WaitForNextOrCustom([this]{ return fDeviceShutdownRequested.load(); });
pair<bool, fair::mq::State> result;
do {
result = fStateQueue.WaitForNext(chrono::milliseconds(50));
} while (result.first == false && !fDeviceShutdownRequested);
}
RunShutdownSequence(); if(!fDeviceShutdownRequested) {
RunShutdownSequence();
}
} catch (PluginServices::DeviceControlError& e) { } catch (PluginServices::DeviceControlError& e) {
// If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else. // If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else.
LOG(debug) << e.what(); LOG(debug) << e.what();
@@ -387,16 +418,12 @@ auto Control::GUIMode() -> void
try { try {
RunStartupSequence(); RunStartupSequence();
{ // Wait for device shutdown request (Ctrl-C)
// Wait for next state, which is DeviceState::Ready, fStateQueue.WaitForCustom([this]{ return fDeviceShutdownRequested.load(); });
// or for device shutdown request (Ctrl-C)
pair<bool, fair::mq::State> result;
do {
result = fStateQueue.WaitForNext(chrono::milliseconds(50));
} while (!fDeviceShutdownRequested);
}
RunShutdownSequence(); if(!fDeviceShutdownRequested) {
RunShutdownSequence();
}
} catch (PluginServices::DeviceControlError& e) { } catch (PluginServices::DeviceControlError& e) {
// If we are here, it means another plugin has taken control. That's fine, just print the // If we are here, it means another plugin has taken control. That's fine, just print the
// exception message and do nothing else. // exception message and do nothing else.
@@ -416,10 +443,10 @@ auto Control::SignalHandler() -> void
LOG(info) << "Waiting for graceful device shutdown. Hit Ctrl-C again to abort immediately."; LOG(info) << "Waiting for graceful device shutdown. Hit Ctrl-C again to abort immediately.";
// Signal and wait for controller thread, if we are controller // Signal and wait for controller thread, if we are controller
fDeviceShutdownRequested = true; fStateQueue.Notify([this] { fDeviceShutdownRequested = true; });
{ {
unique_lock<mutex> lock(fControllerMutex); unique_lock<mutex> lock(fControllerMutex);
if (fControllerThread.joinable()) fControllerThread.join(); if (fControllerThread.joinable()) { fControllerThread.join(); }
} }
if (!fDeviceHasShutdown) { if (!fDeviceHasShutdown) {
@@ -462,6 +489,12 @@ auto Control::RunShutdownSequence() -> void
case DeviceState::Running: case DeviceState::Running:
ChangeDeviceState(DeviceStateTransition::Stop); ChangeDeviceState(DeviceStateTransition::Stop);
break; break;
case DeviceState::Binding:
case DeviceState::Connecting:
case DeviceState::InitializingTask:
case DeviceState::ResettingTask:
case DeviceState::ResettingDevice:
ChangeDeviceState(DeviceStateTransition::Auto);
default: default:
// LOG(debug) << "Controller ignoring event: " << nextState; // LOG(debug) << "Controller ignoring event: " << nextState;
break; break;
@@ -481,9 +514,9 @@ Control::~Control()
{ {
unique_lock<mutex> lock(fControllerMutex); unique_lock<mutex> lock(fControllerMutex);
if (fControllerThread.joinable()) fControllerThread.join(); if (fControllerThread.joinable()) { fControllerThread.join(); }
} }
if (fSignalHandlerThread.joinable()) fSignalHandlerThread.join(); if (fSignalHandlerThread.joinable()) { fSignalHandlerThread.join(); }
UnsubscribeFromDeviceStateChange(); UnsubscribeFromDeviceStateChange();
} }

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2017-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -46,6 +46,7 @@ class Control : public Plugin
auto GUIMode() -> void; auto GUIMode() -> void;
auto SignalHandler() -> void; auto SignalHandler() -> void;
auto RunShutdownSequence() -> void; auto RunShutdownSequence() -> void;
auto RunREPL() -> void;
auto RunStartupSequence() -> void; auto RunStartupSequence() -> void;
std::thread fControllerThread; std::thread fControllerThread;

View File

@@ -574,7 +574,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmIdT,
string managementSegmentName("fmq_" + shmId + "_mng"); string managementSegmentName("fmq_" + shmId + "_mng");
try { try {
bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str()); bipc::managed_shared_memory managementSegment(bipc::open_read_only, managementSegmentName.c_str());
Uint16RegionInfoHashMap* shmRegions = managementSegment.find<Uint16RegionInfoHashMap>(bipc::unique_instance).first; Uint16RegionInfoHashMap* shmRegions = managementSegment.find<Uint16RegionInfoHashMap>(bipc::unique_instance).first;
if (shmRegions) { if (shmRegions) {
@@ -660,27 +660,35 @@ void Monitor::ResetContent(const ShmId& shmIdT, bool verbose /* = true */)
managed_shared_memory managementSegment(open_only, managementSegmentName.c_str()); managed_shared_memory managementSegment(open_only, managementSegmentName.c_str());
Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first; Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first;
for (const auto& s : *segmentInfos) { if (segmentInfos) {
if (verbose) { cout << "Found info for " << segmentInfos->size() << " managed segments" << endl;
cout << "Resetting content of segment '" << "fmq_" << shmId << "_m_" << s.first << "'..." << endl; for (const auto& s : *segmentInfos) {
}
try {
if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
RBTreeBestFitSegment segment(open_only, std::string("fmq_" + shmId + "_m_" + to_string(s.first)).c_str());
void* ptr = segment.get_segment_manager();
size_t size = segment.get_segment_manager()->get_size();
new(ptr) segment_manager<char, rbtree_best_fit<mutex_family, offset_ptr<void>>, null_index>(size);
} else {
SimpleSeqFitSegment segment(open_only, std::string("fmq_" + shmId + "_m_" + to_string(s.first)).c_str());
void* ptr = segment.get_segment_manager();
size_t size = segment.get_segment_manager()->get_size();
new(ptr) segment_manager<char, simple_seq_fit<mutex_family, offset_ptr<void>>, null_index>(size);
}
} catch (bie& e) {
if (verbose) { if (verbose) {
cout << "Error resetting content of segment '" << std::string("fmq_" + shmId + "_m_" + to_string(s.first)) << "': " << e.what() << endl; cout << "Resetting content of segment '" << "fmq_" << shmId << "_m_" << s.first << "'..." << endl;
}
try {
if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
RBTreeBestFitSegment segment(open_only, std::string("fmq_" + shmId + "_m_" + to_string(s.first)).c_str());
void* ptr = segment.get_segment_manager();
size_t size = segment.get_segment_manager()->get_size();
new(ptr) segment_manager<char, rbtree_best_fit<mutex_family, offset_ptr<void>>, null_index>(size);
} else {
SimpleSeqFitSegment segment(open_only, std::string("fmq_" + shmId + "_m_" + to_string(s.first)).c_str());
void* ptr = segment.get_segment_manager();
size_t size = segment.get_segment_manager()->get_size();
new(ptr) segment_manager<char, simple_seq_fit<mutex_family, offset_ptr<void>>, null_index>(size);
}
if (verbose) {
cout << "Done." << endl;
}
} catch (bie& e) {
if (verbose) {
cout << "Error resetting content of segment '" << std::string("fmq_" + shmId + "_m_" + to_string(s.first)) << "': " << e.what() << endl;
}
} }
} }
} else {
cout << "Found management segment, but cannot locate segment info, something went wrong..." << endl;
} }
Uint16RegionInfoHashMap* shmRegions = managementSegment.find<Uint16RegionInfoHashMap>(bipc::unique_instance).first; Uint16RegionInfoHashMap* shmRegions = managementSegment.find<Uint16RegionInfoHashMap>(bipc::unique_instance).first;
@@ -717,13 +725,15 @@ void Monitor::ResetContent(const ShmId& shmIdT, const std::vector<SegmentConfig>
std::string shmId = shmIdT.shmId; std::string shmId = shmIdT.shmId;
std::string managementSegmentName("fmq_" + shmId + "_mng"); std::string managementSegmentName("fmq_" + shmId + "_mng");
// reset managed segments
ResetContent(shmIdT, verbose);
// delete management segment // delete management segment
cout << "deleting management segment" << endl;
Remove<bipc::shared_memory_object>(managementSegmentName, verbose); Remove<bipc::shared_memory_object>(managementSegmentName, verbose);
// recreate management segment // recreate management segment
cout << "recreating management segment..." << endl;
managed_shared_memory mngSegment(create_only, managementSegmentName.c_str(), kManagementSegmentSize); managed_shared_memory mngSegment(create_only, managementSegmentName.c_str(), kManagementSegmentSize);
cout << "done." << endl;
// fill management segment with segment & region infos // fill management segment with segment & region infos
cout << "filling management segment with managed segment configs..." << endl;
for (const auto& s : segmentCfgs) { for (const auto& s : segmentCfgs) {
if (s.allocationAlgorithm == "rbtree_best_fit") { if (s.allocationAlgorithm == "rbtree_best_fit") {
Segment::Register(shmId, s.id, AllocationAlgorithm::rbtree_best_fit); Segment::Register(shmId, s.id, AllocationAlgorithm::rbtree_best_fit);
@@ -734,9 +744,14 @@ void Monitor::ResetContent(const ShmId& shmIdT, const std::vector<SegmentConfig>
throw MonitorError("Unknown allocation algorithm provided: " + s.allocationAlgorithm); throw MonitorError("Unknown allocation algorithm provided: " + s.allocationAlgorithm);
} }
} }
cout << "done." << endl;
cout << "filling management segment with unmanaged region configs..." << endl;
for (const auto& r : regionCfgs) { for (const auto& r : regionCfgs) {
fair::mq::shmem::UnmanagedRegion::Register(shmId, r); fair::mq::shmem::UnmanagedRegion::Register(shmId, r);
} }
cout << "done." << endl;
// reset managed segments
ResetContent(shmIdT, verbose);
} }
void Monitor::ResetContent(const SessionId& sessionId, const std::vector<SegmentConfig>& segmentCfgs, const std::vector<RegionConfig>& regionCfgs, bool verbose /* = true */) void Monitor::ResetContent(const SessionId& sessionId, const std::vector<SegmentConfig>& segmentCfgs, const std::vector<RegionConfig>& regionCfgs, bool verbose /* = true */)

9
test/ci/fedora.35.def Normal file
View File

@@ -0,0 +1,9 @@
Bootstrap: docker
From: fedora:35
%post
dnf -y update
dnf -y install https://alfa-ci.gsi.de/packages/rpm/fedora-35-x86_64/fairsoft-release-dev.rpm
dnf -y install clang cli11-devel pmix-devel ninja-build 'dnf-command(builddep)' libasan liblsan libtsan libubsan clang-tools-extra
dnf -y builddep fairmq
dnf -y clean all