Compare commits

..

28 Commits

Author SHA1 Message Date
Dennis Klein
868fe02ee9 CI: Submit results to CDash for each build step 2021-04-08 16:22:47 +02:00
Dennis Klein
a2016a9361 CI: Add alice-centos-7 environment 2021-04-08 16:22:47 +02:00
Dennis Klein
ea9aede652 Fallback to <boost/filesystem> on GCC 7 2021-04-08 16:22:47 +02:00
Alexey Rybalchenko
77bf12c8e8 docs patch 2021-04-08 12:38:18 +02:00
Alexey Rybalchenko
f3bc9e05a8 shmmonitor: update docs 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
5facc441b8 shmmonitor: add --list-all 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
2602f53585 Add tools: StrStartsWith, StrEndsWith 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
0976465338 shm: reduce delay between monitor daemon launch & HBs 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
9144258b89 shmmonitor: daemon output to file if FAIRMQ_SHMMONITOR_VERBOSE is true 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
be55565617 shmmonitor: use fairlogger 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
d7e2fbecea shmmonitor: refactor to separate monitoring from output 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
72175e5757 shmmonitor: optimize startup to avoid repeated start 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
effba534f0 shmmonitor: add session name and creator id to the output 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
efd42075a9 shmmonitor: enable read only access 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
5228407932 shmmonitor: distinguish daemon from monitor mode (orthogonal) 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
30e81d58f8 shmmonitor: allow getting shmids based on session/userid 2021-04-08 10:11:23 +02:00
Dennis Klein
2c7c46f2fd Remove codacy badge 2021-03-26 10:06:10 +01:00
Dennis Klein
0a5122bb24 Remove codecov badge 2021-03-26 10:06:10 +01:00
Dennis Klein
fc49687879 builtin devices: Reorganize 2021-03-26 10:06:10 +01:00
Dennis Klein
66a4df0667 fairmq-uuid-gen: Move to tools directory 2021-03-26 10:06:10 +01:00
Dennis Klein
978191fa6c Introduce <fairmq/runDevice.h> 2021-03-26 10:06:10 +01:00
Dennis Klein
cef6d0afcd Introduce <fairmq/Device.h> 2021-03-26 10:06:10 +01:00
Dennis Klein
47ec550792 control plugin: Move to subdirectory for consistency 2021-03-26 10:06:10 +01:00
Dennis Klein
b4aeb320e5 CI: Collect DDS logs on error 2021-03-26 10:06:10 +01:00
Dennis Klein
107248be0a Reorganize includes for consistency 2021-03-26 10:06:10 +01:00
Dennis Klein
68ceaba501 CI: Filter and process warnings and errors 2021-03-26 10:06:10 +01:00
Dennis Klein
4b6cf8b181 Fix aggregate initialization issue before C++20
Use value initialization to prevent

error: temporary of type ... has protected destructor

see https://stackoverflow.com/a/56745475
2021-03-26 10:06:10 +01:00
Dennis Klein
21d6cf9830 CI: Run clang-tidy 2021-03-26 10:06:10 +01:00
39 changed files with 666 additions and 383 deletions

View File

@@ -1,3 +1,3 @@
---
Checks: '*,-google-*,-fuchsia-*,-cert-*,-llvm-header-guard,-readability-named-parameter,-misc-non-private-member-variables-in-classes,-*-magic-numbers,-llvm-include-order,-hicpp-no-array-decay,-performance-unnecessary-value-param,-cppcoreguidelines-pro-bounds-array-to-pointer-decay,-modernize-use-trailing-return-type,-readability-redundant-member-init'
Checks: 'cppcoreguidelines-*,misc-unused-alias-decls,misc-unused-parameters,modernize-deprecated-headers,modernize-raw-string-literal,modernize-redundant-void-arg,modernize-use-bool-literals,modernize-use-default-member-init,modernize-use-emplace,modernize-use-equals-default,modernize-use-equals-delete,modernize-use-noexcept,modernize-use-nullptr,modernize-use-override,modernize-use-using,performance-faster-string-find,performance-for-range-copy,performance-unnecessary-copy-initialization,readability-avoid-const-params-in-decls,readability-braces-around-statements,readability-container-size-empty,readability-delete-null-pointer,readability-redundant-member-init,readability-redundant-string-init,readability-static-accessed-through-instance,readability-string-compare'
HeaderFilterRegex: '/(fairmq/)'

View File

