Compare commits

..

35 Commits

Author SHA1 Message Date
Dennis Klein
a7429ed79b Move and update PR template 2018-08-21 13:21:25 +02:00
Dennis Klein
09ef175736 Create issue templates 2018-08-21 13:21:25 +02:00
Dennis Klein
a55db74848 Fix variable name 2018-08-21 13:15:02 +02:00
Dennis Klein
8b3e3bbe28 Fix coverage reporting for FAST_BUILD 2018-08-09 16:15:15 +02:00
Dennis Klein
e71c9c1121 Parallelize unity build
Only needed with Makefile generator,
see https://github.com/sakra/cotire/blob/master/MANUAL.md#optimizing-the-build-process-for-multiple-processor-cores
2018-08-09 16:15:15 +02:00
Dennis Klein
fc0adba26b Support unity build 2018-08-09 16:15:15 +02:00
Dennis Klein
24dff2fd76 Enable FAST_BUILD for alfa-ci 2018-08-09 16:15:15 +02:00
Dennis Klein
70ffc0d8c6 Guard list operation for the case the list is empty 2018-08-09 16:15:15 +02:00
Dennis Klein
ff701006fd Reflect dev version in the installed artifacts 2018-08-09 16:15:15 +02:00
Dennis Klein
c8bd19b7a1 Add experimental FAST_BUILD option
Significantly reduces compile time for the FairMQ target with
precompiled headers and unity build. For maximum improvement, use
a multi-core-aware build tool, e.g. Ninja.

Leave it undocumented for now, let's first test it internally for a while.
2018-08-09 16:15:15 +02:00
Dennis Klein
aee2ba7b9b Increase sample size to catch time sensitive branches
and stabilize coverage diff reports.
2018-08-09 16:15:15 +02:00
Dennis Klein
9184d5bdae clean 2018-08-09 16:15:15 +02:00
Dennis Klein
5e6f3a5430 Enable color output with Ninja 2018-08-09 16:15:15 +02:00
Dennis Klein
d0fe175cab Print global cxx flags 2018-08-09 16:15:15 +02:00
Dennis Klein
6f22ccf4c1 Fix -Wsign-compare 2018-08-09 16:15:15 +02:00
Dennis Klein
b2034c20cf Fix -Wshadow 2018-08-09 16:15:15 +02:00
Dennis Klein
ab6fd35a86 Add header-only target for msgpack 2018-08-09 16:15:15 +02:00
Dennis Klein
4a8e46c65c Fix -Wunused-parameter 2018-08-09 16:15:15 +02:00
Dennis Klein
90e00730b1 Update clang-format 2018-08-09 16:15:15 +02:00
Dennis Klein
924c8ac5f6 Add hint how to change build type and change color of selected type 2018-08-09 16:15:15 +02:00
Dennis Klein
1e0159b775 Link against system threads library 2018-08-08 16:13:41 +02:00
Alexey Rybalchenko
ef3eb5f83e Simplify structure in DeviceRunner and plugin classes 2018-08-08 16:13:41 +02:00
Alexey Rybalchenko
ee8afd7d2b Fix race in plugin manager/services 2018-08-08 16:13:41 +02:00
Alexey Rybalchenko
a53ef79552 Run state handlers on the main thread (breaking change for control). 2018-08-08 16:13:41 +02:00
Dennis Klein
c064da91df Add ThreadSan/AddressSan build types and print table 2018-07-27 17:09:52 +02:00
Dennis Klein
f5e3212cbf Align with repo subtitle on github 2018-07-27 16:21:24 +02:00
Dennis Klein
c1d61007a1 Add Introduction header 2018-07-27 16:21:24 +02:00
Dennis Klein
93fb407af6 Compact dependency list 2018-07-27 16:21:24 +02:00
Dennis Klein
8e7e23e2d0 Add Copyright statement in separate file instead of README 2018-07-27 16:21:24 +02:00
Dennis Klein
f0ec5fa2be Refactor license section 2018-07-27 16:21:24 +02:00
Dennis Klein
aaaadf0a0b Switch to faster shields host 2018-07-23 20:34:23 +02:00
Dennis Klein
daec266341 Improve visibility of release and docs links 2018-07-23 15:47:04 +02:00
Dennis Klein
38a149d50c Make badges hyperlinks 2018-07-23 15:47:04 +02:00
Dennis Klein
cfebfb3407 Add codecov badges 2018-07-23 15:47:04 +02:00
Dennis Klein
e403d18cb9 Add codecov reports to PRs 2018-07-20 17:21:23 +02:00
45 changed files with 5057 additions and 674 deletions

33
.clang-format Normal file
View File

@@ -0,0 +1,33 @@
---
Language: Cpp
BasedOnStyle: Mozilla
AllowAllParametersOfDeclarationOnNextLine: false
AllowShortBlocksOnASingleLine: true
AllowShortFunctionsOnASingleLine: true
AllowShortIfStatementsOnASingleLine: false
AlignAfterOpenBracket: Align
AlignConsecutiveAssignments: false
AlignConsecutiveDeclarations: false
AlwaysBreakAfterReturnType: None
AlwaysBreakAfterDefinitionReturnType: None
BinPackArguments: false
BinPackParameters: false
BreakBeforeBinaryOperators: NonAssignment
BreakConstructorInitializers: BeforeComma
ColumnLimit: 100
ConstructorInitializerIndentWidth: 4
ContinuationIndentWidth: 4
Cpp11BracedListStyle: true
DerivePointerAlignment: true
FixNamespaceComments: true
IndentWidth: 4
IndentWrappedFunctionNames: true
IncludeBlocks: Regroup
NamespaceIndentation: None
PointerAlignment: Left
SortIncludes: true
SpacesBeforeTrailingComments: 3
Standard: Cpp11
UseTab: Never
...

27
.github/ISSUE_TEMPLATE/bug_report.md vendored Normal file
View File

@@ -0,0 +1,27 @@
---
name: Bug report
about: Create a report to help us improve
---
**Describe the bug**
A clear and concise description of what the bug is.
**To Reproduce**
Steps to reproduce the behavior:
1. Describe environment
2. Describe compile options used
3. Give commands you invoked
**Expected behavior**
A clear and concise description of what you expected to happen.
**Screenshots**
If applicable, add screenshots to help explain your problem.
**System information (please complete the following information):**
- OS: [e.g. MacOS, Fedora28, Ubuntu14.04]
- Environment: [e.g. FairSoft version, alfadist revision]
**Additional context**
Add any other context about the problem here.

View File

@@ -0,0 +1,17 @@
---
name: Feature request
about: Suggest an idea for this project
---
**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
**Describe the solution you'd like**
A clear and concise description of what you want to happen.
**Describe alternatives you've considered**
A clear and concise description of any alternative solutions or features you've considered.
**Additional context**
Add any other context or screenshots about the feature request here.

7
.github/ISSUE_TEMPLATE/question.md vendored Normal file
View File

@@ -0,0 +1,7 @@
---
name: Question / Support
about: Any FairMQ related matter you are interested in
---

View File

@@ -1,4 +1,6 @@
Replace me with your description.
Describe your proposal.
Mention any issue this PR is resolves or is related to.
---

View File

@@ -34,10 +34,30 @@ cmake_dependent_option(BUILD_OFI_TRANSPORT "Build experimental OFI transport." O
cmake_dependent_option(BUILD_DDS_PLUGIN "Build DDS plugin." OFF "BUILD_FAIRMQ" OFF)
cmake_dependent_option(BUILD_EXAMPLES "Build FairMQ examples." ON "BUILD_FAIRMQ" OFF)
option(BUILD_DOCS "Build FairMQ documentation." OFF)
option(FAST_BUILD "Fast production build. Not recommended for development." OFF)
################################################################################
# Dependencies #################################################################
if(FAST_BUILD)
include(cotire)
endif()
macro(find_msgpack)
if(NOT msgpack_FOUND)
find_package2(PRIVATE msgpack VERSION 3.0.0)
set(PROJECT_msgpack_VERSION 2.1.5)
if(NOT msgpack_FOUND)
find_package2(PRIVATE msgpack VERSION 2.1.5 REQUIRED)
endif()
set(msgpack_ROOT ${PACKAGE_PREFIX_DIR})
endif()
endmacro()
set(CMAKE_THREAD_PREFER_PTHREAD TRUE)
set(THREADS_PREFER_PTHREAD_FLAG TRUE)
find_package(Threads REQUIRED)
if(BUILD_FAIRMQ)
find_package2(PUBLIC Boost VERSION 1.64 REQUIRED
COMPONENTS program_options thread system filesystem regex date_time signals
@@ -166,6 +186,29 @@ install_cmake_package()
# Summary ######################################################################
if(CMAKE_CXX_FLAGS)
message(STATUS " ")
message(STATUS " ${Cyan}GLOBAL CXX FLAGS${CR} ${BGreen}${CMAKE_CXX_FLAGS}${CR}")
endif()
if(CMAKE_CONFIGURATION_TYPES)
message(STATUS " ")
message(STATUS " ${Cyan}BUILD TYPE CXX FLAGS${CR}")
string(TOUPPER "${CMAKE_BUILD_TYPE}" selected_type)
foreach(type IN LISTS CMAKE_CONFIGURATION_TYPES)
string(TOUPPER "${type}" type_upper)
if(type_upper STREQUAL selected_type)
pad("${type}" 18 " " type_padded)
message(STATUS "${BGreen}* ${type_padded}${CMAKE_CXX_FLAGS_${type_upper}}${CR}")
else()
pad("${type}" 18 " " type_padded)
message(STATUS " ${BWhite}${type_padded}${CR}${CMAKE_CXX_FLAGS_${type_upper}}")
endif()
unset(type_padded)
unset(type_upper)
endforeach()
message(STATUS " ")
message(STATUS " (Change the build type with ${BMagenta}-DCMAKE_BUILD_TYPE=...${CR})")
endif()
if(PROJECT_PACKAGE_DEPENDENCIES)
message(STATUS " ")
message(STATUS " ${Cyan}DEPENDENCY FOUND VERSION PREFIX${CR}")

41
COPYRIGHT Normal file
View File

@@ -0,0 +1,41 @@
Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
Upstream-Name: FairMQ
Upstream-Contact: Mohammad Al-Turany <m.al-turany@gsi.de>
Source: https://github.com/FairRootGroup/FairMQ
Files: *
Copyright: 2012-2018, GSI Helmholtzzentrum fuer Schwerionenforschung GmbH
Copyright: 2012-2018, [see AUTHORS file]
Copyright: 2012-2018, [see CONTRIBUTORS file]
Comment: The copyright of individual contributors is documented in the
Git history.
License: LGPL-3.0-only
Files: cmake/cotire.cmake
Copyright: 2012-2018 Sascha Kratky
License: COTIRE
License: LGPL-3.0-only
[see LICENSE file]
License: COTIRE
Permission is hereby granted, free of charge, to any person
obtaining a copy of this software and associated documentation
files (the "Software"), to deal in the Software without
restriction, including without limitation the rights to use,
copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.

40
Dart.sh
View File

@@ -41,13 +41,15 @@ if [ "$#" -lt "2" ]; then
fi
# test if a valid ctest model is defined
if [ "$1" == "Experimental" -o "$1" == "Nightly" -o "$1" == "Continuous" -o "$1" == "Profile" -o "$1" == "alfa_ci" ]; then
echo ""
else
echo "-- Error -- This ctest model is not supported."
echo "-- Error -- Possible arguments are Nightly, Experimental, Continuous or Profile."
exit 1
fi
case "$1" in
Experimental|Nightly|Continuous|Profile|alfa_ci|codecov)
;;
*)
echo "-- Error -- This ctest model is not supported."
echo "-- Error -- Possible arguments are Nightly, Experimental, Continuous or Profile."
exit 1
;;
esac
# test if the input file exists and execute it
if [ -e "$2" ];then
@@ -61,6 +63,9 @@ fi
# set the ctest model to command line parameter
if [ "$1" == "alfa_ci" ]; then
export ctest_model=Experimental
elif [ "$1" == "codecov" ]; then
export ctest_model=Profile
export do_codecov_upload=1
else
export ctest_model=$1
fi
@@ -83,13 +88,20 @@ else
COMPILER=$CXX$($CXX -dumpversion)
fi
if [ "$1" == "alfa_ci" ]; then
export LABEL1=alfa_ci-$COMPILER-FairMQ_$GIT_BRANCH
export LABEL=$(echo $LABEL1 | sed -e 's#/#_#g')
else
export LABEL1=${LINUX_FLAVOUR}-$chip-$COMPILER-FairMQ_$GIT_BRANCH
export LABEL=$(echo $LABEL1 | sed -e 's#/#_#g')
fi
case "$1" in
alfa_ci)
export LABEL1=alfa_ci-$COMPILER-FairMQ_$GIT_BRANCH
export LABEL=$(echo $LABEL1 | sed -e 's#/#_#g')
;;
codecov)
export LABEL1=codecov-$COMPILER-FairMQ_$GIT_BRANCH
export LABEL=$(echo $LABEL1 | sed -e 's#/#_#g')
;;
*)
export LABEL1=${LINUX_FLAVOUR}-$chip-$COMPILER-FairMQ_$GIT_BRANCH
export LABEL=$(echo $LABEL1 | sed -e 's#/#_#g')
;;
esac
# get the number of processors
# and information about the host

View File