@@ -44,13 +44,19 @@ list(APPEND options
"-DDISABLE_COLOR=ON"
"-DBUILD_SDK_COMMANDS=ON"
"-DBUILD_SDK=ON"
"-DBUILD_DDS_PLUGIN=ON"
)
"-DBUILD_DDS_PLUGIN=ON")
if(RUN_STATIC_ANALYSIS)
list(APPEND options "-DRUN_STATIC_ANALYSIS=ON")
endif()
list(JOIN options ";" optionsstr)
ctest_configure(OPTIONS "${optionsstr}")
ctest_submit()
ctest_build(FLAGS "-j${NCPUS}")
ctest_submit()
ctest_test(BUILD "${CTEST_BINARY_DIRECTORY}"
PARALLEL_LEVEL 1
SCHEDULE_RANDOM ON

36
Jenkinsfile vendored
View File

@@ -1,6 +1,6 @@
#!groovy
def jobMatrix(String prefix, String type, List specs) {
def jobMatrix(String type, List specs) {
def nodes = [:]
for (spec in specs) {
job = "${spec.os}-${spec.ver}-${spec.arch}-${spec.compiler}"
@@ -12,7 +12,7 @@ def jobMatrix(String prefix, String type, List specs) {
nodes[label] = {
node(selector) {
githubNotify(context: "${prefix}/${label}", description: 'Building ...', status: 'PENDING')
githubNotify(context: "${label}", description: 'Building ...', status: 'PENDING')
try {
deleteDir()
checkout scm
@@ -29,7 +29,11 @@ def jobMatrix(String prefix, String type, List specs) {
sh "cat ${jobscript}"
sh "bash ${jobscript}"
} else {
def containercmd = "singularity exec -B/shared ${env.SINGULARITY_CONTAINER_ROOT}/fairmq/${os}.${ver}.sif bash -l -c \\\"${ctestcmd}\\\""
def static_analysis = "OFF"
if (selector =~ /^fedora/) {
static_analysis = "ON"
}
def containercmd = "singularity exec -B/shared ${env.SINGULARITY_CONTAINER_ROOT}/fairmq/${os}.${ver}.sif bash -l -c \\\"${ctestcmd} -DRUN_STATIC_ANALYSIS=${static_analysis}\\\""
sh """\
echo \"echo \\\"*** Job started at .......: \\\$(date -R)\\\"\" >> ${jobscript}
echo \"echo \\\"*** Job ID ...............: \\\${SLURM_JOB_ID}\\\"\" >> ${jobscript}
@@ -40,13 +44,26 @@ def jobMatrix(String prefix, String type, List specs) {
"""
sh "cat ${jobscript}"
sh "test/ci/slurm-submit.sh \"FairMQ \${JOB_BASE_NAME} ${label}\" ${jobscript}"
withChecks('Static Analysis') {
if (static_analysis == "ON") {
recordIssues(enabledForFailure: true,
tools: [gcc(pattern: 'build/Testing/Temporary/*.log')],
filters: [excludeFile('extern/*'), excludeFile('usr/*')],
skipBlames: true)
}
}
}
deleteDir()
githubNotify(context: "${prefix}/${label}", description: 'Success', status: 'SUCCESS')
githubNotify(context: "${label}", description: 'Success', status: 'SUCCESS')
} catch (e) {
def tarball = "${prefix}_${label}_dds_logs.tar.gz"
sh "tar czvf ${tarball} -C \${WORKSPACE}/build/test .DDS/"
archiveArtifacts tarball
deleteDir()
githubNotify(context: "${prefix}/${label}", description: 'Error', status: 'ERROR')
githubNotify(context: "${label}", description: 'Error', status: 'ERROR')
throw e
}
}
@@ -58,12 +75,13 @@ def jobMatrix(String prefix, String type, List specs) {
pipeline{
agent none
stages {
stage("Run CI Matrix") {
stage("CI") {
steps{
script {
def builds = jobMatrix('alfa-ci', 'build', [
[os: 'fedora', ver: '32', arch: 'x86_64', compiler: 'gcc-10'],
[os: 'macos', ver: '11', arch: 'x86_64', compiler: 'apple-clang-12'],
def builds = jobMatrix('build', [
[os: 'alice-centos', ver: '7', arch: 'x86_64', compiler: 'gcc-7'],
[os: 'fedora', ver: '32', arch: 'x86_64', compiler: 'gcc-10'],
[os: 'macos', ver: '11', arch: 'x86_64', compiler: 'apple-clang-12'],
])
parallel(builds)

View File

@@ -1,5 +1,5 @@
<!-- {#mainpage} -->
# FairMQ [![license](https://alfa-ci.gsi.de/shields/badge/license-LGPL--3.0-orange.svg)](COPYRIGHT) [![build status](https://alfa-ci.gsi.de/buildStatus/icon?job=FairRootGroup/FairMQ/master)](https://alfa-ci.gsi.de/blue/organizations/jenkins/FairRootGroup%2FFairMQ/branches) [![test coverage master branch](https://codecov.io/gh/FairRootGroup/FairMQ/branch/master/graph/badge.svg)](https://codecov.io/gh/FairRootGroup/FairMQ/branch/master) [![Coverity Badge](https://alfa-ci.gsi.de/shields/coverity/scan/fairrootgroup-fairmq.svg)](https://scan.coverity.com/projects/fairrootgroup-fairmq) [![Codacy Badge](https://api.codacy.com/project/badge/Grade/6b648d95d68d4c4eae833b84f84d299c)](https://www.codacy.com/app/dennisklein/FairMQ?utm_source=github.com&amp;utm_medium=referral&amp;utm_content=FairRootGroup/FairMQ&amp;utm_campaign=Badge_Grade)
# FairMQ [![license](https://alfa-ci.gsi.de/shields/badge/license-LGPL--3.0-orange.svg)](COPYRIGHT) [![build status](https://alfa-ci.gsi.de/buildStatus/icon?job=FairRootGroup/FairMQ/dev)](https://alfa-ci.gsi.de/blue/organizations/jenkins/FairRootGroup%2FFairMQ/branches) [![Coverity Badge](https://alfa-ci.gsi.de/shields/coverity/scan/fairrootgroup-fairmq.svg)](https://scan.coverity.com/projects/fairrootgroup-fairmq)
C++ Message Queuing Library and Framework

View File

@@ -203,6 +203,13 @@ macro(set_fairmq_defaults)
set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE ${CCACHE})
endif()
endif()
if( CMAKE_CXX_COMPILER_ID STREQUAL "GNU"
AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 8)
set(FAIRMQ_HAS_STD_FILESYSTEM 0)
else()
set(FAIRMQ_HAS_STD_FILESYSTEM 1)
endif()
endmacro()
function(join VALUES GLUE OUTPUT)

View File

@@ -1,5 +1,5 @@
################################################################################
# Copyright (C) 2012-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# Copyright (C) 2012-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
@@ -62,6 +62,7 @@ if(BUILD_FAIRMQ OR BUILD_SDK)
target_link_libraries(${target}
PRIVATE
FairLogger::FairLogger
Threads::Threads
PUBLIC
Boost::boost
)
@@ -145,6 +146,7 @@ if(BUILD_FAIRMQ)
# libFairMQ header files #
##########################
set(FAIRMQ_PUBLIC_HEADER_FILES
Device.h
DeviceRunner.h
EventManager.h
FairMQChannel.h
@@ -169,14 +171,21 @@ if(BUILD_FAIRMQ)
Plugin.h
PluginManager.h
PluginServices.h
runDevice.h
runFairMQDevice.h
shmem/Monitor.h
)
set(FAIRMQ_PRIVATE_HEADER_FILES
devices/BenchmarkSampler.h
devices/Merger.h
devices/Multiplier.h
devices/Proxy.h
devices/Sink.h
devices/Splitter.h
plugins/Builtin.h
plugins/config/Config.h
plugins/Control.h
plugins/control/Control.h
shmem/Message.h
shmem/Poller.h
shmem/UnmanagedRegion.h
@@ -223,7 +232,7 @@ if(BUILD_FAIRMQ)
Properties.cxx
SuboptParser.cxx
plugins/config/Config.cxx
plugins/Control.cxx
plugins/control/Control.cxx
MemoryResources.cxx
shmem/Monitor.cxx
)
@@ -243,7 +252,7 @@ if(BUILD_FAIRMQ)
# configure files #
###################
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/run/startMQBenchmark.sh.in ${CMAKE_CURRENT_BINARY_DIR}/startMQBenchmark.sh)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/devices/startMQBenchmark.sh.in ${CMAKE_CURRENT_BINARY_DIR}/startMQBenchmark.sh)
#################################
# define libFairMQ build target #
@@ -274,6 +283,7 @@ if(BUILD_FAIRMQ)
if(BUILD_OFI_TRANSPORT)
target_compile_definitions(${_target} PRIVATE BUILD_OFI_TRANSPORT)
endif()
target_compile_definitions(${_target} PUBLIC FAIRMQ_HAS_STD_FILESYSTEM=${FAIRMQ_HAS_STD_FILESYSTEM})
#######################
@@ -347,22 +357,22 @@ if(BUILD_FAIRMQ)
###############
# executables #
###############
add_executable(fairmq-bsampler run/runBenchmarkSampler.cxx)
add_executable(fairmq-bsampler devices/runBenchmarkSampler.cxx)
target_link_libraries(fairmq-bsampler FairMQ)
add_executable(fairmq-merger run/runMerger.cxx)
add_executable(fairmq-merger devices/runMerger.cxx)
target_link_libraries(fairmq-merger FairMQ)
add_executable(fairmq-multiplier run/runMultiplier.cxx)
add_executable(fairmq-multiplier devices/runMultiplier.cxx)
target_link_libraries(fairmq-multiplier FairMQ)
add_executable(fairmq-proxy run/runProxy.cxx)
add_executable(fairmq-proxy devices/runProxy.cxx)
target_link_libraries(fairmq-proxy FairMQ)
add_executable(fairmq-sink run/runSink.cxx)
add_executable(fairmq-sink devices/runSink.cxx)
target_link_libraries(fairmq-sink FairMQ)
add_executable(fairmq-splitter run/runSplitter.cxx)
add_executable(fairmq-splitter devices/runSplitter.cxx)
target_link_libraries(fairmq-splitter FairMQ)
add_executable(fairmq-shmmonitor shmem/Monitor.cxx shmem/Monitor.h shmem/runMonitor.cxx)
@@ -375,6 +385,7 @@ if(BUILD_FAIRMQ)
$<$<PLATFORM_ID:Linux>:rt>
Boost::boost
Boost::date_time
$<$<NOT:${FAIRMQ_HAS_STD_FILESYSTEM}>:Boost::filesystem>
Boost::program_options
FairLogger::FairLogger
PicoSHA2
@@ -382,9 +393,13 @@ if(BUILD_FAIRMQ)
target_include_directories(fairmq-shmmonitor PUBLIC
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}>
)
target_compile_definitions(fairmq-shmmonitor PUBLIC FAIRMQ_HAS_STD_FILESYSTEM=${FAIRMQ_HAS_STD_FILESYSTEM})
add_executable(fairmq-uuid-gen run/runUuidGenerator.cxx)
target_link_libraries(fairmq-uuid-gen FairMQ)
add_executable(fairmq-uuid-gen tools/runUuidGenerator.cxx)
target_link_libraries(fairmq-uuid-gen PUBLIC
Boost::program_options
Tools
)
###########

21
fairmq/Device.h Normal file
View File

@@ -0,0 +1,21 @@
/********************************************************************************
* Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_DEVICE_H
#define FAIR_MQ_DEVICE_H
#include <FairMQDevice.h>
namespace fair::mq
{
using Device = ::FairMQDevice;
} // namespace fair::mq
#endif /* FAIR_MQ_DEVICE_H */

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2017-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2017-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@@ -9,10 +9,10 @@
#ifndef FAIR_MQ_DEVICERUNNER_H
#define FAIR_MQ_DEVICERUNNER_H
#include <fairmq/Device.h>
#include <fairmq/EventManager.h>
#include <fairmq/PluginManager.h>
#include <fairmq/ProgOptions.h>
#include <FairMQDevice.h>
#include <functional>
#include <memory>
@@ -73,7 +73,7 @@ class DeviceRunner
std::vector<std::string> fRawCmdLineArgs;
fair::mq::ProgOptions fConfig;
std::unique_ptr<FairMQDevice> fDevice;
std::unique_ptr<Device> fDevice;
PluginManager fPluginManager;
const bool fPrintLogo;

View File

@@ -9,17 +9,17 @@
#ifndef FAIRMQDEVICE_H_
#define FAIRMQDEVICE_H_
#include <StateMachine.h>
#include <FairMQTransportFactory.h>
#include <fairmq/Transports.h>
#include <fairmq/StateQueue.h>
#include <FairMQChannel.h>
#include <FairMQLogger.h>
#include <FairMQMessage.h>
#include <FairMQParts.h>
#include <FairMQTransportFactory.h>
#include <FairMQUnmanagedRegion.h>
#include <FairMQLogger.h>
#include <fairmq/ProgOptions.h>
#include <fairmq/StateMachine.h>
#include <fairmq/StateQueue.h>
#include <fairmq/Transports.h>
#include <fairmq/tools/Version.h>
#include <vector>
#include <memory> // unique_ptr
@@ -34,8 +34,6 @@
#include <cstddef>
#include <utility> // pair
#include <fairmq/tools/Version.h>
using FairMQChannelMap = std::unordered_map<std::string, std::vector<FairMQChannel>>;
using InputMsgCallback = std::function<bool(FairMQMessagePtr&, int)>;

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@@ -9,8 +9,9 @@
#ifndef FAIRMQPOLLER_H_
#define FAIRMQPOLLER_H_
#include <string>
#include <memory>
#include <stdexcept>
#include <string>
class FairMQPoller
{

View File

@@ -15,7 +15,7 @@
#ifndef FAIR_MQ_MEMORY_RESOURCES_H
#define FAIR_MQ_MEMORY_RESOURCES_H
#include <fairmq/FairMQMessage.h>
#include <FairMQMessage.h>
class FairMQTransportFactory;
#include <boost/container/container_fwd.hpp>

View File

@@ -1,32 +1,35 @@
/********************************************************************************
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQBENCHMARKSAMPLER_H_
#define FAIRMQBENCHMARKSAMPLER_H_
#ifndef FAIR_MQ_BENCHMARKSAMPLER_H
#define FAIR_MQ_BENCHMARKSAMPLER_H
#include "../FairMQLogger.h"
#include "FairMQDevice.h"
#include "tools/RateLimit.h"
#include <fairmq/Device.h>
#include <fairmq/tools/RateLimit.h>
#include <chrono>
#include <cstddef> // size_t
#include <cstdint> // uint64_t
#include <cstring> // memset
#include <fairlogger/Logger.h>
#include <string>
namespace fair::mq
{
/**
* Sampler to generate traffic for benchmarking.
*/
class FairMQBenchmarkSampler : public FairMQDevice
class BenchmarkSampler : public Device
{
public:
FairMQBenchmarkSampler()
BenchmarkSampler()
: fMultipart(false)
, fMemSet(false)
, fNumParts(1)
@@ -117,4 +120,6 @@ class FairMQBenchmarkSampler : public FairMQDevice
std::string fOutChannelName;
};
#endif /* FAIRMQBENCHMARKSAMPLER_H_ */
} // namespace fair::mq
#endif /* FAIR_MQ_BENCHMARKSAMPLER_H */

View File

@@ -1,36 +1,32 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQMerger.h
*
* @since 2012-12-06
* @author D. Klein, A. Rybalchenko
*/
#ifndef FAIRMQMERGER_H_
#define FAIRMQMERGER_H_
#ifndef FAIR_MQ_MERGER_H
#define FAIR_MQ_MERGER_H
#include "FairMQDevice.h"
#include "../FairMQPoller.h"
#include "../FairMQLogger.h"
#include <FairMQPoller.h>
#include <fairmq/Device.h>
#include <fairlogger/Logger.h>
#include <string>
#include <vector>
class FairMQMerger : public FairMQDevice
namespace fair::mq
{
class Merger : public Device
{
public:
FairMQMerger()
Merger()
: fMultipart(true)
, fInChannelName("data-in")
, fOutChannelName("data-out")
{}
~FairMQMerger() {}
protected:
bool fMultipart;
@@ -112,4 +108,6 @@ class FairMQMerger : public FairMQDevice
}
};
#endif /* FAIRMQMERGER_H_ */
} // namespace fair::mq
#endif /* FAIR_MQ_MERGER_H */

View File

@@ -1,29 +1,31 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQMULTIPLIER_H_
#define FAIRMQMULTIPLIER_H_
#ifndef FAIR_MQ_MULTIPLIER_H
#define FAIR_MQ_MULTIPLIER_H
#include "FairMQDevice.h"
#include <fairmq/Device.h>
#include <string>
#include <vector>
class FairMQMultiplier : public FairMQDevice
namespace fair::mq
{
class Multiplier : public Device
{
public:
FairMQMultiplier()
Multiplier()
: fMultipart(true)
, fNumOutputs(0)
, fInChannelName()
, fOutChannelNames()
{}
~FairMQMultiplier() {}
protected:
bool fMultipart;
@@ -39,9 +41,9 @@ class FairMQMultiplier : public FairMQDevice
fNumOutputs = fChannels.at(fOutChannelNames.at(0)).size();
if (fMultipart) {
OnData(fInChannelName, &FairMQMultiplier::HandleMultipartData);
OnData(fInChannelName, &Multiplier::HandleMultipartData);
} else {
OnData(fInChannelName, &FairMQMultiplier::HandleSingleData);
OnData(fInChannelName, &Multiplier::HandleSingleData);
}
}
@@ -107,4 +109,6 @@ class FairMQMultiplier : public FairMQDevice
}
};
#endif /* FAIRMQMULTIPLIER_H_ */
} // namespace fair::mq
#endif /* FAIR_MQ_MULTIPLIER_H */

View File

@@ -1,33 +1,29 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQProxy.h
*
* @since 2013-10-02
* @author A. Rybalchenko
*/
#ifndef FAIRMQPROXY_H_
#define FAIRMQPROXY_H_
#ifndef FAIR_MQ_PROXY_H
#define FAIR_MQ_PROXY_H
#include "FairMQDevice.h"
#include <fairmq/Device.h>
#include <string>
class FairMQProxy : public FairMQDevice
namespace fair::mq
{
class Proxy : public Device
{
public:
FairMQProxy()
Proxy()
: fMultipart(true)
, fInChannelName()
, fOutChannelName()
{}
~FairMQProxy() {}
protected:
bool fMultipart;
@@ -73,4 +69,6 @@ class FairMQProxy : public FairMQDevice
}
};
#endif /* FAIRMQPROXY_H_ */
} // namespace fair::mq
#endif /* FAIR_MQ_PROXY_H */

View File

@@ -2,9 +2,9 @@
With FairMQ several generic devices are provided:
- **FairMQBenchmarkSampler**: generates random data of configurable size and at configurable rate and sends it out on an output channel.
- **FairMQSink**: receives messages on the input channel and simply discards them.
- **FairMQMerger**: receives data from multiple input channels and forwards it to a single output channel.
- **FairMQSplitter**: receives messages on a single input channels and round-robins them among multiple output channels (which can have different socket types).
- **FairMQMultiplier**: receives data from a single input channel and multiplies (copies) it to two or more output channels.
- **FairMQProxy**: connects input channel to output channel, where both can have different socket types and multiple peers.
- **BenchmarkSampler**: generates random data of configurable size and at configurable rate and sends it out on an output channel.
- **Sink**: receives messages on the input channel and simply discards them.
- **Merger**: receives data from multiple input channels and forwards it to a single output channel.
- **Splitter**: receives messages on a single input channels and round-robins them among multiple output channels (which can have different socket types).
- **Multiplier**: receives data from a single input channel and multiplies (copies) it to two or more output channels.
- **Proxy**: connects input channel to output channel, where both can have different socket types and multiple peers.

View File

@@ -1,33 +1,31 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQSink.h
*
* @since 2013-01-09
* @author D. Klein, A. Rybalchenko
*/
#ifndef FAIRMQSINK_H_
#define FAIRMQSINK_H_
#ifndef FAIR_MQ_SINK_H
#define FAIR_MQ_SINK_H
#include "../FairMQDevice.h"
#include "../FairMQLogger.h"
#include <FairMQPoller.h>
#include <fairmq/Device.h>
#include <fairmq/tools/Strings.h>
#include <chrono>
#include <string>
#include <fairlogger/Logger.h>
#include <fstream>
#include <string>
#include <stdexcept>
class FairMQSink : public FairMQDevice
namespace fair::mq
{
class Sink : public Device
{
public:
FairMQSink()
Sink()
: fMultipart(false)
, fMaxIterations(0)
, fNumIterations(0)
@@ -37,8 +35,6 @@ class FairMQSink : public FairMQDevice
, fOutFilename()
{}
~FairMQSink() {}
protected:
bool fMultipart;
uint64_t fMaxIterations;
@@ -145,4 +141,6 @@ class FairMQSink : public FairMQDevice
}
};
#endif /* FAIRMQSINK_H_ */
} // namespace fair::mq
#endif /* FAIR_MQ_SINK_H */

View File

@@ -1,35 +1,31 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQSplitter.h
*
* @since 2012-12-06
* @author D. Klein, A. Rybalchenko
*/
#ifndef FAIRMQSPLITTER_H_
#define FAIRMQSPLITTER_H_
#ifndef FAIR_MQ_SPLITTER_H
#define FAIR_MQ_SPLITTER_H
#include "FairMQDevice.h"
#include <fairmq/Device.h>
#include <string>
class FairMQSplitter : public FairMQDevice
namespace fair::mq
{
class Splitter : public Device
{
public:
FairMQSplitter()
Splitter()
: fMultipart(true)
, fNumOutputs(0)
, fDirection(0)
, fInChannelName()
, fOutChannelName()
{}
~FairMQSplitter() {}
protected:
bool fMultipart;
@@ -47,9 +43,9 @@ class FairMQSplitter : public FairMQDevice
fDirection = 0;
if (fMultipart) {
OnData(fInChannelName, &FairMQSplitter::HandleData<FairMQParts>);
OnData(fInChannelName, &Splitter::HandleData<FairMQParts>);
} else {
OnData(fInChannelName, &FairMQSplitter::HandleData<FairMQMessagePtr>);
OnData(fInChannelName, &Splitter::HandleData<FairMQMessagePtr>);
}
}
@@ -66,4 +62,6 @@ class FairMQSplitter : public FairMQDevice
}
};
#endif /* FAIRMQSPLITTER_H_ */
} // namespace fair::mq
#endif /* FAIR_MQ_SPLITTER_H */

View File

@@ -1,13 +1,13 @@
/********************************************************************************
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <runFairMQDevice.h>
#include <devices/FairMQBenchmarkSampler.h>
#include <fairmq/devices/BenchmarkSampler.h>
#include <fairmq/runDevice.h>
namespace bpo = boost::program_options;
@@ -24,7 +24,7 @@ void addCustomOptions(bpo::options_description& options)
("msg-rate", bpo::value<float>()->default_value(0), "Msg rate limit in maximum number of messages per second");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /* config */)
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& /* config */)
{
return new FairMQBenchmarkSampler();
return std::make_unique<fair::mq::BenchmarkSampler>();
}

View File

@@ -1,13 +1,13 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <runFairMQDevice.h>
#include <devices/FairMQMerger.h>
#include <fairmq/devices/Merger.h>
#include <fairmq/runDevice.h>
namespace bpo = boost::program_options;
@@ -19,7 +19,7 @@ void addCustomOptions(bpo::options_description& options)
("multipart", bpo::value<bool>()->default_value(true), "Handle multipart payloads");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new FairMQMerger();
return std::make_unique<fair::mq::Merger>();
}

View File

@@ -1,13 +1,13 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <runFairMQDevice.h>
#include <devices/FairMQMultiplier.h>
#include <fairmq/devices/Multiplier.h>
#include <fairmq/runDevice.h>
namespace bpo = boost::program_options;
@@ -19,7 +19,7 @@ void addCustomOptions(bpo::options_description& options)
("multipart", bpo::value<bool>()->default_value(true), "Handle multipart payloads");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new FairMQMultiplier();
return std::make_unique<fair::mq::Multiplier>();
}

View File

@@ -1,13 +1,13 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <runFairMQDevice.h>
#include <devices/FairMQProxy.h>
#include <fairmq/devices/Proxy.h>
#include <fairmq/runDevice.h>
namespace bpo = boost::program_options;
@@ -19,7 +19,7 @@ void addCustomOptions(bpo::options_description& options)
("multipart", bpo::value<bool>()->default_value(true), "Handle multipart payloads");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new FairMQProxy();
return std::make_unique<fair::mq::Proxy>();
}