@@ -28,24 +28,24 @@ Set(configure_options "${configure_options};-DDISABLE_COLOR=ON")
Set(configure_options "${configure_options};-DCMAKE_PREFIX_PATH=$ENV{SIMPATH}")
Set(configure_options "${configure_options};-DBUILD_NANOMSG_TRANSPORT=ON")
Set(configure_options "${configure_options};-DBUILD_DDS_PLUGIN=ON")
Set(configure_options "${configure_options};-DFAST_BUILD=ON")
Set(configure_options "${configure_options};-DCOTIRE_MAXIMUM_NUMBER_OF_UNITY_INCLUDES=-j$ENV{number_of_processors}")
Set(EXTRA_FLAGS $ENV{EXTRA_FLAGS})
If(EXTRA_FLAGS)
Set(configure_options "${configure_options};${EXTRA_FLAGS}")
EndIf()
If($ENV{ctest_model} MATCHES Nightly OR $ENV{ctest_model} MATCHES Profile)
If($ENV{ctest_model} MATCHES Profile)
Find_Program(GCOV_COMMAND gcov)
If(GCOV_COMMAND)
Message("Found GCOV: ${GCOV_COMMAND}")
Set(CTEST_COVERAGE_COMMAND ${GCOV_COMMAND})
EndIf(GCOV_COMMAND)
EndIf()
Set(ENV{ctest_model} Nightly)
CTEST_EMPTY_BINARY_DIRECTORY(${CTEST_BINARY_DIRECTORY})
If($ENV{ctest_model} MATCHES Nightly OR $ENV{ctest_model} MATCHES Profile)
Ctest_Empty_Binary_Directory(${CTEST_BINARY_DIRECTORY})
EndIf()
Ctest_Start($ENV{ctest_model})
@@ -60,9 +60,26 @@ Ctest_Test(BUILD "${CTEST_BINARY_DIRECTORY}"
PARALLEL_LEVEL $ENV{number_of_processors}
RETURN_VALUE _ctest_test_ret_val
)
If("$ENV{do_codecov_upload}")
ForEach(i RANGE 4)
# Gather statistics to catch time sensitive branches
Ctest_Test(BUILD "${CTEST_BINARY_DIRECTORY}"
PARALLEL_LEVEL $ENV{number_of_processors}
)
EndForEach()
EndIf()
If(GCOV_COMMAND)
Ctest_Coverage(BUILD "${CTEST_BINARY_DIRECTORY}")
Ctest_Coverage(BUILD "${CTEST_BINARY_DIRECTORY}" LABELS coverage)
EndIf()
If("$ENV{do_codecov_upload}")
Execute_Process(COMMAND curl https://codecov.io/bash -o codecov_uploader.sh
WORKING_DIRECTORY ${CTEST_BINARY_DIRECTORY}
TIMEOUT 60)
Execute_Process(COMMAND bash ./codecov_uploader.sh -X gcov
WORKING_DIRECTORY ${CTEST_BINARY_DIRECTORY}
TIMEOUT 60)
EndIf()
Ctest_Submit()

40
Jenkinsfile vendored
View File