View File

@@ -1,13 +1,13 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <runFairMQDevice.h>
#include <devices/FairMQSink.h>
#include <fairmq/devices/Sink.h>
#include <fairmq/runDevice.h>
namespace bpo = boost::program_options;
@@ -21,7 +21,7 @@ void addCustomOptions(bpo::options_description& options)
("multipart", bpo::value<bool>()->default_value(false), "Handle multipart payloads");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new FairMQSink();
return std::make_unique<fair::mq::Sink>();
}

View File

@@ -1,13 +1,13 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <runFairMQDevice.h>
#include <devices/FairMQSplitter.h>
#include <fairmq/devices/Splitter.h>
#include <fairmq/runDevice.h>
namespace bpo = boost::program_options;
@@ -19,7 +19,7 @@ void addCustomOptions(bpo::options_description& options)
("multipart", bpo::value<bool>()->default_value(true), "Handle multipart payloads");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new FairMQSplitter();
return std::make_unique<fair::mq::Splitter>();
}

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@@ -9,4 +9,4 @@
// List of all builtin plugin headers (the ones which call REGISTER_FAIRMQ_PLUGIN macro)
#include <fairmq/plugins/config/Config.h>
#include <fairmq/plugins/Control.h>
#include <fairmq/plugins/control/Control.h>