@@ -4,24 +4,31 @@ def specToLabel(Map spec) {
return "${spec.os}-${spec.arch}-${spec.compiler}-FairSoft_${spec.fairsoft}"
}
def buildMatrix(List specs, Closure callback) {
def jobMatrix(String prefix, List specs, Closure callback) {
def nodes = [:]
for (spec in specs) {
def label = specToLabel(spec)
nodes[label] = {
nodes["${prefix}/${label}"] = {
node(label) {
githubNotify(context: "alfa-ci/${label}", description: 'Building ...', status: 'PENDING')
githubNotify(context: "${prefix}/${label}", description: 'Building ...', status: 'PENDING')
try {
deleteDir()
checkout scm
sh '''\
echo "export BUILDDIR=$PWD/build" >> Dart.cfg
echo "export SOURCEDIR=$PWD" >> Dart.cfg
echo "export PATH=$SIMPATH/bin:$PATH" >> Dart.cfg
echo "export GIT_BRANCH=$JOB_BASE_NAME" >> Dart.cfg
'''
callback.call(spec, label)
deleteDir()
githubNotify(context: "alfa-ci/${label}", description: 'Success', status: 'SUCCESS')
githubNotify(context: "${prefix}/${label}", description: 'Success', status: 'SUCCESS')
} catch (e) {
deleteDir()
githubNotify(context: "alfa-ci/${label}", description: 'Error', status: 'ERROR')
githubNotify(context: "${prefix}/${label}", description: 'Error', status: 'ERROR')
throw e
}
}
@@ -33,22 +40,25 @@ def buildMatrix(List specs, Closure callback) {
pipeline{
agent none
stages {
stage("Run Build/Test Matrix") {
stage("Run CI Matrix") {
steps{
script {
parallel(buildMatrix([
def build_jobs = jobMatrix('alfa-ci/build', [
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc4.9', fairsoft: 'may18'],
[os: 'MacOS10.11', arch: 'x86_64', compiler: 'AppleLLVM8.0.0', fairsoft: 'may18'],
[os: 'MacOS10.13', arch: 'x86_64', compiler: 'AppleLLVM9.0.0', fairsoft: 'may18'],
]) { spec, label ->
sh '''\
echo "export BUILDDIR=$PWD/build" >> Dart.cfg
echo "export SOURCEDIR=$PWD" >> Dart.cfg
echo "export PATH=$SIMPATH/bin:$PATH" >> Dart.cfg
echo "export GIT_BRANCH=$JOB_BASE_NAME" >> Dart.cfg
'''
sh './Dart.sh alfa_ci Dart.cfg'
})
}
def profile_jobs = jobMatrix('alfa-ci/codecov', [
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc4.9', fairsoft: 'may18'],
]) { spec, label ->
withCredentials([string(credentialsId: 'fairmq_codecov_token', variable: 'CODECOV_TOKEN')]) {
sh './Dart.sh codecov Dart.cfg'
}
}
parallel(build_jobs + profile_jobs)
}
}
}

View File

@@ -1,12 +1,14 @@
<!-- {#mainpage} -->
# FairMQ
# FairMQ [![license](https://alfa-ci.gsi.de/shields/github/license/FairRootGroup/FairMQ.svg)](COPYRIGHT)
C++ Message passing framework
C++ Message Queuing Library and Framework
| Branch | Build Status |
| :---: | :--- |
| `master` | ![build status master branch](https://alfa-ci.gsi.de/buildStatus/icon?job=FairRootGroup/FairMQ/master) |
| `dev` | ![build status dev branch](https://alfa-ci.gsi.de/buildStatus/icon?job=FairRootGroup/FairMQ/dev) |
| Branch | Version | Docs | Status |
| :---: | :--- | :--- | :--- |
| `master` | [![release](https://alfa-ci.gsi.de/shields/github/release/FairRootGroup/FairMQ.svg)](https://github.com/FairRootGroup/FairMQ/releases/latest) | [API](https://fairrootgroup.github.io/FairMQ/latest), [Book](https://github.com/FairRootGroup/FairMQ/blob/master/README.md#documentation) | [![build status master branch](https://alfa-ci.gsi.de/buildStatus/icon?job=FairRootGroup/FairMQ/master)](https://alfa-ci.gsi.de/blue/organizations/jenkins/FairRootGroup%2FFairMQ/branches) [![test coverage master branch](https://codecov.io/gh/FairRootGroup/FairMQ/branch/master/graph/badge.svg)](https://codecov.io/gh/FairRootGroup/FairMQ/branch/master) |
| `dev` | [![dev tag](https://alfa-ci.gsi.de/shields/github/tag/FairRootGroup/FairMQ.svg)](https://github.com/FairRootGroup/FairMQ/tags) | [Book](https://github.com/FairRootGroup/FairMQ/blob/dev/README.md#documentation) | [![build status dev branch](https://alfa-ci.gsi.de/buildStatus/icon?job=FairRootGroup/FairMQ/dev)](https://alfa-ci.gsi.de/blue/organizations/jenkins/FairRootGroup%2FFairMQ/branches) [![test coverage dev branch](https://codecov.io/gh/FairRootGroup/FairMQ/branch/dev/graph/badge.svg)](https://codecov.io/gh/FairRootGroup/FairMQ/branch/dev) |
## Introduction
FairMQ is designed to help implementing large-scale data processing workflows needed in next-generation Particle Physics experiments. FairMQ is written in C++ and aims to
* provide **an asynchronous message passing abstraction** of different data transport technologies,
@@ -29,32 +31,17 @@ configuration and control services.
FairMQ has been developed in the context of its mother project [FairRoot](https://github.com/FairRootGroup/FairRoot) -
a simulation, reconstruction and analysis framework.
Find all FairMQ releases and development tags [here](https://github.com/FairRootGroup/FairMQ/releases).
## Dependencies
* [**Boost**](https://www.boost.org/) (PUBLIC)
* [**FairLogger**](https://github.com/FairRootGroup/FairLogger) (PUBLIC)
* [CMake](https://cmake.org/) (BUILD)
* [GTest](https://github.com/google/googletest) (BUILD, optional, `tests`)
* [Doxygen](http://www.doxygen.org/) (BUILD, optional, `docs`)
* [ZeroMQ](http://zeromq.org/) (PRIVATE)
* [Msgpack](https://msgpack.org/index.html) (PRIVATE, optional, `nanomsg_transport`)
* [nanomsg](http://nanomsg.org/) (PRIVATE, optional, `nanomsg_transport`)
* [OFI](https://ofiwg.github.io/libfabric/) (PRIVATE, optional, `ofi_transport`)
* [Protobuf](https://developers.google.com/protocol-buffers/) (PRIVATE, optional, `ofi_transport`)
* [DDS](http://dds.gsi.de) (PRIVATE, optional, `dds_plugin`)
* PUBLIC: [**Boost**](https://www.boost.org/), [**FairLogger**](https://github.com/FairRootGroup/FairLogger)
* BUILD: [CMake](https://cmake.org/), [GTest](https://github.com/google/googletest), [Doxygen](http://www.doxygen.org/)
* PRIVATE: [ZeroMQ](http://zeromq.org/), [Msgpack](https://msgpack.org/index.html), [nanomsg](http://nanomsg.org/),
[OFI](https://ofiwg.github.io/libfabric/), [Protobuf](https://developers.google.com/protocol-buffers/), [DDS](http://dds.gsi.de)
Supported platforms: Linux and MacOS.
## Releases
| Stable release | Date | API Docs |
| --- | --- | --- |
| [**1.2.3**](https://github.com/FairRootGroup/FairMQ/releases/tag/v1.2.3) | May 2018 | [link](https://fairrootgroup.github.io/FairMQ/v1.2.3/index.html) |
| [**1.2.1**](https://github.com/FairRootGroup/FairMQ/releases/tag/v1.2.1) | May 2018 | [link](https://fairrootgroup.github.io/FairMQ/v1.2.1/index.html) |
| [**1.2.0**](https://github.com/FairRootGroup/FairMQ/releases/tag/v1.2.0) | May 2018 | [link](https://fairrootgroup.github.io/FairMQ/v1.2.0/index.html) |
Find all FairMQ stable and development releases [here](https://github.com/FairRootGroup/FairMQ/releases).
## Installation from Source
```bash
@@ -89,7 +76,7 @@ In order to succesfully compile and link against the `FairMQ::FairMQ` target, yo
find_package(FairMQ)
if(FairMQ_FOUND)
find_package(FairLogger ${FairMQ_FairLogger_VERSION})
find_package(Boost ${FairMQ_Boost_VERSION} COMPONENTS ${FairMQ_BOOST_COMPONENTS})
find_package(Boost ${FairMQ_Boost_VERSION} COMPONENTS ${FairMQ_Boost_COMPONENTS})
endif()
```
@@ -101,7 +88,7 @@ Optionally, you can require certain FairMQ package components and a minimum vers
find_package(FairMQ 1.1.0 COMPONENTS nanomsg_transport dds_plugin)
if(FairMQ_FOUND)
find_package(FairLogger ${FairMQ_FairLogger_VERSION})
find_package(Boost ${FairMQ_Boost_VERSION} COMPONENTS ${FairMQ_BOOST_COMPONENTS})
find_package(Boost ${FairMQ_Boost_VERSION} COMPONENTS ${FairMQ_Boost_COMPONENTS})
endif()
```
@@ -169,9 +156,3 @@ After the `find_package(FairMQ)` call the following CMake variables are defined:
4. [File output](docs/Logging.md#54-file-output)
5. [Custom sinks](docs/Logging.md#55-custom-sinks)
6. [Examples](docs/Examples.md#6-examples)
## License
GNU Lesser General Public Licence (LGPL) version 3, see [LICENSE](LICENSE).
Copyright (C) 2013-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH

View File

@@ -112,7 +112,6 @@ macro(set_fairmq_defaults)
set(CMAKE_BUILD_TYPE RelWithDebInfo)
endif()
# Handle C++ standard level
set(CMAKE_CXX_STANDARD_REQUIRED ON)
if(NOT CMAKE_CXX_STANDARD)
@@ -158,9 +157,23 @@ macro(set_fairmq_defaults)
# Define export set, only one for now
set(PROJECT_EXPORT_SET ${PROJECT_NAME}Targets)
# Override CMake defaults
set(CMAKE_CXX_FLAGS_NIGHTLY "-O2 -g -Wshadow -Wall -Wextra")
set(CMAKE_CXX_FLAGS_PROFILE "-g3 -fno-inline -ftest-coverage -fprofile-arcs -Wshadow -Wall -Wextra -Wunused-variable")
# Configure build types
set(CMAKE_CONFIGURATION_TYPES "Debug" "Release" "RelWithDebInfo" "Nightly" "Profile" "AdressSan" "ThreadSan")
set(CMAKE_CXX_FLAGS_DEBUG "-g -Wshadow -Wall -Wextra")
set(CMAKE_CXX_FLAGS_RELEASE "-O2 -DNDEBUG")
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-O2 -g -Wshadow -Wall -Wextra -DNDEBUG")
set(CMAKE_CXX_FLAGS_NIGHTLY "-O2 -g -Wshadow -Wall -Wextra")
set(CMAKE_CXX_FLAGS_PROFILE "-g3 -Wshadow -Wall -Wextra -fno-inline -ftest-coverage -fprofile-arcs")
set(CMAKE_CXX_FLAGS_ADRESSSAN "-O2 -g -Wshadow -Wall -Wextra -fsanitize=address -fno-omit-frame-pointer")
set(CMAKE_CXX_FLAGS_THREADSAN "-O2 -g -Wshadow -Wall -Wextra -fsanitize=thread")
if(CMAKE_GENERATOR STREQUAL "Ninja" AND
((CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND NOT CMAKE_CXX_COMPILER_VERSION VERSION_LESS 4.9) OR
(CMAKE_CXX_COMPILER_ID STREQUAL "Clang" AND NOT CMAKE_CXX_COMPILER_VERSION VERSION_LESS 3.5)))
# Force colored warnings in Ninja's output, if the compiler has -fdiagnostics-color support.
# Rationale in https://github.com/ninja-build/ninja/issues/814
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fdiagnostics-color=always")
endif()
endmacro()
function(join VALUES GLUE OUTPUT)
@@ -237,7 +250,7 @@ endfunction()
macro(install_cmake_package)
include(CMakePackageConfigHelpers)
set(PACKAGE_INSTALL_DESTINATION
${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}-${PROJECT_VERSION}
${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}-${PROJECT_GIT_VERSION}
)
if(BUILD_FAIRMQ)
install(EXPORT ${PROJECT_EXPORT_SET}

46
cmake/Findmsgpack.cmake Normal file
View File

@@ -0,0 +1,46 @@
################################################################################
# Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
unset(_args)
if(msgpack_FIND_VERSION)
list(APPEND _args ${msgpack_FIND_VERSION})
endif()
if(msgpack_FIND_EXACT)
list(APPEND _args "EXACT")
endif()
if(msgpack_FIND_QUIETLY)
list(APPEND _args "QUIET")
endif()
if(msgpack_FIND_REQUIRED)
list(APPEND _args "REQUIRED")
endif()
if(msgpack_FIND_COMPONENTS)
list(APPEND _args "COMPONENTS" ${msgpack_FIND_COMPONENTS})
endif()
find_package(msgpack ${_args} CONFIG)
if(msgpack_FOUND AND NOT TARGET msgpack::msgpack)
# config mode find_package does not set $msgpack_ROOT, workaround by extracting
# root path from library target
unset(_msgpack_lib)
unset(_prefix)
get_target_property(_msgpack_lib msgpackc INTERFACE_LOCATION)
get_filename_component(_prefix ${_msgpack_lib} DIRECTORY)
get_filename_component(_prefix ${_prefix}/.. ABSOLUTE)
add_library(msgpack::msgpack INTERFACE IMPORTED)
set_target_properties(msgpack::msgpack PROPERTIES
INTERFACE_INCLUDE_DIRECTORIES "${_prefix}/include"
)
endif()

4190
cmake/cotire.cmake Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1,54 +0,0 @@
---
Language: Cpp
#AccessModifierOffset: -4
ConstructorInitializerIndentWidth: 4
AlignEscapedNewlinesLeft: true
AlignTrailingComments: true
AllowAllParametersOfDeclarationOnNextLine: false
AllowShortIfStatementsOnASingleLine: false
AllowShortLoopsOnASingleLine: false
AllowShortFunctionsOnASingleLine: false
AlwaysBreakTemplateDeclarations: true
# It is broken on windows. Breaks all #include "header.h"
#AlwaysBreakBeforeMultilineStrings: true
BreakBeforeBinaryOperators: false
BreakBeforeTernaryOperators: true
BreakConstructorInitializersBeforeComma: true
BinPackParameters: false
ColumnLimit: 160
ConstructorInitializerAllOnOneLineOrOnePerLine: false
ConstructorInitializerIndentWidth: 4
DerivePointerBinding: false
ExperimentalAutoDetectBinPacking: false
IndentCaseLabels: true
MaxEmptyLinesToKeep: 1
NamespaceIndentation: None
ObjCSpaceAfterProperty: false
ObjCSpaceBeforeProtocolList: false
PenaltyBreakBeforeFirstCallParameter: 1
PenaltyBreakComment: 300
PenaltyBreakString: 1000
PenaltyBreakFirstLessLess: 120
PenaltyExcessCharacter: 1000000
PenaltyReturnTypeOnItsOwnLine: 200
PointerBindsToType: true
SpacesBeforeTrailingComments: 1
Cpp11BracedListStyle: false
Standard: Cpp11
IndentWidth: 4
TabWidth: 4
UseTab: Never
BreakBeforeBraces: Allman
IndentFunctionDeclarationAfterType: true
SpacesInParentheses: false
SpacesInAngles: false
SpaceInEmptyParentheses: false
SpacesInCStyleCastParentheses: false
SpacesInContainerLiterals: true
SpaceBeforeAssignmentOperators: true
ContinuationIndentWidth: 4
CommentPragmas: '^ IWYU pragma:'
SpaceBeforeParens: ControlStatements
...

View File

@@ -191,16 +191,25 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/options/startConfigExample.sh.in ${CM
#################################
# define libFairMQ build target #
#################################
add_library(FairMQ SHARED
if(FAST_BUILD)
set(_target FairMQ_)
else()
set(_target FairMQ)
endif()
add_library(${_target} SHARED
${FAIRMQ_SOURCE_FILES}
${FAIRMQ_PUBLIC_HEADER_FILES} # for IDE integration
${FAIRMQ_PRIVATE_HEADER_FILES} # for IDE integration
)
set_target_properties(${_target} PROPERTIES LABELS coverage)
if(FAST_BUILD)
set_target_properties(${_target} PROPERTIES OUTPUT_NAME FairMQ)
endif()
#######################
# include directories #
#######################
target_include_directories(FairMQ
target_include_directories(${_target}
PUBLIC # consumers inherit public include directories
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}>
@@ -213,15 +222,21 @@ target_include_directories(FairMQ
# link libraries #
##################
if(BUILD_NANOMSG_TRANSPORT)
set(NANOMSG_DEPS nanomsg msgpackc)
set(NANOMSG_DEPS nanomsg msgpack::msgpack)
endif()
if(BUILD_OFI_TRANSPORT)
set(OFI_DEPS OFI::libfabric protobuf::libprotobuf $<TARGET_OBJECTS:OfiTransport>)
endif()
target_link_libraries(FairMQ
set(optional_deps ${NANOMSG_DEPS} ${OFI_DEPS})
if(optional_deps)
list(REMOVE_DUPLICATES optional_deps)
endif()
target_link_libraries(${_target}
INTERFACE # only consumers link against interface dependencies
PUBLIC # libFairMQ AND consumers of libFairMQ link aginst public dependencies
Threads::Threads
dl
Boost::boost
Boost::program_options
@@ -238,15 +253,30 @@ target_link_libraries(FairMQ
${NANOMSG_DEPS}
${OFI_DEPS}
)
set_target_properties(FairMQ PROPERTIES
VERSION ${PROJECT_VERSION}
set_target_properties(${_target} PROPERTIES
VERSION ${PROJECT_GIT_VERSION}
SOVERSION "${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}"
)
##############
# fast build #
##############
if(FAST_BUILD)
set_target_properties(${_target} PROPERTIES
COTIRE_UNITY_TARGET_NAME "FairMQ"
# COTIRE_ENABLE_PRECOMPILED_HEADER FALSE
EXCLUDE_FROM_ALL TRUE
)
cotire(${_target})
set_target_properties(FairMQ PROPERTIES EXCLUDE_FROM_ALL FALSE)
set_target_properties(FairMQ PROPERTIES LABELS coverage)
endif()
###############
# executables #
###############
add_executable(fairmq-bsampler run/runBenchmarkSampler.cxx)
target_link_libraries(fairmq-bsampler FairMQ)
@@ -274,11 +304,12 @@ target_link_libraries(fairmq-shmmonitor FairMQ)
add_executable(fairmq-uuid-gen run/runUuidGenerator.cxx)
target_link_libraries(fairmq-uuid-gen FairMQ)
###########
# install #
###########
install(
TARGETS # FairMQFull, tests are not installed
TARGETS
FairMQ
fairmq-bsampler
fairmq-merger
@@ -290,8 +321,8 @@ install(
fairmq-uuid-gen
EXPORT ${PROJECT_EXPORT_SET}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
)
# preserve relative path and prepend fairmq

View File

@@ -13,11 +13,12 @@
using namespace fair::mq;
DeviceRunner::DeviceRunner(int argc, char* const argv[])
: fRawCmdLineArgs{tools::ToStrVector(argc, argv, false)}
, fPluginManager{PluginManager::MakeFromCommandLineOptions(fRawCmdLineArgs)}
, fDevice{nullptr}
{
}
: fRawCmdLineArgs(tools::ToStrVector(argc, argv, false))
, fConfig()
, fDevice(nullptr)
, fPluginManager(fRawCmdLineArgs)
, fEvents()
{}
auto DeviceRunner::Run() -> int
{
@@ -26,16 +27,16 @@ auto DeviceRunner::Run() -> int
////////////////////////
// Load builtin plugins last
fPluginManager->LoadPlugin("s:control");
fPluginManager.LoadPlugin("s:control");
////// CALL HOOK ///////
fEvents.Emit<hooks::SetCustomCmdLineOptions>(*this);
////////////////////////
fPluginManager->ForEachPluginProgOptions([&](boost::program_options::options_description options){
fPluginManager.ForEachPluginProgOptions([&](boost::program_options::options_description options){
fConfig.AddToCmdLineOptions(options);
});
fConfig.AddToCmdLineOptions(fPluginManager->ProgramOptions());
fConfig.AddToCmdLineOptions(fPluginManager.ProgramOptions());
////// CALL HOOK ///////
fEvents.Emit<hooks::ModifyRawCmdLineArgs>(*this);
@@ -82,13 +83,16 @@ auto DeviceRunner::Run() -> int
fDevice->SetConfig(fConfig);
// Initialize plugin services
fPluginManager->EmplacePluginServices(&fConfig, fDevice);
fPluginManager.EmplacePluginServices(fConfig, *fDevice);
// Instantiate and run plugins
fPluginManager->InstantiatePlugins();
fPluginManager.InstantiatePlugins();
// Run the device
fDevice->RunStateMachine();
// Wait for control plugin to release device control
fPluginManager->WaitForPluginsToReleaseDeviceControl();
fPluginManager.WaitForPluginsToReleaseDeviceControl();
return 0;
}

View File

@@ -62,9 +62,9 @@ class DeviceRunner
auto RemoveHook() -> void { fEvents.Unsubscribe<H>("runner"); }
std::vector<std::string> fRawCmdLineArgs;
std::shared_ptr<PluginManager> fPluginManager;
FairMQProgOptions fConfig;
std::shared_ptr<FairMQDevice> fDevice;
std::unique_ptr<FairMQDevice> fDevice;
PluginManager fPluginManager;
private:
EventManager fEvents;

View File

@@ -755,7 +755,7 @@ void FairMQChannel::CheckSendCompatibility(FairMQMessagePtr& msg) const
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr msgWrapper(NewMessage(msg->GetData(),
msg->GetSize(),
[](void* /*data*/, void* msg) { delete static_cast<FairMQMessage*>(msg); },
[](void* /*data*/, void* _msg) { delete static_cast<FairMQMessage*>(_msg); },
msg.get()
));
msg.release();
@@ -772,7 +772,7 @@ void FairMQChannel::CheckSendCompatibility(vector<FairMQMessagePtr>& msgVec) con
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr msgWrapper(NewMessage(msg->GetData(),
msg->GetSize(),
[](void* /*data*/, void* msg) { delete static_cast<FairMQMessage*>(msg); },
[](void* /*data*/, void* _msg) { delete static_cast<FairMQMessage*>(_msg); },
msg.get()
));
msg.release();

View File

@@ -416,6 +416,8 @@ class FairMQDevice : public FairMQStateMachine
void SetRawCmdLineArgs(const std::vector<std::string>& args) { fRawCmdLineArgs = args; }
std::vector<std::string> GetRawCmdLineArgs() const { return fRawCmdLineArgs; }
void RunStateMachine() { ProcessWork(); };
protected:
std::shared_ptr<FairMQTransportFactory> fTransportFactory; ///< Default transport factory
std::unordered_map<fair::mq::Transport, std::shared_ptr<FairMQTransportFactory>> fTransports; ///< Container for transports

View File

@@ -13,6 +13,7 @@
*/
#include "FairMQStateMachine.h"
#include <fairmq/Tools.h>
// Increase maximum number of boost::msm states (default is 10)
// This #define has to be before any msm header includes
@@ -29,13 +30,20 @@
#include <atomic>
#include <condition_variable>
#include <thread>
#include <chrono>
#include <array>
#include <unordered_map>
using namespace std;
using namespace boost::msm::front;
namespace msmf = boost::msm::front;
namespace std
{
template<>
struct hash<FairMQStateMachine::Event> : fair::mq::tools::HashEnum<FairMQStateMachine::Event> {};
} /* namespace std */
namespace fair
{
@@ -44,35 +52,111 @@ namespace mq
namespace fsm
{
// defining events for the boost MSM state machine
struct INIT_DEVICE_E { string name() const { return "INIT_DEVICE"; } };
struct internal_DEVICE_READY_E { string name() const { return "internal_DEVICE_READY"; } };
struct INIT_TASK_E { string name() const { return "INIT_TASK"; } };
struct internal_READY_E { string name() const { return "internal_READY"; } };
struct RUN_E { string name() const { return "RUN"; } };
struct PAUSE_E { string name() const { return "PAUSE"; } };
struct STOP_E { string name() const { return "STOP"; } };
struct RESET_TASK_E { string name() const { return "RESET_TASK"; } };
struct RESET_DEVICE_E { string name() const { return "RESET_DEVICE"; } };
struct internal_IDLE_E { string name() const { return "internal_IDLE"; } };
struct END_E { string name() const { return "END"; } };
struct ERROR_FOUND_E { string name() const { return "ERROR_FOUND"; } };
// list of FSM states
struct OK_FSM_STATE : public state<> { static string Name() { return "OK"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::OK; } };
struct ERROR_FSM_STATE : public terminate_state<> { static string Name() { return "ERROR"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::Error; } };
// deactivate the warning for non-virtual destructor thrown in the boost library
#if defined(__clang__)
_Pragma("clang diagnostic push")
_Pragma("clang diagnostic ignored \"-Wnon-virtual-dtor\"")
#elif defined(__GNUC__) || defined(__GNUG__)
_Pragma("GCC diagnostic push")
_Pragma("GCC diagnostic ignored \"-Wnon-virtual-dtor\"")
#endif
struct IDLE_FSM_STATE : public state<> { static string Name() { return "IDLE"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::IDLE; } };
struct INITIALIZING_DEVICE_FSM_STATE : public state<> { static string Name() { return "INITIALIZING_DEVICE"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::INITIALIZING_DEVICE; } };
struct DEVICE_READY_FSM_STATE : public state<> { static string Name() { return "DEVICE_READY"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::DEVICE_READY; } };
struct INITIALIZING_TASK_FSM_STATE : public state<> { static string Name() { return "INITIALIZING_TASK"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::INITIALIZING_TASK; } };
struct READY_FSM_STATE : public state<> { static string Name() { return "READY"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::READY; } };
struct RUNNING_FSM_STATE : public state<> { static string Name() { return "RUNNING"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::RUNNING; } };
struct PAUSED_FSM_STATE : public state<> { static string Name() { return "PAUSED"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::PAUSED; } };
struct RESETTING_TASK_FSM_STATE : public state<> { static string Name() { return "RESETTING_TASK"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::RESETTING_TASK; } };
struct RESETTING_DEVICE_FSM_STATE : public state<> { static string Name() { return "RESETTING_DEVICE"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::RESETTING_DEVICE; } };
struct EXITING_FSM_STATE : public state<> { static string Name() { return "EXITING"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::EXITING; } };
// list of FSM events
struct INIT_DEVICE_FSM_EVENT { static string Name() { return "INIT_DEVICE"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::INIT_DEVICE; } };
struct internal_DEVICE_READY_FSM_EVENT { static string Name() { return "internal_DEVICE_READY"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::internal_DEVICE_READY; } };
struct INIT_TASK_FSM_EVENT { static string Name() { return "INIT_TASK"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::INIT_TASK; } };
struct internal_READY_FSM_EVENT { static string Name() { return "internal_READY"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::internal_READY; } };
struct RUN_FSM_EVENT { static string Name() { return "RUN"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::RUN; } };
struct PAUSE_FSM_EVENT { static string Name() { return "PAUSE"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::PAUSE; } };
struct STOP_FSM_EVENT { static string Name() { return "STOP"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::STOP; } };
struct RESET_TASK_FSM_EVENT { static string Name() { return "RESET_TASK"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::RESET_TASK; } };
struct RESET_DEVICE_FSM_EVENT { static string Name() { return "RESET_DEVICE"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::RESET_DEVICE; } };
struct internal_IDLE_FSM_EVENT { static string Name() { return "internal_IDLE"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::internal_IDLE; } };
struct END_FSM_EVENT { static string Name() { return "END"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::END; } };
struct ERROR_FOUND_FSM_EVENT { static string Name() { return "ERROR_FOUND"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::ERROR_FOUND; } };
static array<string, 12> stateNames =
{
{
"OK",
"Error",
"IDLE",
"INITIALIZING_DEVICE",
"DEVICE_READY",
"INITIALIZING_TASK",
"READY",
"RUNNING",
"PAUSED",
"RESETTING_TASK",
"RESETTING_DEVICE",
"EXITING"
}
};
static array<string, 12> eventNames =
{
{
"INIT_DEVICE",
"internal_DEVICE_READY",
"INIT_TASK",
"internal_READY",
"RUN",
"PAUSE",
"STOP",
"RESET_TASK",
"RESET_DEVICE",
"internal_IDLE",
"END",
"ERROR_FOUND"
}
};
static map<string, int> stateNumbers =
{
{ "OK", FairMQStateMachine::State::OK },
{ "Error", FairMQStateMachine::State::Error },
{ "IDLE", FairMQStateMachine::State::IDLE },
{ "INITIALIZING_DEVICE", FairMQStateMachine::State::INITIALIZING_DEVICE },
{ "DEVICE_READY", FairMQStateMachine::State::DEVICE_READY },
{ "INITIALIZING_TASK", FairMQStateMachine::State::INITIALIZING_TASK },
{ "READY", FairMQStateMachine::State::READY },
{ "RUNNING", FairMQStateMachine::State::RUNNING },
{ "PAUSED", FairMQStateMachine::State::PAUSED },
{ "RESETTING_TASK", FairMQStateMachine::State::RESETTING_TASK },
{ "RESETTING_DEVICE", FairMQStateMachine::State::RESETTING_DEVICE },
{ "EXITING", FairMQStateMachine::State::EXITING }
};
static map<string, int> eventNumbers =
{
{ "INIT_DEVICE", FairMQStateMachine::Event::INIT_DEVICE },
{ "internal_DEVICE_READY", FairMQStateMachine::Event::internal_DEVICE_READY },
{ "INIT_TASK", FairMQStateMachine::Event::INIT_TASK },
{ "internal_READY", FairMQStateMachine::Event::internal_READY },
{ "RUN", FairMQStateMachine::Event::RUN },
{ "PAUSE", FairMQStateMachine::Event::PAUSE },
{ "STOP", FairMQStateMachine::Event::STOP },
{ "RESET_TASK", FairMQStateMachine::Event::RESET_TASK },
{ "RESET_DEVICE", FairMQStateMachine::Event::RESET_DEVICE },
{ "internal_IDLE", FairMQStateMachine::Event::internal_IDLE },
{ "END", FairMQStateMachine::Event::END },
{ "ERROR_FOUND", FairMQStateMachine::Event::ERROR_FOUND }
};
// defining the boost MSM state machine
struct Machine_ : public msmf::state_machine_def<Machine_>
struct Machine_ : public state_machine_def<Machine_>
{
public:
Machine_()
: fWork()
: fUnblockHandler()
, fStateHandlers()
, fWork()
, fWorkAvailableCondition()
, fWorkDoneCondition()
, fWorkMutex()
@@ -81,23 +165,22 @@ struct Machine_ : public msmf::state_machine_def<Machine_>
, fWorkAvailable(false)
, fStateChangeSignal()
, fStateChangeSignalsMap()
, fTerminationRequested(false)
, fState()
, fWorkerThread()
{}
virtual ~Machine_()
{}
// initial states
using initial_state = boost::mpl::vector<IDLE_FSM_STATE, OK_FSM_STATE>;
template<typename Event, typename FSM>
void on_entry(Event const&, FSM& fsm)
{
LOG(state) << "Starting FairMQ state machine";
fState = FairMQStateMachine::IDLE;
LOG(state) << "Entering IDLE state";
fsm.CallStateChangeCallbacks(FairMQStateMachine::IDLE);
// start a worker thread to execute user states in.
fsm.fWorkerThread = thread(&Machine_::Worker, &fsm);
}
template<typename Event, typename FSM>
@@ -106,41 +189,23 @@ struct Machine_ : public msmf::state_machine_def<Machine_>
LOG(state) << "Exiting FairMQ state machine";
}
// list of FSM states
struct OK_FSM : public msmf::state<> {};
struct ERROR_FSM : public msmf::terminate_state<> {};
struct IDLE_FSM : public msmf::state<> {};
struct INITIALIZING_DEVICE_FSM : public msmf::state<> {};
struct DEVICE_READY_FSM : public msmf::state<> {};
struct INITIALIZING_TASK_FSM : public msmf::state<> {};
struct READY_FSM : public msmf::state<> {};
struct RUNNING_FSM : public msmf::state<> {};
struct PAUSED_FSM : public msmf::state<> {};
struct RESETTING_TASK_FSM : public msmf::state<> {};
struct RESETTING_DEVICE_FSM : public msmf::state<> {};
struct EXITING_FSM : public msmf::state<> {};
// initial states
using initial_state = boost::mpl::vector<IDLE_FSM, OK_FSM>;
// actions
struct IdleFct
struct AutomaticFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts)
{
LOG(state) << "Entering IDLE state";
fsm.fState = FairMQStateMachine::IDLE;
fsm.fState = ts.Type();
LOG(state) << "Entering " << ts.Name() << " state";
}
};
struct InitDeviceFct
struct DefaultFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
void operator()(EVT const& e, FSM& fsm, SourceState& /* ss */, TargetState& ts)
{
fsm.fState = FairMQStateMachine::INITIALIZING_DEVICE;
fsm.fState = ts.Type();
unique_lock<mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
@@ -148,66 +213,8 @@ struct Machine_ : public msmf::state_machine_def<Machine_>
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering INITIALIZING DEVICE state";
fsm.fWork = fsm.fInitWrapperHandler;
fsm.fWorkAvailableCondition.notify_one();
}
};
struct DeviceReadyFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
LOG(state) << "Entering DEVICE READY state";
fsm.fState = FairMQStateMachine::DEVICE_READY;
}
};
struct InitTaskFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
fsm.fState = FairMQStateMachine::INITIALIZING_TASK;
unique_lock<mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering INITIALIZING TASK state";
fsm.fWork = fsm.fInitTaskWrapperHandler;
fsm.fWorkAvailableCondition.notify_one();
}
};
struct ReadyFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
LOG(state) << "Entering READY state";
fsm.fState = FairMQStateMachine::READY;
}
};
struct RunFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
fsm.fState = FairMQStateMachine::RUNNING;
unique_lock<mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering RUNNING state";
fsm.fWork = fsm.fRunWrapperHandler;
LOG(state) << "Entering " << ts.Name() << " state";
fsm.fWork = fsm.fStateHandlers.at(e.Type());
fsm.fWorkAvailableCondition.notify_one();
}
};
@@ -215,9 +222,9 @@ struct Machine_ : public msmf::state_machine_def<Machine_>
struct PauseFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts)
{
fsm.fState = FairMQStateMachine::PAUSED;
fsm.fState = ts.Type();
fsm.fUnblockHandler();
unique_lock<mutex> lock(fsm.fWorkMutex);
@@ -226,37 +233,18 @@ struct Machine_ : public msmf::state_machine_def<Machine_>
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering PAUSED state";
LOG(state) << "Entering " << ts.Name() << " state";
fsm.fWork = fsm.fPauseWrapperHandler;
fsm.fWorkAvailableCondition.notify_one();
}
};
struct ResumeFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
fsm.fState = FairMQStateMachine::RUNNING;
unique_lock<mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering RUNNING state";
fsm.fWork = fsm.fRunWrapperHandler;
fsm.fWorkAvailableCondition.notify_one();
}
};
struct StopFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts)
{
fsm.fState = FairMQStateMachine::READY;
fsm.fState = ts.Type();
fsm.fUnblockHandler();
unique_lock<mutex> lock(fsm.fWorkMutex);
@@ -264,115 +252,70 @@ struct Machine_ : public msmf::state_machine_def<Machine_>
{
fsm.fWorkDoneCondition.wait(lock);
}
LOG(state) << "Entering READY state";
LOG(state) << "Entering " << ts.Name() << " state";
}
};
struct InternalStopFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts)
{
fsm.fState = FairMQStateMachine::READY;
fsm.fState = ts.Type();
fsm.fUnblockHandler();
LOG(state) << "RUNNING state finished without an external event, entering READY state";
}
};
struct ResetTaskFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
fsm.fState = FairMQStateMachine::RESETTING_TASK;
unique_lock<mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering RESETTING TASK state";
fsm.fWork = fsm.fResetTaskWrapperHandler;
fsm.fWorkAvailableCondition.notify_one();
}
};
struct ResetDeviceFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
fsm.fState = FairMQStateMachine::RESETTING_DEVICE;
unique_lock<mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering RESETTING DEVICE state";
fsm.fWork = fsm.fResetWrapperHandler;
fsm.fWorkAvailableCondition.notify_one();
LOG(state) << "RUNNING state finished without an external event, entering " << ts.Name() << " state";
}
};
struct ExitingFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
void operator()(EVT const& e, FSM& fsm, SourceState& /* ss */, TargetState& ts)
{
LOG(state) << "Entering EXITING state";
fsm.fState = FairMQStateMachine::EXITING;
fsm.fTerminationRequested = true;
LOG(state) << "Entering " << ts.Name() << " state";
fsm.fState = ts.Type();
fsm.CallStateChangeCallbacks(FairMQStateMachine::EXITING);
// terminate worker thread
// Stop ProcessWork()
{
lock_guard<mutex> lock(fsm.fWorkMutex);
fsm.fWorkerTerminated = true;
fsm.fWorkAvailableCondition.notify_one();
}
// join the worker thread (executing user states)
if (fsm.fWorkerThread.joinable())
{
fsm.fWorkerThread.join();
}
fsm.fExitHandler();
fsm.fStateHandlers.at(e.Type())();
}
};
struct ErrorFoundFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts)
{
LOG(state) << "Entering ERROR state";
fsm.fState = FairMQStateMachine::Error;
fsm.fState = ts.Type();
LOG(state) << "Entering " << ts.Name() << " state";
fsm.CallStateChangeCallbacks(FairMQStateMachine::Error);
}
};
// Transition table for Machine_
struct transition_table : boost::mpl::vector<
// Start Event Next Action Guard
msmf::Row<IDLE_FSM, INIT_DEVICE_E, INITIALIZING_DEVICE_FSM, InitDeviceFct, msmf::none>,
msmf::Row<IDLE_FSM, END_E, EXITING_FSM, ExitingFct, msmf::none>,
msmf::Row<INITIALIZING_DEVICE_FSM, internal_DEVICE_READY_E, DEVICE_READY_FSM, DeviceReadyFct, msmf::none>,
msmf::Row<DEVICE_READY_FSM, INIT_TASK_E, INITIALIZING_TASK_FSM, InitTaskFct, msmf::none>,
msmf::Row<DEVICE_READY_FSM, RESET_DEVICE_E, RESETTING_DEVICE_FSM, ResetDeviceFct, msmf::none>,
msmf::Row<INITIALIZING_TASK_FSM, internal_READY_E, READY_FSM, ReadyFct, msmf::none>,
msmf::Row<READY_FSM, RUN_E, RUNNING_FSM, RunFct, msmf::none>,
msmf::Row<READY_FSM, RESET_TASK_E, RESETTING_TASK_FSM, ResetTaskFct, msmf::none>,
msmf::Row<RUNNING_FSM, PAUSE_E, PAUSED_FSM, PauseFct, msmf::none>,
msmf::Row<RUNNING_FSM, STOP_E, READY_FSM, StopFct, msmf::none>,
msmf::Row<RUNNING_FSM, internal_READY_E, READY_FSM, InternalStopFct, msmf::none>,
msmf::Row<PAUSED_FSM, RUN_E, RUNNING_FSM, ResumeFct, msmf::none>,
msmf::Row<RESETTING_TASK_FSM, internal_DEVICE_READY_E, DEVICE_READY_FSM, DeviceReadyFct, msmf::none>,
msmf::Row<RESETTING_DEVICE_FSM, internal_IDLE_E, IDLE_FSM, IdleFct, msmf::none>,
msmf::Row<OK_FSM, ERROR_FOUND_E, ERROR_FSM, ErrorFoundFct, msmf::none>>
// Start Event Next Action Guard
Row<IDLE_FSM_STATE, INIT_DEVICE_FSM_EVENT, INITIALIZING_DEVICE_FSM_STATE, DefaultFct, none>,
Row<IDLE_FSM_STATE, END_FSM_EVENT, EXITING_FSM_STATE, ExitingFct, none>,
Row<INITIALIZING_DEVICE_FSM_STATE, internal_DEVICE_READY_FSM_EVENT, DEVICE_READY_FSM_STATE, AutomaticFct, none>,
Row<DEVICE_READY_FSM_STATE, INIT_TASK_FSM_EVENT, INITIALIZING_TASK_FSM_STATE, DefaultFct, none>,
Row<DEVICE_READY_FSM_STATE, RESET_DEVICE_FSM_EVENT, RESETTING_DEVICE_FSM_STATE, DefaultFct, none>,
Row<INITIALIZING_TASK_FSM_STATE, internal_READY_FSM_EVENT, READY_FSM_STATE, AutomaticFct, none>,
Row<READY_FSM_STATE, RUN_FSM_EVENT, RUNNING_FSM_STATE, DefaultFct, none>,
Row<READY_FSM_STATE, RESET_TASK_FSM_EVENT, RESETTING_TASK_FSM_STATE, DefaultFct, none>,
Row<RUNNING_FSM_STATE, PAUSE_FSM_EVENT, PAUSED_FSM_STATE, DefaultFct, none>,
Row<RUNNING_FSM_STATE, STOP_FSM_EVENT, READY_FSM_STATE, StopFct, none>,
Row<RUNNING_FSM_STATE, internal_READY_FSM_EVENT, READY_FSM_STATE, InternalStopFct, none>,
Row<PAUSED_FSM_STATE, RUN_FSM_EVENT, RUNNING_FSM_STATE, DefaultFct, none>,
Row<RESETTING_TASK_FSM_STATE, internal_DEVICE_READY_FSM_EVENT, DEVICE_READY_FSM_STATE, AutomaticFct, none>,
Row<RESETTING_DEVICE_FSM_STATE, internal_IDLE_FSM_EVENT, IDLE_FSM_STATE, AutomaticFct, none>,
Row<OK_FSM_STATE, ERROR_FOUND_FSM_EVENT, ERROR_FSM_STATE, ErrorFoundFct, none>>
{};
// replaces the default no-transition response.
@@ -391,45 +334,12 @@ struct Machine_ : public msmf::state_machine_def<Machine_>
if (pos != string::npos)
{
stateName = stateName.substr(pos + 1);
stateName = stateName.substr(0, stateName.size() - 4);
stateName = stateName.substr(0, stateName.size() - 10);
}
if (stateName != "OK")
{
LOG(state) << "No transition from state " << stateName << " on event " << e.name();
}
}
static string GetStateName(const int state)
{
switch(state)
{
case FairMQStateMachine::OK:
return "OK";
case FairMQStateMachine::Error:
return "Error";
case FairMQStateMachine::IDLE:
return "IDLE";
case FairMQStateMachine::INITIALIZING_DEVICE:
return "INITIALIZING_DEVICE";
case FairMQStateMachine::DEVICE_READY:
return "DEVICE_READY";
case FairMQStateMachine::INITIALIZING_TASK:
return "INITIALIZING_TASK";
case FairMQStateMachine::READY:
return "READY";
case FairMQStateMachine::RUNNING:
return "RUNNING";
case FairMQStateMachine::PAUSED:
return "PAUSED";
case FairMQStateMachine::RESETTING_TASK:
return "RESETTING_TASK";
case FairMQStateMachine::RESETTING_DEVICE:
return "RESETTING_DEVICE";
case FairMQStateMachine::EXITING:
return "EXITING";
default:
return "requested name for non-existent state...";
LOG(state) << "No transition from state " << stateName << " on event " << e.Name();
}
}
@@ -441,14 +351,8 @@ struct Machine_ : public msmf::state_machine_def<Machine_>
}
}
function<void(void)> fInitWrapperHandler;
function<void(void)> fInitTaskWrapperHandler;
function<void(void)> fRunWrapperHandler;
function<void(void)> fPauseWrapperHandler;
function<void(void)> fResetWrapperHandler;
function<void(void)> fResetTaskWrapperHandler;
function<void(void)> fExitHandler;
function<void(void)> fUnblockHandler;
unordered_map<FairMQStateMachine::Event, function<void(void)>> fStateHandlers;
// function to execute user states in a worker thread
function<void(void)> fWork;
@@ -461,12 +365,10 @@ struct Machine_ : public msmf::state_machine_def<Machine_>
boost::signals2::signal<void(const FairMQStateMachine::State)> fStateChangeSignal;
unordered_map<string, boost::signals2::connection> fStateChangeSignalsMap;
atomic<bool> fTerminationRequested;
atomic<FairMQStateMachine::State> fState;
private:
void Worker()
void ProcessWork()
{
while (true)
{
@@ -475,7 +377,7 @@ struct Machine_ : public msmf::state_machine_def<Machine_>
// Wait for work to be done.
while (!fWorkAvailable && !fWorkerTerminated)
{
fWorkAvailableCondition.wait(lock);
fWorkAvailableCondition.wait_for(lock, chrono::milliseconds(100));
}
if (fWorkerTerminated)
@@ -497,20 +399,10 @@ struct Machine_ : public msmf::state_machine_def<Machine_>
CallStateChangeCallbacks(fState);
}
}
// run state handlers in a separate thread
thread fWorkerThread;
}; // Machine_
using FairMQFSM = boost::msm::back::state_machine<Machine_>;
// reactivate the warning for non-virtual destructor
#if defined(__clang__)
_Pragma("clang diagnostic pop")
#elif defined(__GNUC__) || defined(__GNUG__)
_Pragma("GCC diagnostic pop")
#endif
} // namespace fsm
} // namespace mq
} // namespace fair
@@ -521,13 +413,13 @@ FairMQStateMachine::FairMQStateMachine()
: fChangeStateMutex()
, fFsm(new FairMQFSM)
{
static_pointer_cast<FairMQFSM>(fFsm)->fInitWrapperHandler = bind(&FairMQStateMachine::InitWrapper, this);
static_pointer_cast<FairMQFSM>(fFsm)->fInitTaskWrapperHandler = bind(&FairMQStateMachine::InitTaskWrapper, this);
static_pointer_cast<FairMQFSM>(fFsm)->fRunWrapperHandler = bind(&FairMQStateMachine::RunWrapper, this);
static_pointer_cast<FairMQFSM>(fFsm)->fPauseWrapperHandler = bind(&FairMQStateMachine::PauseWrapper, this);
static_pointer_cast<FairMQFSM>(fFsm)->fResetWrapperHandler = bind(&FairMQStateMachine::ResetWrapper, this);
static_pointer_cast<FairMQFSM>(fFsm)->fResetTaskWrapperHandler = bind(&FairMQStateMachine::ResetTaskWrapper, this);
static_pointer_cast<FairMQFSM>(fFsm)->fExitHandler = bind(&FairMQStateMachine::Exit, this);
static_pointer_cast<FairMQFSM>(fFsm)->fStateHandlers.emplace(INIT_DEVICE, bind(&FairMQStateMachine::InitWrapper, this));
static_pointer_cast<FairMQFSM>(fFsm)->fStateHandlers.emplace(INIT_TASK, bind(&FairMQStateMachine::InitTaskWrapper, this));
static_pointer_cast<FairMQFSM>(fFsm)->fStateHandlers.emplace(RUN, bind(&FairMQStateMachine::RunWrapper, this));
static_pointer_cast<FairMQFSM>(fFsm)->fStateHandlers.emplace(PAUSE, bind(&FairMQStateMachine::PauseWrapper, this));
static_pointer_cast<FairMQFSM>(fFsm)->fStateHandlers.emplace(RESET_TASK, bind(&FairMQStateMachine::ResetTaskWrapper, this));
static_pointer_cast<FairMQFSM>(fFsm)->fStateHandlers.emplace(RESET_DEVICE, bind(&FairMQStateMachine::ResetWrapper, this));
static_pointer_cast<FairMQFSM>(fFsm)->fStateHandlers.emplace(END, bind(&FairMQStateMachine::Exit, this));
static_pointer_cast<FairMQFSM>(fFsm)->fUnblockHandler = bind(&FairMQStateMachine::Unblock, this);
static_pointer_cast<FairMQFSM>(fFsm)->start();
@@ -552,79 +444,79 @@ bool FairMQStateMachine::ChangeState(int event)
case INIT_DEVICE:
{
lock_guard<mutex> lock(fChangeStateMutex);
static_pointer_cast<FairMQFSM>(fFsm)->process_event(INIT_DEVICE_E());
static_pointer_cast<FairMQFSM>(fFsm)->process_event(INIT_DEVICE_FSM_EVENT());
return true;
}
case internal_DEVICE_READY:
{
lock_guard<mutex> lock(fChangeStateMutex);
static_pointer_cast<FairMQFSM>(fFsm)->process_event(internal_DEVICE_READY_E());
static_pointer_cast<FairMQFSM>(fFsm)->process_event(internal_DEVICE_READY_FSM_EVENT());
return true;
}
case INIT_TASK:
{
lock_guard<mutex> lock(fChangeStateMutex);
static_pointer_cast<FairMQFSM>(fFsm)->process_event(INIT_TASK_E());
static_pointer_cast<FairMQFSM>(fFsm)->process_event(INIT_TASK_FSM_EVENT());
return true;
}
case internal_READY:
{
lock_guard<mutex> lock(fChangeStateMutex);
static_pointer_cast<FairMQFSM>(fFsm)->process_event(internal_READY_E());
static_pointer_cast<FairMQFSM>(fFsm)->process_event(internal_READY_FSM_EVENT());
return true;
}
case RUN:
{
lock_guard<mutex> lock(fChangeStateMutex);
static_pointer_cast<FairMQFSM>(fFsm)->process_event(RUN_E());
static_pointer_cast<FairMQFSM>(fFsm)->process_event(RUN_FSM_EVENT());
return true;
}
case PAUSE:
{
lock_guard<mutex> lock(fChangeStateMutex);
static_pointer_cast<FairMQFSM>(fFsm)->process_event(PAUSE_E());
static_pointer_cast<FairMQFSM>(fFsm)->process_event(PAUSE_FSM_EVENT());
return true;
}
case STOP:
{
lock_guard<mutex> lock(fChangeStateMutex);
static_pointer_cast<FairMQFSM>(fFsm)->process_event(STOP_E());
static_pointer_cast<FairMQFSM>(fFsm)->process_event(STOP_FSM_EVENT());
return true;
}
case RESET_DEVICE:
{
lock_guard<mutex> lock(fChangeStateMutex);
static_pointer_cast<FairMQFSM>(fFsm)->process_event(RESET_DEVICE_E());
static_pointer_cast<FairMQFSM>(fFsm)->process_event(RESET_DEVICE_FSM_EVENT());
return true;
}
case RESET_TASK:
{
lock_guard<mutex> lock(fChangeStateMutex);
static_pointer_cast<FairMQFSM>(fFsm)->process_event(RESET_TASK_E());
static_pointer_cast<FairMQFSM>(fFsm)->process_event(RESET_TASK_FSM_EVENT());
return true;
}
case internal_IDLE:
{
lock_guard<mutex> lock(fChangeStateMutex);
static_pointer_cast<FairMQFSM>(fFsm)->process_event(internal_IDLE_E());
static_pointer_cast<FairMQFSM>(fFsm)->process_event(internal_IDLE_FSM_EVENT());
return true;
}
case END:
{
lock_guard<mutex> lock(fChangeStateMutex);
static_pointer_cast<FairMQFSM>(fFsm)->process_event(END_E());
static_pointer_cast<FairMQFSM>(fFsm)->process_event(END_FSM_EVENT());
return true;
}
case ERROR_FOUND:
{
lock_guard<mutex> lock(fChangeStateMutex);
static_pointer_cast<FairMQFSM>(fFsm)->process_event(ERROR_FOUND_E());
static_pointer_cast<FairMQFSM>(fFsm)->process_event(ERROR_FOUND_FSM_EVENT());
return true;
}
default:
{
LOG(error) << "Requested state transition with an unsupported event: " << event << endl
<< "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_TASK, RESET_DEVICE, END, ERROR_FOUND";
<< "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_TASK, RESET_DEVICE, END, ERROR_FOUND";
return false;
}
}
@@ -738,7 +630,11 @@ void FairMQStateMachine::CallStateChangeCallbacks(const State state) const
string FairMQStateMachine::GetCurrentStateName() const
{
return static_pointer_cast<FairMQFSM>(fFsm)->GetStateName(static_pointer_cast<FairMQFSM>(fFsm)->fState);
return GetStateName(static_pointer_cast<FairMQFSM>(fFsm)->fState);
}
string FairMQStateMachine::GetStateName(const State state)
{
return stateNames.at(state);
}
int FairMQStateMachine::GetCurrentState() const
{
@@ -753,23 +649,12 @@ bool FairMQStateMachine::CheckCurrentState(string state) const
return state == GetCurrentStateName();
}
bool FairMQStateMachine::Terminated()
void FairMQStateMachine::ProcessWork()
{
return static_pointer_cast<FairMQFSM>(fFsm)->fTerminationRequested;
static_pointer_cast<FairMQFSM>(fFsm)->ProcessWork();
}
int FairMQStateMachine::GetEventNumber(const string& event)
{
if (event == "INIT_DEVICE") return INIT_DEVICE;
if (event == "INIT_TASK") return INIT_TASK;
if (event == "RUN") return RUN;
if (event == "PAUSE") return PAUSE;
if (event == "STOP") return STOP;
if (event == "RESET_DEVICE") return RESET_DEVICE;
if (event == "RESET_TASK") return RESET_TASK;
if (event == "END") return END;
if (event == "ERROR_FOUND") return ERROR_FOUND;
LOG(error) << "Requested number for non-existent event... " << event << endl
<< "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_DEVICE, RESET_TASK, END, ERROR_FOUND";
return -1;
return eventNumbers.at(event);
}

View File

@@ -79,10 +79,10 @@ class FairMQStateMachine
void CallStateChangeCallbacks(const State state) const;
std::string GetCurrentStateName() const;
static std::string GetStateName(const State);
int GetCurrentState() const;
bool CheckCurrentState(int state) const;
bool CheckCurrentState(std::string state) const;
bool Terminated();
// actions to be overwritten by derived classes
virtual void InitWrapper() {}
@@ -94,8 +94,10 @@ class FairMQStateMachine
virtual void Exit() {}
virtual void Unblock() {}
void ProcessWork();
private:
int GetEventNumber(const std::string& event);
static int GetEventNumber(const std::string& event);
std::mutex fChangeStateMutex;

View File

@@ -116,9 +116,9 @@ class Plugin
} /* namespace fair */
#define REGISTER_FAIRMQ_PLUGIN(KLASS, NAME, VERSION, MAINTAINER, HOMEPAGE, PROGOPTIONS) \
static auto Make_##NAME##_Plugin(fair::mq::PluginServices* pluginServices) -> std::shared_ptr<fair::mq::Plugin> \
static auto Make_##NAME##_Plugin(fair::mq::PluginServices* pluginServices) -> std::unique_ptr<fair::mq::Plugin> \
{ \
return std::make_shared<KLASS>(std::string{#NAME}, VERSION, std::string{MAINTAINER}, std::string{HOMEPAGE}, pluginServices); \
return fair::mq::tools::make_unique<KLASS>(std::string{#NAME}, VERSION, std::string{MAINTAINER}, std::string{HOMEPAGE}, pluginServices); \
} \
BOOST_DLL_ALIAS(Make_##NAME##_Plugin, make_##NAME##_plugin) \
BOOST_DLL_ALIAS(PROGOPTIONS, get_##NAME##_plugin_progoptions)

View File

@@ -31,13 +31,55 @@ const std::string fair::mq::PluginManager::fgkLibPrefix = "FairMQPlugin_";
fair::mq::PluginManager::PluginManager()
: fSearchPaths{{"."}}
, fPluginFactories()
, fPluginServices()
, fPlugins()
, fPluginOrder()
, fPluginProgOptions()
, fPluginServices()
{
}
fair::mq::PluginManager::PluginManager(const vector<string> args)
: fSearchPaths{{"."}}
, fPluginFactories()
, fPluginServices()
, fPlugins()
, fPluginOrder()
, fPluginProgOptions()
{
// Parse command line options
auto options = ProgramOptions();
auto vm = po::variables_map{};
try
{
auto parsed = po::command_line_parser(args).options(options).allow_unregistered().run();
po::store(parsed, vm);
po::notify(vm);
} catch (const po::error& e)
{
throw ProgramOptionsParseError{ToString("Error occured while parsing the 'Plugin Manager' program options: ", e.what())};
}
// Process plugin search paths
auto append = vector<fs::path>{};
auto prepend = vector<fs::path>{};
auto searchPaths = vector<fs::path>{};
if (vm.count("plugin-search-path"))
{
for (const auto& path : vm["plugin-search-path"].as<vector<string>>())
{
if (path.substr(0, 1) == "<") { prepend.emplace_back(path.substr(1)); }
else if (path.substr(0, 1) == ">") { append.emplace_back(path.substr(1)); }
else { searchPaths.emplace_back(path); }
}
}
// Set supplied options
SetSearchPaths(searchPaths);
for(const auto& path : prepend) { PrependSearchPath(path); }
for(const auto& path : append) { AppendSearchPath(path); }
if (vm.count("plugin")) { LoadPlugins(vm["plugin"].as<vector<string>>()); }
}
auto fair::mq::PluginManager::ValidateSearchPath(const fs::path& path) -> void
{
if (path.empty()) throw BadSearchPath{"Specified path is empty."};
@@ -81,46 +123,6 @@ auto fair::mq::PluginManager::ProgramOptions() -> po::options_description
return plugin_options;
}
auto fair::mq::PluginManager::MakeFromCommandLineOptions(const vector<string> args) -> shared_ptr<PluginManager>
{
// Parse command line options
auto options = ProgramOptions();
auto vm = po::variables_map{};
try
{
auto parsed = po::command_line_parser(args).options(options).allow_unregistered().run();
po::store(parsed, vm);
po::notify(vm);
} catch (const po::error& e)
{
throw ProgramOptionsParseError{ToString("Error occured while parsing the 'Plugin Manager' program options: ", e.what())};
}
// Process plugin search paths
auto append = vector<fs::path>{};
auto prepend = vector<fs::path>{};
auto searchPaths = vector<fs::path>{};
if (vm.count("plugin-search-path"))
{
for (const auto& path : vm["plugin-search-path"].as<vector<string>>())
{
if (path.substr(0, 1) == "<") { prepend.emplace_back(path.substr(1)); }
else if (path.substr(0, 1) == ">") { append.emplace_back(path.substr(1)); }
else { searchPaths.emplace_back(path); }
}
}
// Create PluginManager with supplied options
auto mgr = make_shared<PluginManager>();
mgr->SetSearchPaths(searchPaths);
for(const auto& path : prepend) { mgr->PrependSearchPath(path); }
for(const auto& path : append) { mgr->AppendSearchPath(path); }
if (vm.count("plugin")) { mgr->LoadPlugins(vm["plugin"].as<vector<string>>()); }
// Return the plugin manager and command line options, that have not been recognized.
return mgr;
}
auto fair::mq::PluginManager::LoadPlugin(const string& pluginName) -> void
{
if (pluginName.substr(0,2) == "p:")

View File

@@ -47,9 +47,15 @@ namespace mq
class PluginManager
{
public:
using PluginFactory = std::shared_ptr<fair::mq::Plugin>(PluginServices&);
using PluginFactory = std::unique_ptr<fair::mq::Plugin>(PluginServices&);
PluginManager();
PluginManager(const std::vector<std::string> args);
~PluginManager()
{
LOG(debug) << "Shutting down Plugin Manager";
}
auto SetSearchPaths(const std::vector<boost::filesystem::path>&) -> void;
auto AppendSearchPath(const boost::filesystem::path&) -> void;
@@ -64,7 +70,6 @@ class PluginManager
struct PluginInstantiationError : std::runtime_error { using std::runtime_error::runtime_error; };
static auto ProgramOptions() -> boost::program_options::options_description;
static auto MakeFromCommandLineOptions(const std::vector<std::string>) -> std::shared_ptr<PluginManager>;
struct ProgramOptionsParseError : std::runtime_error { using std::runtime_error::runtime_error; };
static auto LibPrefix() -> const std::string& { return fgkLibPrefix; }
@@ -111,10 +116,10 @@ class PluginManager
static const std::string fgkLibPrefix;
std::vector<boost::filesystem::path> fSearchPaths;
std::map<std::string, std::function<PluginFactory>> fPluginFactories;
std::map<std::string, std::shared_ptr<Plugin>> fPlugins;
std::unique_ptr<PluginServices> fPluginServices;
std::map<std::string, std::unique_ptr<Plugin>> fPlugins;
std::vector<std::string> fPluginOrder;
std::map<std::string, boost::program_options::options_description> fPluginProgOptions;
std::unique_ptr<PluginServices> fPluginServices;
}; /* class PluginManager */
} /* namespace mq */

View File

@@ -98,7 +98,7 @@ auto PluginServices::ChangeDeviceState(const std::string& controller, const Devi
if (fDeviceController == controller)
{
fDevice->ChangeState(fkDeviceStateTransitionMap.at(next));
fDevice.ChangeState(fkDeviceStateTransitionMap.at(next));
}
else
{

View File

@@ -38,15 +38,20 @@ class PluginServices
{
public:
PluginServices() = delete;
PluginServices(FairMQProgOptions* config, std::shared_ptr<FairMQDevice> device)
: fConfig{config}
, fDevice{device}
PluginServices(FairMQProgOptions& config, FairMQDevice& device)
: fConfig(config)
, fDevice(device)
, fDeviceController()
, fDeviceControllerMutex()
, fReleaseDeviceControlCondition()
{
}
~PluginServices()
{
LOG(debug) << "Shutting down Plugin Services";
}
PluginServices(const PluginServices&) = delete;
PluginServices operator=(const PluginServices&) = delete;
@@ -109,7 +114,7 @@ class PluginServices
friend auto operator<<(std::ostream& os, const DeviceStateTransition& transition) -> std::ostream& { return os << ToStr(transition); }
/// @return current device state
auto GetCurrentDeviceState() const -> DeviceState { return fkDeviceStateMap.at(static_cast<FairMQDevice::State>(fDevice->GetCurrentState())); }
auto GetCurrentDeviceState() const -> DeviceState { return fkDeviceStateMap.at(static_cast<FairMQDevice::State>(fDevice.GetCurrentState())); }
/// @brief Become device controller
/// @param controller id
@@ -155,19 +160,19 @@ class PluginServices
/// the state is running in.
auto SubscribeToDeviceStateChange(const std::string& subscriber, std::function<void(DeviceState /*newState*/)> callback) -> void
{
fDevice->SubscribeToStateChange(subscriber, [&,callback](FairMQDevice::State newState){
fDevice.SubscribeToStateChange(subscriber, [&,callback](FairMQDevice::State newState){
callback(fkDeviceStateMap.at(newState));
});
}
/// @brief Unsubscribe from device state changes
/// @param subscriber id
auto UnsubscribeFromDeviceStateChange(const std::string& subscriber) -> void { fDevice->UnsubscribeFromStateChange(subscriber); }
auto UnsubscribeFromDeviceStateChange(const std::string& subscriber) -> void { fDevice.UnsubscribeFromStateChange(subscriber); }
// Config API
struct PropertyNotFoundError : std::runtime_error { using std::runtime_error::runtime_error; };
auto PropertyExists(const std::string& key) const -> bool { return fConfig->Count(key) > 0; }
auto PropertyExists(const std::string& key) const -> bool { return fConfig.Count(key) > 0; }
/// @brief Set config property
/// @param key
@@ -182,7 +187,7 @@ class PluginServices
auto currentState = GetCurrentDeviceState();
if (currentState == DeviceState::InitializingDevice)
{
fConfig->SetValue(key, val);
fConfig.SetValue(key, val);
}
else
{
@@ -200,7 +205,7 @@ class PluginServices
template<typename T>
auto GetProperty(const std::string& key) const -> T {
if (PropertyExists(key)) {
return fConfig->GetValue<T>(key);
return fConfig.GetValue<T>(key);
}
throw PropertyNotFoundError(fair::mq::tools::ToString("Config has no key: ", key));
}
@@ -212,16 +217,16 @@ class PluginServices
/// If a type is not supported, the user can provide support by overloading the ostream operator for this type
auto GetPropertyAsString(const std::string& key) const -> std::string {
if (PropertyExists(key)) {
return fConfig->GetStringValue(key);
return fConfig.GetStringValue(key);
}
throw PropertyNotFoundError(fair::mq::tools::ToString("Config has no key: ", key));
}
auto GetChannelInfo() const -> std::unordered_map<std::string, int> { return fConfig->GetChannelInfo(); }
auto GetChannelInfo() const -> std::unordered_map<std::string, int> { return fConfig.GetChannelInfo(); }
/// @brief Discover the list of property keys
/// @return list of property keys
auto GetPropertyKeys() const -> std::vector<std::string> { return fConfig->GetPropertyKeys(); }
auto GetPropertyKeys() const -> std::vector<std::string> { return fConfig.GetPropertyKeys(); }
/// @brief Subscribe to property updates of type T
/// @param subscriber
@@ -231,13 +236,13 @@ class PluginServices
template<typename T>
auto SubscribeToPropertyChange(const std::string& subscriber, std::function<void(const std::string& key, T)> callback) const -> void
{
fConfig->Subscribe<T>(subscriber, callback);
fConfig.Subscribe<T>(subscriber, callback);
}
/// @brief Unsubscribe from property updates of type T
/// @param subscriber
template<typename T>
auto UnsubscribeFromPropertyChange(const std::string& subscriber) -> void { fConfig->Unsubscribe<T>(subscriber); }
auto UnsubscribeFromPropertyChange(const std::string& subscriber) -> void { fConfig.Unsubscribe<T>(subscriber); }
/// @brief Subscribe to property updates
/// @param subscriber
@@ -246,12 +251,12 @@ class PluginServices
/// Subscribe to property changes with a callback to monitor property changes in an event based fashion. Will convert the property to string.
auto SubscribeToPropertyChangeAsString(const std::string& subscriber, std::function<void(const std::string& key, std::string)> callback) const -> void
{
fConfig->SubscribeAsString(subscriber, callback);
fConfig.SubscribeAsString(subscriber, callback);
}
/// @brief Unsubscribe from property updates that convert to string
/// @param subscriber
auto UnsubscribeFromPropertyChangeAsString(const std::string& subscriber) -> void { fConfig->UnsubscribeAsString(subscriber); }
auto UnsubscribeFromPropertyChangeAsString(const std::string& subscriber) -> void { fConfig.UnsubscribeAsString(subscriber); }
auto CycleLogConsoleSeverityUp() -> void { Logger::CycleConsoleSeverityUp(); }
auto CycleLogConsoleSeverityDown() -> void { Logger::CycleConsoleSeverityDown(); }
@@ -266,8 +271,8 @@ class PluginServices
static const std::unordered_map<DeviceStateTransition, FairMQDevice::Event, tools::HashEnum<DeviceStateTransition>> fkDeviceStateTransitionMap;
private:
FairMQProgOptions* fConfig; // TODO make it a shared pointer, once old AliceO2 code is cleaned up
std::shared_ptr<FairMQDevice> fDevice;
FairMQProgOptions& fConfig;
FairMQDevice& fDevice;
boost::optional<std::string> fDeviceController;
mutable std::mutex fDeviceControllerMutex;
std::condition_variable fReleaseDeviceControlCondition;

View File

@@ -17,6 +17,7 @@
#include <string>
#include <thread>
#include <atomic>
#include "FairMQDevice.h"
@@ -38,7 +39,7 @@ class FairMQBenchmarkSampler : public FairMQDevice
protected:
bool fSameMessage;
int fMsgSize;
int fMsgCounter;
std::atomic<int> fMsgCounter;
int fMsgRate;
uint64_t fNumIterations;
uint64_t fMaxIterations;

View File

@@ -10,6 +10,7 @@
#include <fairmq/Tools.h>
#include <FairMQLogger.h>
#include <cassert>
#include <cstdlib>
#include <zmq.h>
@@ -53,7 +54,7 @@ Message::Message(void* data, const size_t size, fairmq_free_fn* ffn, void* hint)
{
}
Message::Message(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint)
Message::Message(FairMQUnmanagedRegionPtr& /*region*/, void* /*data*/, const size_t /*size*/, void* /*hint*/)
{
throw MessageError{"Not yet implemented."};
}
@@ -91,7 +92,7 @@ auto Message::Rebuild(const size_t size) -> void
fHint = nullptr;
}
auto Message::Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) -> void
auto Message::Rebuild(void* /*data*/, const size_t size, fairmq_free_fn* ffn, void* hint) -> void
{
if (fFreeFunction) {
fFreeFunction(fData, fHint);
@@ -132,12 +133,12 @@ auto Message::SetUsedSize(const size_t size) -> bool
}
}
auto Message::Copy(const fair::mq::Message& msg) -> void
auto Message::Copy(const fair::mq::Message& /*msg*/) -> void
{
throw MessageError{"Not yet implemented."};
}
auto Message::Copy(const fair::mq::MessagePtr& msg) -> void
auto Message::Copy(const fair::mq::MessagePtr& /*msg*/) -> void
{
throw MessageError{"Not yet implemented."};
}

View File

@@ -17,12 +17,11 @@ using namespace std;
namespace
{
// ugly global state, but std::signal gives us no other choice
std::function<void(int)> gSignalHandlerClosure;
volatile sig_atomic_t gSignalStatus = 0;
extern "C" auto signal_handler(int signal) -> void
{
gSignalHandlerClosure(signal);
gSignalStatus = signal;
}
}
@@ -37,10 +36,13 @@ Control::Control(const string name, const Plugin::Version version, const string
: Plugin(name, version, maintainer, homepage, pluginServices)
, fControllerThread()
, fSignalHandlerThread()
, fShutdownThread()
, fEvents()
, fEventsMutex()
, fShutdownMutex()
, fNewEvent()
, fDeviceTerminationRequested{false}
, fDeviceTerminationRequested(false)
, fHasShutdown(false)
{
try
{
@@ -73,7 +75,7 @@ Control::Control(const string name, const Plugin::Version version, const string
LOG(debug) << "catch-signals: " << GetProperty<int>("catch-signals");
if (GetProperty<int>("catch-signals") > 0)
{
gSignalHandlerClosure = bind(&Control::SignalHandler, this, placeholders::_1);
fSignalHandlerThread = thread(&Control::SignalHandler, this);
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
}
@@ -227,7 +229,7 @@ auto Control::WaitForNextState() -> DeviceState
unique_lock<mutex> lock{fEventsMutex};
while (fEvents.empty())
{
fNewEvent.wait_for(lock, chrono::milliseconds(50));
fNewEvent.wait(lock);
}
auto result = fEvents.front();
@@ -263,69 +265,92 @@ auto Control::StaticMode() -> void
}
}
auto Control::SignalHandler(int signal) -> void
auto Control::SignalHandler() -> void
{
if (!fDeviceTerminationRequested)
while (true)
{
fDeviceTerminationRequested = true;
StealDeviceControl();
LOG(info) << "Received device shutdown request (signal " << signal << ").";
LOG(info) << "Waiting for graceful device shutdown. Hit Ctrl-C again to abort immediately.";
UnsubscribeFromDeviceStateChange(); // In case, static or interactive mode have subscribed already
SubscribeToDeviceStateChange([&](DeviceState newState)
if (gSignalStatus != 0 && !fHasShutdown)
{
{
lock_guard<mutex> lock{fEventsMutex};
fEvents.push(newState);
}
fNewEvent.notify_one();
});
LOG(info) << "Received device shutdown request (signal " << gSignalStatus << ").";
LOG(info) << "Waiting for graceful device shutdown. Hit Ctrl-C again to abort immediately.";
fSignalHandlerThread = thread(&Control::RunShutdownSequence, this);
if (!fDeviceTerminationRequested)
{
fDeviceTerminationRequested = true;
gSignalStatus = 0;
fShutdownThread = thread(&Control::HandleShutdownSignal, this);
}
else
{
LOG(warn) << "Received 2nd device shutdown request (signal " << gSignalStatus << ").";
LOG(warn) << "Aborting immediately!";
abort();
}
}
else if (fHasShutdown)
{
break;
}
this_thread::sleep_for(chrono::milliseconds(100));
}
else
}
auto Control::HandleShutdownSignal() -> void
{
StealDeviceControl();
UnsubscribeFromDeviceStateChange(); // In case, static or interactive mode have subscribed already
SubscribeToDeviceStateChange([&](DeviceState newState)
{
LOG(warn) << "Received 2nd device shutdown request (signal " << signal << ").";
LOG(warn) << "Aborting immediately !";
abort();
}
{
lock_guard<mutex> lock{fEventsMutex};
fEvents.push(newState);
}
fNewEvent.notify_one();
});
RunShutdownSequence();
}
auto Control::RunShutdownSequence() -> void
{
auto nextState = GetCurrentDeviceState();
EmptyEventQueue();
while (nextState != DeviceState::Exiting)
lock_guard<mutex> lock(fShutdownMutex);
if (!fHasShutdown)
{
switch (nextState)
auto nextState = GetCurrentDeviceState();
EmptyEventQueue();
while (nextState != DeviceState::Exiting)
{
case DeviceState::Idle:
ChangeDeviceState(DeviceStateTransition::End);
break;
case DeviceState::DeviceReady:
ChangeDeviceState(DeviceStateTransition::ResetDevice);
break;
case DeviceState::Ready:
ChangeDeviceState(DeviceStateTransition::ResetTask);
break;
case DeviceState::Running:
ChangeDeviceState(DeviceStateTransition::Stop);
break;
case DeviceState::Paused:
ChangeDeviceState(DeviceStateTransition::Resume);
break;
default:
break;
switch (nextState)
{
case DeviceState::Idle:
ChangeDeviceState(DeviceStateTransition::End);
break;
case DeviceState::DeviceReady:
ChangeDeviceState(DeviceStateTransition::ResetDevice);
break;
case DeviceState::Ready:
ChangeDeviceState(DeviceStateTransition::ResetTask);
break;
case DeviceState::Running:
ChangeDeviceState(DeviceStateTransition::Stop);
break;
case DeviceState::Paused:
ChangeDeviceState(DeviceStateTransition::Resume);
break;
default:
// ignore other states
break;
}
nextState = WaitForNextState();
}
nextState = WaitForNextState();
fHasShutdown = true;
UnsubscribeFromDeviceStateChange();
ReleaseDeviceControl();
}
UnsubscribeFromDeviceStateChange();
ReleaseDeviceControl();
}
auto Control::RunStartupSequence() -> void
@@ -357,6 +382,7 @@ Control::~Control()
{
if (fControllerThread.joinable()) fControllerThread.join();
if (fSignalHandlerThread.joinable()) fSignalHandlerThread.join();
if (fShutdownThread.joinable()) fShutdownThread.join();
}
} /* namespace plugins */

View File

@@ -37,17 +37,21 @@ class Control : public Plugin
auto PrintInteractiveHelp() -> void;
auto StaticMode() -> void;
auto WaitForNextState() -> DeviceState;
auto SignalHandler(int signal) -> void;
auto SignalHandler() -> void;
auto HandleShutdownSignal() -> void;
auto RunShutdownSequence() -> void;
auto RunStartupSequence() -> void;
auto EmptyEventQueue() -> void;
std::thread fControllerThread;
std::thread fSignalHandlerThread;
std::thread fShutdownThread;
std::queue<DeviceState> fEvents;
std::mutex fEventsMutex;
std::mutex fShutdownMutex;
std::condition_variable fNewEvent;
std::atomic<bool> fDeviceTerminationRequested;
std::atomic<bool> fHasShutdown;
}; /* class Control */
auto ControlPluginProgramOptions() -> Plugin::ProgOptions;

View File

@@ -46,7 +46,7 @@ int main(int argc, char* argv[])
// });
runner.AddHook<InstantiateDevice>([](DeviceRunner& r){
r.fDevice = std::shared_ptr<FairMQDevice>{getDevice(r.fConfig)};
r.fDevice = std::unique_ptr<FairMQDevice>{getDevice(r.fConfig)};
});
return runner.Run();

View File

@@ -19,8 +19,8 @@
using namespace std;
using namespace fair::mq::shmem;
namespace bipc = boost::interprocess;
namespace bpt = boost::posix_time;
namespace bipc = ::boost::interprocess;
namespace bpt = ::boost::posix_time;
atomic<bool> FairMQMessageSHM::fInterrupted(false);
fair::mq::Transport FairMQMessageSHM::fTransportType = fair::mq::Transport::SHM;

View File

@@ -351,7 +351,7 @@ int64_t FairMQSocketSHM::ReceiveImpl(vector<FairMQMessagePtr>& msgVec, const int
msgVec.reserve(lNumMessages);
for (auto m = 0; m < lNumMessages; m++)
for (size_t m = 0; m < lNumMessages; m++)
{
MetaHeader lMetaHeader;
memcpy(&lMetaHeader, &lHdrVec[m], sizeof(MetaHeader));

View File

@@ -28,9 +28,9 @@
using namespace std;
using namespace fair::mq::shmem;
namespace bfs = boost::filesystem;
namespace bpt = boost::posix_time;
namespace bipc = boost::interprocess;
namespace bfs = ::boost::filesystem;
namespace bpt = ::boost::posix_time;
namespace bipc = ::boost::interprocess;
fair::mq::Transport FairMQTransportFactorySHM::fTransportType = fair::mq::Transport::SHM;

View File

@@ -13,7 +13,7 @@
using namespace std;
using namespace fair::mq::shmem;
namespace bipc = boost::interprocess;
namespace bipc = ::boost::interprocess;
FairMQUnmanagedRegionSHM::FairMQUnmanagedRegionSHM(Manager& manager, const size_t size, FairMQRegionCallback callback)
: fManager(manager)

View File

@@ -9,6 +9,9 @@
#include <fairmq/shmem/Manager.h>
#include <fairmq/shmem/Common.h>
using namespace std;
namespace bipc = ::boost::interprocess;
namespace fair
{
namespace mq
@@ -16,9 +19,6 @@ namespace mq
namespace shmem
{
using namespace std;
namespace bipc = boost::interprocess;
std::unordered_map<uint64_t, Region> Manager::fRegions;
Manager::Manager(const string& name, size_t size)

View File

@@ -26,8 +26,8 @@
#include <poll.h>
using namespace std;
namespace bipc = boost::interprocess;
namespace bpt = boost::posix_time;
namespace bipc = ::boost::interprocess;
namespace bpt = ::boost::posix_time;
using CharAllocator = bipc::allocator<char, bipc::managed_shared_memory::segment_manager>;
using String = bipc::basic_string<char, char_traits<char>, CharAllocator>;

View File

@@ -12,6 +12,11 @@
#include <boost/date_time/posix_time/posix_time.hpp>
using namespace std;
namespace bipc = ::boost::interprocess;
namespace bpt = ::boost::posix_time;
namespace fair
{
namespace mq
@@ -19,11 +24,6 @@ namespace mq
namespace shmem
{
using namespace std;
namespace bipc = boost::interprocess;
namespace bpt = boost::posix_time;
Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, FairMQRegionCallback callback)
: fManager(manager)
, fRemote(remote)

View File

@@ -12,6 +12,7 @@
#include <gtest/gtest.h>
#include <string>
#include <thread>
#include <future> // std::async, std::future
namespace
@@ -19,6 +20,24 @@ namespace
using namespace std;
void control(FairMQDevice& device)
{
device.ChangeState("INIT_DEVICE");
device.WaitForEndOfState("INIT_DEVICE");
device.ChangeState("INIT_TASK");
device.WaitForEndOfState("INIT_TASK");
device.ChangeState("RUN");
device.WaitForEndOfState("RUN");
device.ChangeState("RESET_TASK");
device.WaitForEndOfState("RESET_TASK");
device.ChangeState("RESET_DEVICE");
device.WaitForEndOfState("RESET_DEVICE");
device.ChangeState("END");
}
class MultipleDevices : public ::testing::Test {
public:
MultipleDevices()
@@ -34,20 +53,14 @@ class MultipleDevices : public ::testing::Test {
channel.UpdateRateLogging(0);
sender.fChannels["data"].push_back(channel);
sender.ChangeState("INIT_DEVICE");
sender.WaitForEndOfState("INIT_DEVICE");
sender.ChangeState("INIT_TASK");
sender.WaitForEndOfState("INIT_TASK");
thread t(control, std::ref(sender));
sender.ChangeState("RUN");
sender.WaitForEndOfState("RUN");
sender.RunStateMachine();
sender.ChangeState("RESET_TASK");
sender.WaitForEndOfState("RESET_TASK");
sender.ChangeState("RESET_DEVICE");
sender.WaitForEndOfState("RESET_DEVICE");
sender.ChangeState("END");
if (t.joinable())
{
t.join();
}
return true;
}
@@ -62,20 +75,14 @@ class MultipleDevices : public ::testing::Test {
channel.UpdateRateLogging(0);
receiver.fChannels["data"].push_back(channel);
receiver.ChangeState("INIT_DEVICE");
receiver.WaitForEndOfState("INIT_DEVICE");
receiver.ChangeState("INIT_TASK");
receiver.WaitForEndOfState("INIT_TASK");
thread t(control, std::ref(receiver));
receiver.ChangeState("RUN");
receiver.WaitForEndOfState("RUN");
receiver.RunStateMachine();
receiver.ChangeState("RESET_TASK");
receiver.WaitForEndOfState("RESET_TASK");
receiver.ChangeState("RESET_DEVICE");
receiver.WaitForEndOfState("RESET_DEVICE");
receiver.ChangeState("END");
if (t.joinable())
{
t.join();
}
return true;
}

View File

@@ -14,6 +14,7 @@
#include <FairMQDevice.h>
#include <options/FairMQProgOptions.h>
#include <memory>
#include <thread>
namespace fair
{
@@ -22,35 +23,41 @@ namespace mq
namespace test
{
inline auto control(std::shared_ptr<FairMQDevice> device) -> void
inline auto control(FairMQDevice& device) -> void
{
for (const auto event : {
FairMQDevice::INIT_DEVICE,
FairMQDevice::RESET_DEVICE,
FairMQDevice::END,
}) {
device->ChangeState(event);
if (event != FairMQDevice::END) device->WaitForEndOfState(event);
device.ChangeState(event);
if (event != FairMQDevice::END) device.WaitForEndOfState(event);
}
}
struct PluginServices : ::testing::Test {
PluginServices()
: mConfig()
, mDevice{std::make_shared<FairMQDevice>()}
, mServices{&mConfig, mDevice}
: mConfig()
, mDevice()
, mServices(mConfig, mDevice)
, fRunStateMachineThread()
{
mDevice->SetTransport("zeromq");
fRunStateMachineThread = std::thread(&FairMQDevice::RunStateMachine, &mDevice);
mDevice.SetTransport("zeromq");
}
~PluginServices()
{
if(mDevice->GetCurrentState() == FairMQDevice::IDLE) control(mDevice);
if (mDevice.GetCurrentState() == FairMQDevice::IDLE) control(mDevice);
if (fRunStateMachineThread.joinable()) {
fRunStateMachineThread.join();
}
}
FairMQProgOptions mConfig;
std::shared_ptr<FairMQDevice> mDevice;
FairMQDevice mDevice;
fair::mq::PluginServices mServices;
std::thread fRunStateMachineThread;
};
} /* namespace test */

View File

@@ -39,9 +39,9 @@ TEST_F(PluginServices, OnlySingleController)
EXPECT_EQ(mServices.GetDeviceController(), string{"foo"});
// park device
mDevice->WaitForEndOfState(FairMQDevice::DEVICE_READY);
mDevice.WaitForEndOfState(FairMQDevice::DEVICE_READY);
mServices.ChangeDeviceState("foo", DeviceStateTransition::ResetDevice);
mDevice->WaitForEndOfState(FairMQDevice::RESET_DEVICE);
mDevice.WaitForEndOfState(FairMQDevice::RESET_DEVICE);
mServices.ChangeDeviceState("foo", DeviceStateTransition::End);
}
@@ -72,7 +72,7 @@ TEST_F(PluginServices, Control)
ASSERT_EQ(mServices.GetCurrentDeviceState(), DeviceState::DeviceReady);
mServices.ChangeDeviceState("foo", DeviceStateTransition::ResetDevice);
mDevice->WaitForEndOfState(FairMQDevice::RESET_DEVICE);
mDevice.WaitForEndOfState(FairMQDevice::RESET_DEVICE);
mServices.ChangeDeviceState("foo", DeviceStateTransition::End);
}

View File

@@ -16,6 +16,7 @@
#include <sstream>
#include <string>
#include <vector>
#include <thread>
namespace
{
@@ -23,42 +24,50 @@ namespace
using namespace std;
using namespace fair::mq;
auto control(shared_ptr<FairMQDevice> device) -> void
auto control(FairMQDevice& device) -> void
{
device->SetTransport("zeromq");
device.SetTransport("zeromq");
for (const auto event : {
FairMQDevice::INIT_DEVICE,
FairMQDevice::RESET_DEVICE,
FairMQDevice::END,
}) {
device->ChangeState(event);
if (event != FairMQDevice::END) device->WaitForEndOfState(event);
device.ChangeState(event);
if (event != FairMQDevice::END) device.WaitForEndOfState(event);
}
}
TEST(Plugin, Operators)
{
FairMQProgOptions config{};
auto device = make_shared<FairMQDevice>();
PluginServices services{&config, device};
FairMQProgOptions config;
FairMQDevice device;
PluginServices services{config, device};
Plugin p1{"dds", {1, 0, 0}, "Foo Bar <foo.bar@test.net>", "https://git.test.net/dds.git", &services};
Plugin p2{"dds", {1, 0, 0}, "Foo Bar <foo.bar@test.net>", "https://git.test.net/dds.git", &services};
Plugin p3{"file", {1, 0, 0}, "Foo Bar <foo.bar@test.net>", "https://git.test.net/file.git", &services};
EXPECT_EQ(p1, p2);
EXPECT_NE(p1, p3);
control(device);
thread t(control, std::ref(device));
device.RunStateMachine();
if (t.joinable()) {
t.join();
}
}
TEST(Plugin, OstreamOperators)
{
FairMQProgOptions config{};
auto device = make_shared<FairMQDevice>();
PluginServices services{&config, device};
FairMQProgOptions config;
FairMQDevice device;
PluginServices services{config, device};
Plugin p1{"dds", {1, 0, 0}, "Foo Bar <foo.bar@test.net>", "https://git.test.net/dds.git", &services};
stringstream ss;
ss << p1;
EXPECT_EQ(ss.str(), string{"'dds', version '1.0.0', maintainer 'Foo Bar <foo.bar@test.net>', homepage 'https://git.test.net/dds.git'"});
control(device);
thread t(control, std::ref(device));
device.RunStateMachine();
if (t.joinable()) {
t.join();
}
}
TEST(PluginVersion, Operators)

View File

@@ -15,6 +15,7 @@
#include <fstream>
#include <memory>
#include <vector>
#include <thread>
namespace
{
@@ -24,25 +25,25 @@ using namespace boost::filesystem;
using namespace boost::program_options;
using namespace std;
auto control(shared_ptr<FairMQDevice> device) -> void
auto control(FairMQDevice& device) -> void
{
device->SetTransport("zeromq");
device.SetTransport("zeromq");
for (const auto event : {
FairMQDevice::INIT_DEVICE,
FairMQDevice::RESET_DEVICE,
FairMQDevice::END,
}) {
device->ChangeState(event);
if (event != FairMQDevice::END) device->WaitForEndOfState(event);
device.ChangeState(event);
if (event != FairMQDevice::END) device.WaitForEndOfState(event);
}
}
TEST(PluginManager, LoadPluginDynamic)
{
FairMQProgOptions config{};
auto mgr = PluginManager{};
auto device = make_shared<FairMQDevice>();
mgr.EmplacePluginServices(&config, device);
FairMQProgOptions config;
FairMQDevice device;
PluginManager mgr;
mgr.EmplacePluginServices(config, device);
mgr.PrependSearchPath("./test");
@@ -53,7 +54,7 @@ TEST(PluginManager, LoadPluginDynamic)
// check order
const auto expected = vector<string>{"test_dummy", "test_dummy2"};
auto actual = vector<string>{};
auto actual = vector<string>();
mgr.ForEachPlugin([&](Plugin& plugin){ actual.push_back(plugin.GetName()); });
ASSERT_TRUE(actual == expected);
@@ -62,27 +63,31 @@ TEST(PluginManager, LoadPluginDynamic)
mgr.ForEachPluginProgOptions([&count](const options_description& /*d*/){ ++count; });
ASSERT_EQ(count, 1);
control(device);
thread t(control, std::ref(device));
device.RunStateMachine();
if (t.joinable()) {
t.join();
}
}
TEST(PluginManager, LoadPluginStatic)
{
auto mgr = PluginManager{};
auto device = make_shared<FairMQDevice>();
device->SetTransport("zeromq");
FairMQDevice device;
PluginManager mgr;
device.SetTransport("zeromq");
ASSERT_NO_THROW(mgr.LoadPlugin("s:control"));
FairMQProgOptions config{};
FairMQProgOptions config;
config.SetValue<string>("control", "static");
config.SetValue("catch-signals", 0);
mgr.EmplacePluginServices(&config, device);
mgr.EmplacePluginServices(config, device);
ASSERT_NO_THROW(mgr.InstantiatePlugins());
// check order
const auto expected = vector<string>{"control"};
auto actual = vector<string>{};
auto actual = vector<string>();
mgr.ForEachPlugin([&](Plugin& plugin){ actual.push_back(plugin.GetName()); });
ASSERT_TRUE(actual == expected);
@@ -91,20 +96,22 @@ TEST(PluginManager, LoadPluginStatic)
mgr.ForEachPluginProgOptions([&count](const options_description&){ ++count; });
ASSERT_EQ(count, 1);
device.RunStateMachine();
mgr.WaitForPluginsToReleaseDeviceControl();
}
TEST(PluginManager, Factory)
{
const auto args = vector<string>{"-l", "debug", "--help", "-S", ">/lib", "</home/user/lib", "/usr/local/lib", "/usr/lib"};
auto mgr = PluginManager::MakeFromCommandLineOptions(args);
PluginManager mgr(args);
const auto path1 = path{"/home/user/lib"};
const auto path2 = path{"/usr/local/lib"};
const auto path3 = path{"/usr/lib"};
const auto path4 = path{"/lib"};
const auto expected = vector<path>{path1, path2, path3, path4};
ASSERT_TRUE(static_cast<bool>(mgr));
ASSERT_TRUE(mgr->SearchPaths() == expected);
// ASSERT_TRUE(static_cast<bool>(mgr));
ASSERT_TRUE(mgr.SearchPaths() == expected);
}
TEST(PluginManager, SearchPathValidation)
@@ -112,7 +119,7 @@ TEST(PluginManager, SearchPathValidation)
const auto path1 = path{"/tmp/test1"};
const auto path2 = path{"/tmp/test2"};
const auto path3 = path{"/tmp/test3"};
auto mgr = PluginManager{};
PluginManager mgr;
mgr.SetSearchPaths({path1, path2});
auto expected = vector<path>{path1, path2};
@@ -140,7 +147,7 @@ TEST(PluginManager, SearchPaths)
fs.close();
const auto empty_path = path{""};
auto mgr = PluginManager{};
PluginManager mgr;
ASSERT_NO_THROW(mgr.AppendSearchPath(non_existing_dir));
ASSERT_NO_THROW(mgr.AppendSearchPath(existing_dir));
ASSERT_THROW(mgr.AppendSearchPath(existing_file), PluginManager::BadSearchPath);

View File

@@ -17,7 +17,7 @@ using namespace std;
TEST(PluginManager, LoadPluginPrelinkedDynamic)
{
auto mgr = PluginManager{};
PluginManager mgr;
ASSERT_NO_THROW(mgr.LoadPlugin("p:test_dummy"));
ASSERT_NO_THROW(mgr.LoadPlugin("p:test_dummy2"));