59
fairmq/runDevice.h Normal file
View File

@@ -0,0 +1,59 @@
/********************************************************************************
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/DeviceRunner.h>
#include <boost/program_options.hpp>
#include <memory>
// to be implemented by the user to return a child class of FairMQDevice
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& config);
// to be implemented by the user to add custom command line options (or just with empty body)
void addCustomOptions(boost::program_options::options_description&);
int main(int argc, char* argv[])
{
using namespace fair::mq;
using namespace fair::mq::hooks;
try {
DeviceRunner runner(argc, argv);
// runner.AddHook<LoadPlugins>([](DeviceRunner& r){
// // for example:
// r.fPluginManager->SetSearchPaths({"/lib", "/lib/plugins"});
// r.fPluginManager->LoadPlugin("asdf");
// });
runner.AddHook<SetCustomCmdLineOptions>([](DeviceRunner& r){
boost::program_options::options_description customOptions("Custom options");
addCustomOptions(customOptions);
r.fConfig.AddToCmdLineOptions(customOptions);
});
// runner.AddHook<ModifyRawCmdLineArgs>([](DeviceRunner& r){
// // for example:
// r.fRawCmdLineArgs.push_back("--blubb");
// });
runner.AddHook<InstantiateDevice>([](DeviceRunner& r){
r.fDevice = getDevice(r.fConfig);
});
return runner.Run();
// Run with builtin catch all exception handler, just:
// return runner.RunWithExceptionHandlers();
} catch (std::exception& e) {
LOG(error) << "Uncaught exception reached the top of main: " << e.what();
return 1;
} catch (...) {
LOG(error) << "Uncaught exception reached the top of main.";
return 1;
}
}

View File

@@ -91,6 +91,17 @@ struct SegmentInfo
AllocationAlgorithm fAllocationAlgorithm;
};
struct SessionInfo
{
SessionInfo(const char* sessionName, int creatorId, const VoidAlloc& alloc)
: fSessionName(sessionName, alloc)
, fCreatorId(creatorId)
{}
Str fSessionName;
int fCreatorId;
};
using Uint16SegmentInfoPairAlloc = boost::interprocess::allocator<std::pair<const uint16_t, SegmentInfo>, SegmentManager>;
using Uint16SegmentInfoHashMap = boost::unordered_map<uint16_t, SegmentInfo, boost::hash<uint16_t>, std::equal_to<uint16_t>, Uint16SegmentInfoPairAlloc>;
// using Uint16SegmentInfoMap = boost::interprocess::map<uint16_t, SegmentInfo, std::less<uint16_t>, Uint16SegmentInfoPairAlloc>;
@@ -195,9 +206,9 @@ struct RegionBlock
// find id for unique shmem name:
// a hash of user id + session id, truncated to 8 characters (to accommodate for name size limit on some systems (MacOS)).
inline std::string makeShmIdStr(const std::string& sessionId)
inline std::string makeShmIdStr(const std::string& sessionId, const std::string& userId)
{
std::string seed((std::to_string(geteuid()) + sessionId));
std::string seed(userId + sessionId);
// generate a 8-digit hex value out of sha256 hash
std::vector<unsigned char> hash(4);
picosha2::hash256(seed.begin(), seed.end(), hash.begin(), hash.end());
@@ -205,6 +216,11 @@ inline std::string makeShmIdStr(const std::string& sessionId)
return picosha2::bytes_to_hex_string(hash.begin(), hash.end());
}
inline std::string makeShmIdStr(const std::string& sessionId)
{
return makeShmIdStr(sessionId, std::to_string(geteuid()));
}
inline uint64_t makeShmIdUint64(const std::string& sessionId)
{
std::string shmId = makeShmIdStr(sessionId);

View File

@@ -46,6 +46,9 @@
#include <utility> // pair
#include <vector>
#include <unistd.h> // getuid
#include <sys/types.h> // getuid
#include <sys/mman.h> // mlock
namespace fair::mq::shmem
@@ -54,8 +57,8 @@ namespace fair::mq::shmem
class Manager
{
public:
Manager(std::string shmId, std::string deviceId, size_t size, const ProgOptions* config)
: fShmId(std::move(shmId))
Manager(const std::string& sessionName, std::string deviceId, size_t size, const ProgOptions* config)
: fShmId(makeShmIdStr(sessionName))
, fSegmentId(config ? config->GetProperty<uint16_t>("shm-segment-id", 0) : 0)
, fDeviceId(std::move(deviceId))
, fSegments()
@@ -82,6 +85,8 @@ class Manager
{
using namespace boost::interprocess;
LOG(debug) << "Generated shmid '" << fShmId << "' out of session id '" << sessionName << "'.";
bool mlockSegment = false;
bool zeroSegment = false;
bool autolaunchMonitor = false;
@@ -96,17 +101,28 @@ class Manager
}
if (autolaunchMonitor) {
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
StartMonitor(fShmId);
}
fHeartbeatThread = std::thread(&Manager::SendHeartbeats, this);
{
std::stringstream ss;
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
fShmSegments = fManagementSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc);
fEventCounter = fManagementSegment.find<EventCounter>(unique_instance).first;
SessionInfo* sessionInfo = fManagementSegment.find<SessionInfo>(unique_instance).first;
if (sessionInfo) {
LOG(debug) << "session info found, name: " << sessionInfo->fSessionName << ", creator id: " << sessionInfo->fCreatorId;
} else {
LOG(debug) << "no session info found, creating and initializing";
sessionInfo = fManagementSegment.construct<SessionInfo>(unique_instance)(sessionName.c_str(), geteuid(), fShmVoidAlloc);
LOG(debug) << "initialized session info, name: " << sessionInfo->fSessionName << ", creator id: " << sessionInfo->fCreatorId;
}
fEventCounter = fManagementSegment.find<EventCounter>(unique_instance).first;
if (fEventCounter) {
LOG(debug) << "event counter found: " << fEventCounter->fCount;
} else {
@@ -146,8 +162,8 @@ class Manager
ss << "Opened ";
}
ss << "shared memory segment '" << "fmq_" << fShmId << "_m_" << fSegmentId << "'."
<< " Size: " << boost::apply_visitor(SegmentSize{}, fSegments.at(fSegmentId)) << " bytes."
<< " Available: " << boost::apply_visitor(SegmentFreeMemory{}, fSegments.at(fSegmentId)) << " bytes."
<< " Size: " << boost::apply_visitor(SegmentSize(), fSegments.at(fSegmentId)) << " bytes."
<< " Available: " << boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId)) << " bytes."
<< " Allocation algorithm: " << allocationAlgorithm;
LOG(debug) << ss.str();
} catch(interprocess_exception& bie) {
@@ -157,21 +173,20 @@ class Manager
if (mlockSegment) {
LOG(debug) << "Locking the managed segment memory pages...";
if (mlock(boost::apply_visitor(SegmentAddress{}, fSegments.at(fSegmentId)), boost::apply_visitor(SegmentSize{}, fSegments.at(fSegmentId))) == -1) {
if (mlock(boost::apply_visitor(SegmentAddress(), fSegments.at(fSegmentId)), boost::apply_visitor(SegmentSize(), fSegments.at(fSegmentId))) == -1) {
LOG(error) << "Could not lock the managed segment memory. Code: " << errno << ", reason: " << strerror(errno);
}
LOG(debug) << "Successfully locked the managed segment memory pages.";
}
if (zeroSegment) {
LOG(debug) << "Zeroing the managed segment free memory...";
boost::apply_visitor(SegmentMemoryZeroer{}, fSegments.at(fSegmentId));
boost::apply_visitor(SegmentMemoryZeroer(), fSegments.at(fSegmentId));
LOG(debug) << "Successfully zeroed the managed segment free memory.";
}
fShmRegions = fManagementSegment.find_or_construct<Uint16RegionInfoHashMap>(unique_instance)(fShmVoidAlloc);
fDeviceCounter = fManagementSegment.find<DeviceCounter>(unique_instance).first;
if (fDeviceCounter) {
LOG(debug) << "device counter found, with value of " << fDeviceCounter->fCount << ". incrementing.";
(fDeviceCounter->fCount)++;
@@ -187,8 +202,6 @@ class Manager
fShmMsgCounters = fManagementSegment.find_or_construct<Uint16MsgCounterHashMap>(unique_instance)(fShmVoidAlloc);
#endif
}
fHeartbeatThread = std::thread(&Manager::SendHeartbeats, this);
}
Manager() = delete;
@@ -214,8 +227,15 @@ class Manager
boost::filesystem::path p = boost::process::search_path("fairmq-shmmonitor", ownPath);
bool verbose = false;
if (const char* verboseEnv = getenv("FAIRMQ_SHMMONITOR_VERBOSE")) {
if (std::string(verboseEnv) == "true") {
verbose = true;
}
}
if (!p.empty()) {
boost::process::spawn(p, "-x", "--shmid", id, "-d", "-t", "2000", env);
boost::process::spawn(p, "-x", "-m", "--shmid", id, "-d", "-t", "2000", verbose ? "--verbose" : "", env);
int numTries = 0;
do {
try {
@@ -384,8 +404,8 @@ class Manager
info.managed = true;
info.id = e.first;
info.event = RegionEvent::created;
info.ptr = boost::apply_visitor(SegmentAddress{}, fSegments.at(e.first));
info.size = boost::apply_visitor(SegmentSize{}, fSegments.at(e.first));
info.ptr = boost::apply_visitor(SegmentAddress(), fSegments.at(e.first));
info.size = boost::apply_visitor(SegmentSize(), fSegments.at(e.first));
result.push_back(info);
} catch (const std::out_of_range& oor) {
LOG(error) << "could not find segment with id " << e.first;
@@ -512,11 +532,11 @@ class Manager
boost::interprocess::managed_shared_memory::handle_t GetHandleFromAddress(const void* ptr, uint16_t segmentId) const
{
return boost::apply_visitor(SegmentHandleFromAddress{ptr}, fSegments.at(segmentId));
return boost::apply_visitor(SegmentHandleFromAddress(ptr), fSegments.at(segmentId));
}
void* GetAddressFromHandle(const boost::interprocess::managed_shared_memory::handle_t handle, uint16_t segmentId) const
{
return boost::apply_visitor(SegmentAddressFromHandle{handle}, fSegments.at(segmentId));
return boost::apply_visitor(SegmentAddressFromHandle(handle), fSegments.at(segmentId));
}
char* Allocate(const size_t size, size_t alignment = 0)
@@ -529,7 +549,7 @@ class Manager
// boost::interprocess::managed_shared_memory::size_type actualSize = size;
// char* hint = 0; // unused for boost::interprocess::allocate_new
// ptr = fSegments.at(fSegmentId).allocation_command<char>(boost::interprocess::allocate_new, size, actualSize, hint);
size_t segmentSize = boost::apply_visitor(SegmentSize{}, fSegments.at(fSegmentId));
size_t segmentSize = boost::apply_visitor(SegmentSize(), fSegments.at(fSegmentId));
if (size > segmentSize) {
throw MessageBadAlloc(tools::ToString("Requested message size (", size, ") exceeds segment size (", segmentSize, ")"));
}
@@ -541,7 +561,7 @@ class Manager
} catch (boost::interprocess::bad_alloc& ba) {
// LOG(warn) << "Shared memory full...";
if (ThrowingOnBadAlloc()) {
throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", boost::apply_visitor(SegmentFreeMemory{}, fSegments.at(fSegmentId))));
throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId))));
}
// rateLimiter.maybe_sleep();
std::this_thread::sleep_for(std::chrono::milliseconds(50));
@@ -569,7 +589,7 @@ class Manager
void Deallocate(boost::interprocess::managed_shared_memory::handle_t handle, uint16_t segmentId)
{
boost::apply_visitor(SegmentDeallocate{GetAddressFromHandle(handle, segmentId)}, fSegments.at(segmentId));
boost::apply_visitor(SegmentDeallocate(GetAddressFromHandle(handle, segmentId)), fSegments.at(segmentId));
#ifdef FAIRMQ_DEBUG_MODE
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
DecrementShmMsgCounter(segmentId);
@@ -583,7 +603,7 @@ class Manager
char* ShrinkInPlace(size_t newSize, char* localPtr, uint16_t segmentId)
{
return boost::apply_visitor(SegmentBufferShrink{newSize, localPtr}, fSegments.at(segmentId));
return boost::apply_visitor(SegmentBufferShrink(newSize, localPtr), fSegments.at(segmentId));
}
uint16_t GetSegmentId() const { return fSegmentId; }

View File

@@ -10,7 +10,6 @@
#include "Common.h"
#include <fairmq/tools/Strings.h>
#include <fairlogger/Logger.h>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/file_mapping.hpp>
@@ -32,6 +31,14 @@
#include <termios.h>
#include <poll.h>
#if FAIRMQ_HAS_STD_FILESYSTEM
#include <filesystem>
namespace fs = std::filesystem;
#else
#include <boost/filesystem.hpp>
namespace fs = ::boost::filesystem;
#endif
using namespace std;
using bie = ::boost::interprocess::interprocess_exception;
namespace bipc = ::boost::interprocess;
@@ -71,18 +78,16 @@ void signalHandler(int signal)
gSignalStatus = signal;
}
Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool viewOnly, unsigned int timeoutInMS, unsigned int intervalInMS, bool runAsDaemon, bool cleanOnExit)
Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool viewOnly, unsigned int timeoutInMS, unsigned int intervalInMS, bool monitor, bool cleanOnExit)
: fSelfDestruct(selfDestruct)
, fInteractive(interactive)
, fViewOnly(viewOnly)
, fIsDaemon(runAsDaemon)
, fMonitor(monitor)
, fSeenOnce(false)
, fCleanOnExit(cleanOnExit)
, fTimeoutInMS(timeoutInMS)
, fIntervalInMS(intervalInMS)
, fShmId(shmId)
, fSegmentName("fmq_" + fShmId + "_m_0")
, fManagementSegmentName("fmq_" + fShmId + "_mng")
, fControlQueueName("fmq_" + fShmId + "_cq")
, fTerminating(false)
, fHeartbeatTriggered(false)
@@ -94,14 +99,14 @@ Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool
try {
bipc::named_mutex monitorStatus(bipc::create_only, string("fmq_" + fShmId + "_ms").c_str());
} catch (bie&) {
cout << "fairmq-shmmonitor for shared memory id " << fShmId << " already started or not properly exited. Try `fairmq-shmmonitor --cleanup --shmid " << fShmId << "`" << endl;
throw DaemonPresent(tools::ToString("fairmq-shmmonitor for shared memory id ", fShmId, " already started or not properly exited."));
if (fInteractive) {
LOG(error) << "fairmq-shmmonitor for shm id " << fShmId << " is already running. Try `fairmq-shmmonitor --cleanup --shmid " << fShmId << "`, or run in view-only mode (-v)";
} else {
LOG(error) << "fairmq-shmmonitor for shm id " << fShmId << " is already running. Try `fairmq-shmmonitor --cleanup --shmid " << fShmId << "`";
}
throw DaemonPresent(tools::ToString("fairmq-shmmonitor (in monitoring mode) for shm id ", fShmId, " is already running."));
}
}
Logger::SetConsoleColor(false);
Logger::DefineVerbosity(Verbosity::user1, VerbositySpec::Make(VerbositySpec::Info::timestamp_us));
Logger::SetVerbosity(Verbosity::verylow);
}
void Monitor::CatchSignals()
@@ -116,7 +121,7 @@ void Monitor::SignalMonitor()
while (true) {
if (gSignalStatus != 0) {
fTerminating = true;
cout << "signal: " << gSignalStatus << endl;
LOG(info) << "signal: " << gSignalStatus;
break;
} else if (fTerminating) {
break;
@@ -131,18 +136,13 @@ void Monitor::Run()
thread heartbeatThread;
if (!fViewOnly) {
RemoveQueue(fControlQueueName);
heartbeatThread = thread(&Monitor::MonitorHeartbeats, this);
heartbeatThread = thread(&Monitor::ReceiveHeartbeats, this);
}
if (fInteractive) {
Interactive();
} else if (fViewOnly) {
CheckSegment();
} else {
while (!fTerminating) {
this_thread::sleep_for(chrono::milliseconds(fIntervalInMS));
CheckSegment();
}
Watch();
}
if (!fViewOnly) {
@@ -150,7 +150,165 @@ void Monitor::Run()
}
}
void Monitor::MonitorHeartbeats()
void Monitor::Watch()
{
while (!fTerminating) {
using namespace boost::interprocess;
try {
managed_shared_memory managementSegment(open_read_only, std::string("fmq_" + fShmId + "_mng").c_str());
fSeenOnce = true;
auto now = chrono::high_resolution_clock::now();
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
if (fHeartbeatTriggered && duration > fTimeoutInMS) {
// memory is present, but no heartbeats since timeout duration
LOG(info) << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning...";
Cleanup(ShmId{fShmId});
fHeartbeatTriggered = false;
if (fSelfDestruct) {
LOG(info) << "self destructing (segment has been observed and cleaned up by the monitor)";
fTerminating = true;
}
}
} catch (bie&) {
fHeartbeatTriggered = false;
if (fSelfDestruct) {
if (fSeenOnce) {
// segment has been observed at least once, can self-destruct
LOG(info) << "self destructing (segment has been observed and cleaned up orderly)";
fTerminating = true;
} else {
// if self-destruct is requested, and no segment has ever been observed, quit after double timeout duration
auto now = chrono::high_resolution_clock::now();
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
if (duration > fTimeoutInMS * 2) {
Cleanup(ShmId{fShmId});
LOG(info) << "self destructing (no segments observed within (timeout * 2) since start)";
fTerminating = true;
}
}
}
}
this_thread::sleep_for(chrono::milliseconds(100));
}
}
bool Monitor::PrintShm(const ShmId& shmId)
{
using namespace boost::interprocess;
try {
managed_shared_memory managementSegment(open_read_only, std::string("fmq_" + shmId.shmId + "_mng").c_str());
VoidAlloc allocInstance(managementSegment.get_segment_manager());
Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first;
std::unordered_map<uint16_t, boost::variant<RBTreeBestFitSegment, SimpleSeqFitSegment>> segments;
if (!segmentInfos) {
LOG(error) << "Found management segment, but cannot locate segment info, something went wrong...";
return false;
}
for (const auto& s : *segmentInfos) {
if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
segments.emplace(s.first, RBTreeBestFitSegment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)).c_str()));
} else {
segments.emplace(s.first, SimpleSeqFitSegment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)).c_str()));
}
}
unsigned int numDevices = 0;
int creatorId = -1;
std::string sessionName;
DeviceCounter* deviceCounter = managementSegment.find<DeviceCounter>(unique_instance).first;
if (deviceCounter) {
numDevices = deviceCounter->fCount;
}
SessionInfo* sessionInfo = managementSegment.find<SessionInfo>(unique_instance).first;
if (sessionInfo) {
creatorId = sessionInfo->fCreatorId;
sessionName = sessionInfo->fSessionName;
}
#ifdef FAIRMQ_DEBUG_MODE
Uint16MsgCounterHashMap* msgCounters = managementSegment.find<Uint16MsgCounterHashMap>(unique_instance).first;
#endif
stringstream ss;
size_t mfree = managementSegment.get_free_memory();
size_t mtotal = managementSegment.get_size();
size_t mused = mtotal - mfree;
ss << "shm id: " << shmId.shmId
<< ", session: " << sessionName
<< ", creator id: " << creatorId
<< ", devices: " << numDevices
<< ", segments:\n";
for (const auto& s : segments) {
size_t free = boost::apply_visitor(SegmentFreeMemory(), s.second);
size_t total = boost::apply_visitor(SegmentSize(), s.second);
size_t used = total - free;
ss << " [" << s.first
<< "]: total: " << total
#ifdef FAIRMQ_DEBUG_MODE
<< ", msgs: " << ( (msgCounters != nullptr) ? to_string((*msgCounters)[s.first].fCount) : "unknown")
#else
<< ", msgs: NODEBUG"
#endif
<< ", free: " << free
<< ", used: " << used
<< "\n";
}
ss << " [m]: "
<< "total: " << mtotal
<< ", free: " << mfree
<< ", used: " << mused;
LOGV(info, user1) << ss.str();
} catch (bie&) {
return false;
}
return true;
}
void Monitor::ListAll(const std::string& path)
{
try {
if (fs::is_empty(path)) {
LOG(info) << "directory " << fs::path(path) << " is empty.";
return;
}
for (const auto& entry : fs::directory_iterator(path)) {
string filename = entry.path().filename().string();
// LOG(info) << filename << ", size: " << entry.file_size() << " bytes";
if (tools::StrStartsWith(filename, "fmq_") || tools::StrStartsWith(filename, "sem.fmq_")) {
// LOG(info) << "The file '" << filename << "' belongs to FairMQ.";
if (tools::StrEndsWith(filename, "_mng")) {
string shmId = filename.substr(4, 8);
LOG(info) << "\nFound shmid '" << shmId << "':\n";
if (!PrintShm(ShmId{shmId})) {
LOG(info) << "could not open file for shmid '" << shmId << "'";
}
}
} else {
LOG(info) << "The file '" << filename << "' does not belong to FairMQ, skipping...";
}
}
} catch (fs::filesystem_error& fse) {
LOG(error) << "error: " << fse.what();
}
}
void Monitor::ReceiveHeartbeats()
{
try {
bipc::message_queue mq(bipc::open_or_create, fControlQueueName.c_str(), 1000, 256);
@@ -167,11 +325,11 @@ void Monitor::MonitorHeartbeats()
string deviceId(msg, recvdSize);
fDeviceHeartbeats[deviceId] = fLastHeartbeat;
} else {
// cout << "control queue timeout" << endl;
// LOG(info) << "control queue timeout";
}
}
} catch (bie& ie) {
cout << ie.what() << endl;
LOG(info) << ie.what();
}
RemoveQueue(fControlQueueName);
@@ -186,7 +344,7 @@ void Monitor::Interactive()
TerminalConfig tcfg;
cout << endl;
LOG(info) << "\n";
PrintHelp();
while (!fTerminating) {
@@ -199,30 +357,30 @@ void Monitor::Interactive()
switch (c) {
case 'q':
cout << "\n[q] --> quitting." << endl;
LOG(info) << "\n[q] --> quitting.";
fTerminating = true;
break;
case 'x':
cout << "\n[x] --> closing shared memory:" << endl;
LOG(info) << "\n[x] --> closing shared memory:";
if (!fViewOnly) {
Cleanup(ShmId{fShmId});
} else {
cout << "cannot close because in view only mode" << endl;
LOG(info) << "cannot close because in view only mode";
}
break;
case 'h':
cout << "\n[h] --> help:" << endl << endl;
LOG(info) << "\n[h] --> help:\n";
PrintHelp();
cout << endl;
LOG(info);
break;
case '\n':
cout << "\n[\\n] --> invalid input." << endl;
LOG(info) << "\n[\\n] --> invalid input.";
break;
case 'b':
PrintDebugInfo(ShmId{fShmId});
break;
default:
cout << "\n[" << c << "] --> invalid input." << endl;
LOG(info) << "\n[" << c << "] --> invalid input.";
break;
}
@@ -235,118 +393,7 @@ void Monitor::Interactive()
break;
}
CheckSegment();
if (!fTerminating) {
cout << "\r";
}
}
}
void Monitor::CheckSegment()
{
using namespace boost::interprocess;
try {
managed_shared_memory managementSegment(open_only, fManagementSegmentName.c_str());
VoidAlloc allocInstance(managementSegment.get_segment_manager());
Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first;
std::unordered_map<uint16_t, boost::variant<RBTreeBestFitSegment, SimpleSeqFitSegment>> segments;
if (!segmentInfos) {
cout << "Found management segment, but cannot locate segment info, something went wrong..." << endl;
return;
}
for (const auto& s : *segmentInfos) {
if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
segments.emplace(s.first, RBTreeBestFitSegment(open_only, std::string("fmq_" + fShmId + "_m_" + to_string(s.first)).c_str()));
} else {
segments.emplace(s.first, SimpleSeqFitSegment(open_only, std::string("fmq_" + fShmId + "_m_" + to_string(s.first)).c_str()));
}
}
fSeenOnce = true;
unsigned int numDevices = 0;
#ifdef FAIRMQ_DEBUG_MODE
Uint16MsgCounterHashMap* msgCounters = nullptr;
#endif
if (fInteractive || fViewOnly) {
DeviceCounter* dc = managementSegment.find<DeviceCounter>(unique_instance).first;
if (dc) {
numDevices = dc->fCount;
}
#ifdef FAIRMQ_DEBUG_MODE
msgCounters = managementSegment.find_or_construct<Uint16MsgCounterHashMap>(unique_instance)(allocInstance);
#endif
}
auto now = chrono::high_resolution_clock::now();
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
if (fHeartbeatTriggered && duration > fTimeoutInMS) {
cout << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning..." << endl;
Cleanup(ShmId{fShmId});
fHeartbeatTriggered = false;
if (fSelfDestruct) {
cout << "\nself destructing" << endl;
fTerminating = true;
}
}
if (fInteractive || fViewOnly) {
stringstream ss;
size_t mfree = managementSegment.get_free_memory();
size_t mtotal = managementSegment.get_size();
size_t mused = mtotal - mfree;
ss << "shm id: " << fShmId
<< ", devices: " << numDevices << ", segments:\n";
for (const auto& s : segments) {
size_t free = boost::apply_visitor(SegmentFreeMemory{}, s.second);
size_t total = boost::apply_visitor(SegmentSize{}, s.second);
size_t used = total - free;
ss << " [" << s.first
<< "]: total: " << total
#ifdef FAIRMQ_DEBUG_MODE
<< ", msgs: " << (*msgCounters)[s.first].fCount
#else
<< ", msgs: NODEBUG"
#endif
<< ", free: " << free
<< ", used: " << used
<< "\n";
}
ss << " [m]: "
<< "total: " << mtotal
<< ", free: " << mfree
<< ", used: " << mused;
LOGV(info, user1) << ss.str();
}
} catch (bie&) {
fHeartbeatTriggered = false;
auto now = chrono::high_resolution_clock::now();
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
if (fIsDaemon && duration > fTimeoutInMS * 2) {
Cleanup(ShmId{fShmId});
fHeartbeatTriggered = false;
if (fSelfDestruct) {
cout << "\nself destructing" << endl;
fTerminating = true;
}
}
if (fSelfDestruct) {
if (fSeenOnce) {
cout << "self destructing" << endl;
fTerminating = true;
}
}
PrintShm(ShmId{fShmId});
}
}
@@ -366,7 +413,7 @@ void Monitor::PrintDebugInfo(const ShmId& shmId __attribute__((unused)))
for (const auto& e : *debug) {
numMessages += e.second.size();
}
cout << endl << "found " << numMessages << " messages." << endl;
LOG(info) << endl << "found " << numMessages << " messages.";
for (const auto& s : *debug) {
for (const auto& e : s.second) {
@@ -375,19 +422,18 @@ void Monitor::PrintDebugInfo(const ShmId& shmId __attribute__((unused)))
time_t t = chrono::system_clock::to_time_t(tmpt);
uint64_t ms = e.second.fCreationTime % 1000000;
auto tm = localtime(&t);
cout << "segment: " << setw(3) << setfill(' ') << s.first
LOG(info) << "segment: " << setw(3) << setfill(' ') << s.first
<< ", offset: " << setw(12) << setfill(' ') << e.first
<< ", size: " << setw(10) << setfill(' ') << e.second.fSize
<< ", creator PID: " << e.second.fPid << setfill('0')
<< ", at: " << setw(2) << tm->tm_hour << ":" << setw(2) << tm->tm_min << ":" << setw(2) << tm->tm_sec << "." << setw(6) << ms << endl;
<< ", at: " << setw(2) << tm->tm_hour << ":" << setw(2) << tm->tm_min << ":" << setw(2) << tm->tm_sec << "." << setw(6) << ms;
}
}
cout << setfill(' ');
} catch (bie&) {
cout << "no segments found" << endl;
LOG(info) << "no segments found";
}
#else
cout << "FairMQ was not compiled in debug mode (FAIRMQ_DEBUG_MODE)" << endl;
LOG(info) << "FairMQ was not compiled in debug mode (FAIRMQ_DEBUG_MODE)";
#endif
}
@@ -419,10 +465,10 @@ unordered_map<uint16_t, std::vector<BufferDebugInfo>> Monitor::GetDebugInfo(cons
}
}
} catch (bie&) {
cout << "no segments found" << endl;
LOG(info) << "no segments found";
}
#else
cout << "FairMQ was not compiled in debug mode (FAIRMQ_DEBUG_MODE)" << endl;
LOG(info) << "FairMQ was not compiled in debug mode (FAIRMQ_DEBUG_MODE)";
#endif
return result;
@@ -435,10 +481,10 @@ unordered_map<uint16_t, std::vector<BufferDebugInfo>> Monitor::GetDebugInfo(cons
void Monitor::PrintHelp()
{
cout << "controls: [x] close memory, "
<< "[b] print a list of allocated messages (only available when compiled with FAIMQ_DEBUG_MODE=ON), "
<< "[h] help, "
<< "[q] quit." << endl;
LOG(info) << "controls: [x] close memory, "
<< "[b] print a list of allocated messages (only available when compiled with FAIMQ_DEBUG_MODE=ON), "
<< "[h] help, "
<< "[q] quit.";
}
@@ -446,12 +492,12 @@ std::pair<std::string, bool> RunRemoval(std::function<bool(const std::string&)>
{
if (f(name)) {
if (verbose) {
cout << "Successfully removed '" << name << "'." << endl;
LOG(info) << "Successfully removed '" << name << "'.";
}
return {name, true};
} else {
if (verbose) {
cout << "Did not remove '" << name << "'. Already removed?" << endl;
LOG(info) << "Did not remove '" << name << "'. Already removed?";
}
return {name, false};
}
@@ -468,7 +514,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
std::vector<std::pair<std::string, bool>> result;
if (verbose) {
cout << "Cleaning up for shared memory id '" << shmId.shmId << "'..." << endl;
LOG(info) << "Cleaning up for shared memory id '" << shmId.shmId << "'...";
}
string managementSegmentName("fmq_" + shmId.shmId + "_mng");
@@ -479,7 +525,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
RegionCounter* rc = managementSegment.find<RegionCounter>(bipc::unique_instance).first;
if (rc) {
if (verbose) {
cout << "Region counter found: " << rc->fCount << endl;
LOG(info) << "Region counter found: " << rc->fCount;
}
uint16_t regionCount = rc->fCount;
@@ -491,7 +537,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
string path = ri.fPath.c_str();
int flags = ri.fFlags;
if (verbose) {
cout << "Found RegionInfo with path: '" << path << "', flags: " << flags << ", fDestroyed: " << ri.fDestroyed << "." << endl;
LOG(info) << "Found RegionInfo with path: '" << path << "', flags: " << flags << ", fDestroyed: " << ri.fDestroyed << ".";
}
if (path != "") {
result.emplace_back(RunRemoval(Monitor::RemoveFileMapping, path + "fmq_" + shmId.shmId + "_rg_" + to_string(i), verbose));
@@ -506,12 +552,12 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
}
} else {
if (verbose) {
cout << "No region counter found. No regions to cleanup." << endl;
LOG(info) << "No region counter found. No regions to cleanup.";
}
}
} catch(out_of_range& oor) {
if (verbose) {
cout << "Could not locate element in the region map, out of range: " << oor.what() << endl;
LOG(info) << "Could not locate element in the region map, out of range: " << oor.what();
}
}
@@ -524,7 +570,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
result.emplace_back(RunRemoval(Monitor::RemoveObject, managementSegmentName.c_str(), verbose));
} catch (bie&) {
if (verbose) {
cout << "Did not find '" << managementSegmentName << "' shared memory segment. No regions to cleanup." << endl;
LOG(info) << "Did not find '" << managementSegmentName << "' shared memory segment. No regions to cleanup.";
}
}
@@ -538,7 +584,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const SessionId& sess
{
ShmId shmId{makeShmIdStr(sessionId.sessionId)};
if (verbose) {
cout << "Cleanup called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'" << endl;
LOG(info) << "Cleanup called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'";
}
return Cleanup(shmId, verbose);
}
@@ -555,7 +601,7 @@ std::vector<std::pair<std::string, bool>> Monitor::CleanupFull(const SessionId&
{
ShmId shmId{makeShmIdStr(sessionId.sessionId)};
if (verbose) {
cout << "Cleanup called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'" << endl;
LOG(info) << "Cleanup called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'";
}
return CleanupFull(shmId, verbose);
}

View File

@@ -8,6 +8,8 @@
#ifndef FAIR_MQ_SHMEM_MONITOR_H_
#define FAIR_MQ_SHMEM_MONITOR_H_
#include <fairlogger/Logger.h>
#include <thread>
#include <chrono>
#include <atomic>
@@ -82,6 +84,9 @@ class Monitor
static std::unordered_map<uint16_t, std::vector<BufferDebugInfo>> GetDebugInfo(const ShmId& shmId);
static std::unordered_map<uint16_t, std::vector<BufferDebugInfo>> GetDebugInfo(const SessionId& shmId);
static bool PrintShm(const ShmId& shmId);
static void ListAll(const std::string& path);
static bool RemoveObject(const std::string& name);
static bool RemoveFileMapping(const std::string& name);
static bool RemoveQueue(const std::string& name);
@@ -92,7 +97,8 @@ class Monitor
private:
void PrintHelp();
void MonitorHeartbeats();
void Watch();
void ReceiveHeartbeats();
void CheckSegment();
void Interactive();
void SignalMonitor();
@@ -100,14 +106,12 @@ class Monitor
bool fSelfDestruct; // will self-destruct after the memory has been closed
bool fInteractive; // running in interactive mode
bool fViewOnly; // view only mode
bool fIsDaemon;
bool fMonitor;
bool fSeenOnce; // true is segment has been opened successfully at least once
bool fCleanOnExit;
unsigned int fTimeoutInMS;
unsigned int fIntervalInMS;
std::string fShmId;
std::string fSegmentName;
std::string fManagementSegmentName;
std::string fControlQueueName;
std::atomic<bool> fTerminating;
std::atomic<bool> fHeartbeatTriggered;

View File

@@ -15,7 +15,7 @@ FairMQ Shared Memory currently uses the following names to register shared memor
| `fmq_<shmId>_m_<segmentId>` | managed segment(s) (user data) | one of the devices | devices |
| `fmq_<shmId>_mng` | management segment (management data) | one of the devices | devices |
| `fmq_<shmId>_mtx` | mutex | one of the devices | devices |
| `fmq_<shmId>_cv` | condition variable | one of the devices | devices with unmanaged regions |
| `fmq_<shmId>_cv` | condition variable | one of the devices | devices |
| `fmq_<shmId>_rg_<index>` | unmanaged region(s) | one of the devices | devices with unmanaged regions |
| `fmq_<shmId>_rgq_<index>` | unmanaged region queue(s) | one of the devices | devices with unmanaged regions |
| `fmq_<shmId>_ms` | shmmonitor status | shmmonitor | devices, shmmonitor |
@@ -25,20 +25,31 @@ The shmId is generated out of session id and user id.
## Shared memory monitor
The shared memory monitor tool, supplied with the shared memory transport can be used to monitor shared memory use and automatically cleanup shared memory in case of device crashes.
The shared memory monitor tool (`fairmq-shmmonitor`) can be used to monitor and cleanup the created shared memory.
With default arguments the monitor will run indefinitely with no output, and clean up shared memory segment if it is open and no heartbeats from devices arrive within a timeout period. It can be further customized with following parameters:
Most commands act for the specified session, identified either via session id (`--session`,`-s`) or shmid (`--shmid`).
`--session <arg>`: for which session to run the monitor (default is "default"). The actual ressource names will be built out of session id, user id (hashed and truncated).
`--cleanup`: start monitor, perform cleanup of the memory and quit.
`--shmid <arg>`: if provided, this shmem id will be used instead of the one generated from session id. Use this if you know the name of the shared memory ressource, but do not have the used session id.
`--self-destruct`: run until the memory segment is closed (either naturally via cleanup performed by devices or in case of a crash (no heartbeats within timeout)).
`--interactive`: run interactively, with detailed segment details and user input for various shmem operations.
`--timeout <arg>`: specifiy the timeout for the heartbeats from shmem transports in milliseconds (default 5000).
The monitor runs in one of the following modes:
The options can be combined, with the exception of `--cleanup` option, which will invoke the described behaviour independent of other options.
Without the `--self-destruct` option, the monitor will run continuously, moitoring (and cleaning up if needed) consecutive topologies.
| command | action |
| --------------------------- | ---------------------------------------------- |
| no args | Print segment info of the specified session/shm ID and exit. |
| `--view`,`-v` | Print segment info of the specified session/shm ID and exit. |
| `--interactive`,`-i` | Print segment info of the specified session/shm ID at a given interval (`--interval`), with some keyboard controls. Can be combined with `--view` for read-only access (and avoid receiving heartbeats). |
| `--monitor`,`-m` | Monitor the session shm usage by receiving heartbeats from shmem users, cleaning it up if no heartbeats arrived within configured timeout (`--timeout`/`-t`). Only one heartbeat receiver per session is currently possible. If `--self-destruct`/`-x` is added, monitor will exit either when (a) no shm has been observed for interval * 2, (b) a cleanup due to reached timeout has been performed, or (c) shm has been observed, but is now cleaned up. |
| `--cleanup`,`-c` | Cleanup the shm for the specified session and exit. |
| `--debug`,`-b` | Print the list of messages in the current session and exit. Only availabe when FairMQ is compiled with `FAIRMQ_DEBUG_MODE=ON` (high performance impact). |
| `--get-shmid` | Translate given session id and user id (`--user-id`) to a shmem id (uses current user id if none provided) and exit. |
| `--list-all` | Print segment info for all sessions present on the system and exit. |
Possible further implementation would be to run the monitor with `--self-destruct` with each topology.
Additional cmd options:
The Monitor class can also be used independently from the supplied executable (built from `runMonitor.cxx`), allowing integration on any level. For example invoking the monitor could be a functionality that a device offers.
| command | action |
| --------------------------- | ---------------------------------------------- |
| `--cleanup-on-exit` | Perform a cleanup on exit, when running in monitoring or interactive mode. |
| `--daemonize`,`-d` | Can be combined with the monitoring mode to detach the process from the parent. |
| `--verbose`,`-d` | When running as a daemon, store monitor output in `fairmq-shmmonitor_<timestamp>.log` |
For full option details, run with `-h`.
The Monitor class can also be used independently from the supplied executable, allowing integration on any level.

View File

@@ -68,9 +68,6 @@ class TransportFactory final : public fair::mq::TransportFactory
throw SharedMemoryError(tools::ToString("Provided shared memory allocation algorithm '", allocationAlgorithm, "' is not supported. Supported are 'rbtree_best_fit'/'simple_seq_fit'"));
}
std::string shmId = makeShmIdStr(sessionName);
LOG(debug) << "Generated shmid '" << shmId << "' out of session id '" << sessionName << "'.";
try {
if (zmq_ctx_set(fZmqCtx, ZMQ_IO_THREADS, numIoThreads) != 0) {
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
@@ -81,7 +78,7 @@ class TransportFactory final : public fair::mq::TransportFactory
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
}
fManager = std::make_unique<Manager>(shmId, deviceId, segmentSize, config);
fManager = std::make_unique<Manager>(sessionName, deviceId, segmentSize, config);
} catch (boost::interprocess::interprocess_exception& e) {
LOG(error) << "Could not initialize shared memory transport: " << e.what();
throw std::runtime_error(tools::ToString("Could not initialize shared memory transport: ", e.what()));

View File

@@ -69,6 +69,10 @@ static void daemonize()
int main(int argc, char** argv)
{
try {
fair::Logger::SetConsoleColor(false);
fair::Logger::DefineVerbosity(fair::Verbosity::user1, fair::VerbositySpec::Make(fair::VerbositySpec::Info::timestamp_us));
fair::Logger::SetVerbosity(fair::Verbosity::verylow);
string sessionName;
string shmId;
bool cleanup = false;
@@ -78,8 +82,14 @@ int main(int argc, char** argv)
unsigned int timeoutInMS = 5000;
unsigned int intervalInMS = 100;
bool runAsDaemon = false;
bool monitor = false;
bool debug = false;
bool cleanOnExit = false;
bool getShmId = false;
bool listAll = false;
string listAllPath;
bool verbose = false;
int userId = -1;
options_description desc("Options");
desc.add_options()
@@ -90,27 +100,40 @@ int main(int argc, char** argv)
("interactive,i" , value<bool>(&interactive)->implicit_value(true), "Interactive run")
("view,v" , value<bool>(&viewOnly)->implicit_value(true), "Run in view only mode")
("timeout,t" , value<unsigned int>(&timeoutInMS)->default_value(5000), "Heartbeat timeout in milliseconds")
("daemonize,d" , value<bool>(&runAsDaemon)->implicit_value(true), "Daemonize the monitor")
("debug,b" , value<bool>(&debug)->implicit_value(true), "Debug - Print a list of messages)")
("daemonize,d" , value<bool>(&runAsDaemon)->implicit_value(true), "Daemonize the monitor process (only in monitoring mode)")
("monitor,m" , value<bool>(&monitor)->implicit_value(true), "Run in monitoring mode")
("debug,b" , value<bool>(&debug)->implicit_value(true), "Debug - Print a list of messages)")
("clean-on-exit,e", value<bool>(&cleanOnExit)->implicit_value(true), "Perform cleanup on exit")
("interval" , value<unsigned int>(&intervalInMS)->default_value(100), "Output interval for interactive/view-only mode")
("help,h", "Print help");
("interval" , value<unsigned int>(&intervalInMS)->default_value(1000),"Output interval for interactive mode")
("get-shmid" , value<bool>(&getShmId)->implicit_value(true), "Translate given session id and user id to a shmem id (uses current user id if none provided)")
("list-all" , value<bool>(&listAll)->implicit_value(true), "List all sessions & segments")
("list-all-path" , value<string>(&listAllPath)->default_value("/dev/shm/"),"Path for the --list-all command to search segments in")
("verbose" , value<bool>(&verbose)->implicit_value(true), "Verbose mode (daemon will output to a file 'fairmq-shmmonitor_<timestamp>')")
("user-id" , value<int>(&userId)->default_value(-1), "User id (used with --get-shmid)")
("help,h", "Print help");
variables_map vm;
store(parse_command_line(argc, argv, desc), vm);
if (vm.count("help")) {
cout << "FairMQ Shared Memory Monitor" << endl << desc << endl;
LOG(info) << "FairMQ Shared Memory Monitor" << "\n" << desc;
return 0;
}
notify(vm);
if (runAsDaemon) {
daemonize();
if (getShmId) {
if (userId == -1) {
LOG(info) << "shmem id for session '" << sessionName << "' and current user id " << geteuid()
<< " is: " << makeShmIdStr(sessionName);
} else {
LOG(info) << "shmem id for session '" << sessionName << "' and user id " << userId
<< " is: " << makeShmIdStr(sessionName, to_string(userId));
}
return 0;
}
if (shmId == "") {
if (shmId.empty()) {
shmId = makeShmIdStr(sessionName);
}
@@ -124,18 +147,39 @@ int main(int argc, char** argv)
return 0;
}
cout << "Starting shared memory monitor for session: \"" << sessionName << "\" (shmId: " << shmId << ")..." << endl;
Monitor monitor(shmId, selfDestruct, interactive, viewOnly, timeoutInMS, intervalInMS, runAsDaemon, cleanOnExit);
if (interactive || !viewOnly) {
monitor.CatchSignals();
if (listAll) {
Monitor::ListAll(listAllPath);
return 0;
}
monitor.Run();
if (!viewOnly && !interactive && !monitor) {
// if neither of the run modes are selected, use view only mode.
viewOnly = true;
}
if (viewOnly && !interactive) {
if (!Monitor::PrintShm(ShmId{shmId})) {
LOG(info) << "No segments found.";
}
return 0;
}
if (runAsDaemon && monitor) {
if (verbose) {
fair::Logger::InitFileSink("trace", "fairmq-shmmonitor");
}
daemonize();
}
LOG(info) << "Starting shared memory monitor for session: \"" << sessionName << "\" (shm id: " << shmId << ")...";
Monitor shmmonitor(shmId, selfDestruct, interactive, viewOnly, timeoutInMS, intervalInMS, monitor, cleanOnExit);
shmmonitor.CatchSignals();
shmmonitor.Run();
} catch (Monitor::DaemonPresent& dp) {
return 0;
} catch (exception& e) {
cerr << "Unhandled Exception reached the top of main: " << e.what() << ", application will now exit" << endl;
LOG(error) << "Unhandled Exception reached the top of main: " << e.what() << ", application will now exit";
return 2;
}

View File

@@ -39,6 +39,24 @@ inline auto ToStrVector(const int argc, char*const* argv, const bool dropProgram
}
}
inline bool StrStartsWith(std::string const& str, std::string const& start)
{
if (str.length() >= start.length()) {
return (0 == str.compare(0, start.length(), start));
} else {
return false;
}
}
inline bool StrEndsWith(std::string const& str, std::string const& end)
{
if (str.length() >= end.length()) {
return (0 == str.compare(str.length() - end.length(), end.length(), end));
} else {
return false;
}
}
} // namespace fair::mq::tools
#endif /* FAIR_MQ_TOOLS_STRINGS_H */

View File

@@ -1,10 +1,11 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/tools/Unique.h>
#include <boost/program_options.hpp>