Compare commits

..

18 Commits

Author SHA1 Message Date
Dennis Klein
9bab3f9f4c Support msgpack 3.x
Fixes #32
2018-07-18 16:13:18 +02:00
Dennis Klein
ee3a84ce7a Add Matthias as CONTRIBUTOR
To acknowledge his countless hours of consulting which benefited
this project significantly.
2018-06-19 14:08:32 +02:00
mkrzewic
f05118f4eb Make ";" the separateor in multi-point channel config
This is to avoid parsing problems using "," in e.g. multi-point configuration using Suboptparser
2018-06-18 15:26:31 +02:00
Alexey Rybalchenko
21419adb40 Change unregistered options warning to debug 2018-06-16 17:40:05 +02:00
Alexey Rybalchenko
1554c1c273 Change missing options in transport from warning to debug
Transport has meaningful defaults if FairMQProgOptions is missing
2018-06-13 18:03:16 +02:00
Alexey Rybalchenko
59b04a1a64 Handle Receive differently when switching transports
No need for buffer+size message on Receive.
2018-06-12 13:55:18 +02:00
Alexey Rybalchenko
653e82cab4 Avoid copy (where possible) when switching transports 2018-06-08 13:10:06 +02:00
Dennis Klein
96e2076300 Only install public header files
Resolves #15
2018-06-08 11:25:03 +02:00
Alexey Rybalchenko
2894af803b Add getter for cmd line args & align channel names in log 2018-06-06 16:04:08 +02:00
Alexey Rybalchenko
8b88e67360 Refactor FairMQProgOptions 2018-06-06 16:04:08 +02:00
Alexey Rybalchenko
ca694e4054 add privacy info link to doxygen 2018-05-24 16:06:28 +02:00
Dennis Klein
3f96181ffd Define nn tests only when nn transport is built 2018-05-24 15:54:35 +02:00
Dennis Klein
72f5cdef58 Fix various warnings
-Wunused-parameter
-Wreorder
-Wsign-compare
-Wunused-private-field
2018-05-24 13:03:06 +02:00
Dennis Klein
811e716731 Add missing header 2018-05-23 08:26:23 +02:00
Dennis Klein
5ab21946f8 Add new release 2018-05-22 16:49:46 +02:00
Dennis Klein
30367eb76d Add PR template 2018-05-22 16:33:13 +02:00
Dennis Klein
89d71ce14c Improve README 2018-05-22 15:21:35 +02:00
Dennis Klein
e54db27242 Remove the alias target
In some cases the definition of the alias target fails, because of
target visibility problems.
2018-05-22 15:21:35 +02:00
72 changed files with 753 additions and 1939 deletions

View File

@@ -48,7 +48,11 @@ endif()
if(BUILD_NANOMSG_TRANSPORT)
find_package2(PRIVATE nanomsg VERSION 1.0.0 REQUIRED)
find_package2(PRIVATE msgpack VERSION 2.1.5 REQUIRED)
find_package2(PRIVATE msgpack VERSION 3.0.0)
set(PROJECT_msgpack_VERSION 2.1.5)
if(NOT msgpack_FOUND)
find_package2(PRIVATE msgpack VERSION 2.1.5 REQUIRED)
endif()
set(msgpack_ROOT ${PACKAGE_PREFIX_DIR})
endif()
@@ -94,6 +98,7 @@ if(BUILD_DOCS)
set(DOXYGEN_PROJECT_NUMBER ${PROJECT_GIT_VERSION})
set(DOXYGEN_PROJECT_BRIEF "C++ Message Passing Framework")
set(DOXYGEN_USE_MDFILE_AS_MAINPAGE README.md)
set(DOXYGEN_HTML_FOOTER docs/footer.html)
doxygen_add_docs(doxygen README.md fairmq)
add_custom_target(docs ALL DEPENDS doxygen)
endif()

View File

@@ -2,6 +2,7 @@ Aphecetche, Laurent
Binet, Sebastien
Eulisse, Giulio
Karabowicz, Radoslaw
Kretz, Matthias <kretz@kde.org>
Krzewicki, Mikolaj
Neskovic, Gvozden
Richter, Matthias

9
PULL_REQUEST_TEMPLATE.md Normal file
View File

@@ -0,0 +1,9 @@
Replace me with your description.
---
Checklist:
* [ ] Rebased against `dev` branch
* [ ] My name is in the resp. CONTRIBUTORS/AUTHORS file
* [ ] Followed [the seven rules of great commit messages](https://chris.beams.io/posts/git-commit/#seven-rules)

View File

@@ -23,7 +23,7 @@ In addition to this core functionality FairMQ provides a framework for creating
are communicating through message passing. FairMQ does not only allow the user to use different transport but also to mix them; i.e: A Device can communicate using different transport on different channels at the same time. Device execution is modelled as a simple state machine that
shapes the integration points for the user task. Devices also incorporate a plugin system for runtime configuration and control.
Next to the provided devices and plugins (e.g. [DDS](https://github.com/FairRootGroup/DDS))
the user can extened FairMQ by developing his own plugins to integrate his devices with external
the user can extend FairMQ by developing his own plugins to integrate his devices with external
configuration and control services.
FairMQ has been developed in the context of its mother project [FairRoot](https://github.com/FairRootGroup/FairRoot) -
@@ -49,6 +49,7 @@ a simulation, reconstruction and analysis framework.
| Stable release | Date | API Docs |
| --- | --- | --- |
| [**1.2.3**](https://github.com/FairRootGroup/FairMQ/releases/tag/v1.2.3) | May 2018 | [link](https://fairrootgroup.github.io/FairMQ/v1.2.3/index.html) |
| [**1.2.1**](https://github.com/FairRootGroup/FairMQ/releases/tag/v1.2.1) | May 2018 | [link](https://fairrootgroup.github.io/FairMQ/v1.2.1/index.html) |
| [**1.2.0**](https://github.com/FairRootGroup/FairMQ/releases/tag/v1.2.0) | May 2018 | [link](https://fairrootgroup.github.io/FairMQ/v1.2.0/index.html) |
@@ -80,14 +81,16 @@ set(CMAKE_PREFIX_PATH /path/to/FairMQ_install_prefix ${CMAKE_PREFIX_PATH})
find_package(FairMQ)
```
`find_package(FairMQ)` will define an imported target `FairMQ::FairMQ` (An alias `FairRoot::FairMQ` is also defined (if you use CMake 3.11+) for backwards compatibility, but it is deprecated).
`find_package(FairMQ)` will define an imported target `FairMQ::FairMQ`.
In order to succesfully compile and link against the `FairMQ::FairMQ` target, you need to discover its public package dependencies, too.
```cmake
find_package(FairMQ)
find_package(FairLogger ${FairMQ_FairLogger_VERSION})
find_package(Boost ${FairMQ_Boost_VERSION} COMPONENTS ${FairMQ_BOOST_COMPONENTS})
if(FairMQ_FOUND)
find_package(FairLogger ${FairMQ_FairLogger_VERSION})
find_package(Boost ${FairMQ_Boost_VERSION} COMPONENTS ${FairMQ_BOOST_COMPONENTS})
endif()
```
Of course, feel free to customize the above commands to your needs.
@@ -96,8 +99,10 @@ Optionally, you can require certain FairMQ package components and a minimum vers
```cmake
find_package(FairMQ 1.1.0 COMPONENTS nanomsg_transport dds_plugin)
find_package(FairLogger ${FairMQ_FairLogger_VERSION})
find_package(Boost ${FairMQ_Boost_VERSION} COMPONENTS ${FairMQ_BOOST_COMPONENTS})
if(FairMQ_FOUND)
find_package(FairLogger ${FairMQ_FairLogger_VERSION})
find_package(Boost ${FairMQ_Boost_VERSION} COMPONENTS ${FairMQ_BOOST_COMPONENTS})
endif()
```
When building FairMQ, CMake will print a summary table of all available package components.

View File

@@ -32,9 +32,4 @@ set(CMAKE_MODULE_PATH ${@PROJECT_NAME@_CMAKEMODDIR} ${CMAKE_MODULE_PATH})
### Import targets
include(@PACKAGE_CMAKE_INSTALL_PREFIX@/@PACKAGE_INSTALL_DESTINATION@/@PROJECT_EXPORT_SET@.cmake)
### Alias target for backwards compat (DEPRECATED)
if((NOT TARGET FairRoot::@PROJECT_NAME@) AND (CMAKE_VERSION VERSION_GREATER 3.10.99))
add_library(FairRoot::@PROJECT_NAME@ ALIAS @PROJECT_NAME@::@PROJECT_NAME@)
endif()
@PACKAGE_COMPONENTS@

View File

@@ -280,20 +280,22 @@ macro(find_package2 qualifier pkgname)
set(CMAKE_PREFIX_PATH ${old_CPP})
unset(old_CPP)
if(${qualifier} STREQUAL PRIVATE)
set(PROJECT_${pkgname}_VERSION ${ARGS_VERSION})
set(PROJECT_${pkgname}_COMPONENTS ${ARGS_COMPONENTS})
set(PROJECT_PACKAGE_DEPENDENCIES ${PROJECT_PACKAGE_DEPENDENCIES} ${pkgname})
elseif(${qualifier} STREQUAL PUBLIC)
set(PROJECT_${pkgname}_VERSION ${ARGS_VERSION})
set(PROJECT_${pkgname}_COMPONENTS ${ARGS_COMPONENTS})
set(PROJECT_PACKAGE_DEPENDENCIES ${PROJECT_PACKAGE_DEPENDENCIES} ${pkgname})
set(PROJECT_INTERFACE_${pkgname}_VERSION ${ARGS_VERSION})
set(PROJECT_INTERFACE_${pkgname}_COMPONENTS ${ARGS_COMPONENTS})
set(PROJECT_INTERFACE_PACKAGE_DEPENDENCIES ${PROJECT_INTERFACE_PACKAGE_DEPENDENCIES} ${pkgname})
elseif(${qualifier} STREQUAL INTERFACE)
set(PROJECT_INTERFACE_${pkgname}_VERSION ${ARGS_VERSION})
set(PROJECT_INTERFACE_${pkgname}_COMPONENTS ${ARGS_COMPONENTS})
set(PROJECT_INTERFACE_PACKAGE_DEPENDENCIES ${PROJECT_INTERFACE_PACKAGE_DEPENDENCIES} ${pkgname})
if(${pkgname}_FOUND)
if(${qualifier} STREQUAL PRIVATE)
set(PROJECT_${pkgname}_VERSION ${ARGS_VERSION})
set(PROJECT_${pkgname}_COMPONENTS ${ARGS_COMPONENTS})
set(PROJECT_PACKAGE_DEPENDENCIES ${PROJECT_PACKAGE_DEPENDENCIES} ${pkgname})
elseif(${qualifier} STREQUAL PUBLIC)
set(PROJECT_${pkgname}_VERSION ${ARGS_VERSION})
set(PROJECT_${pkgname}_COMPONENTS ${ARGS_COMPONENTS})
set(PROJECT_PACKAGE_DEPENDENCIES ${PROJECT_PACKAGE_DEPENDENCIES} ${pkgname})
set(PROJECT_INTERFACE_${pkgname}_VERSION ${ARGS_VERSION})
set(PROJECT_INTERFACE_${pkgname}_COMPONENTS ${ARGS_COMPONENTS})
set(PROJECT_INTERFACE_PACKAGE_DEPENDENCIES ${PROJECT_INTERFACE_PACKAGE_DEPENDENCIES} ${pkgname})
elseif(${qualifier} STREQUAL INTERFACE)
set(PROJECT_INTERFACE_${pkgname}_VERSION ${ARGS_VERSION})
set(PROJECT_INTERFACE_${pkgname}_COMPONENTS ${ARGS_COMPONENTS})
set(PROJECT_INTERFACE_PACKAGE_DEPENDENCIES ${PROJECT_INTERFACE_PACKAGE_DEPENDENCIES} ${pkgname})
endif()
endif()
endmacro()

1
docs/footer.html Normal file
View File

@@ -0,0 +1 @@
<p style="margin: 0 12px 10px 12px;"><a href="https://help.github.com/articles/github-privacy-statement/">privacy</a></p>

View File

@@ -34,8 +34,10 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-1-1.sh.in ${CMAKE_CURRENT_BIN
add_test(NAME Example-1-1-zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-1.sh zeromq)
set_tests_properties(Example-1-1-zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ")
add_test(NAME Example-1-1-nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-1.sh nanomsg)
set_tests_properties(Example-1-1-nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ")
if(BUILD_NANOMSG_TRANSPORT)
add_test(NAME Example-1-1-nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-1.sh nanomsg)
set_tests_properties(Example-1-1-nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ")
endif()
add_test(NAME Example-1-1-shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-1.sh shmem)
set_tests_properties(Example-1-1-shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ")

View File

@@ -42,8 +42,10 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-1-n-1.sh.in ${CMAKE_CURRENT_B
add_test(NAME Example-1-n-1-zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-n-1.sh zeromq)
set_tests_properties(Example-1-n-1-zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ")
add_test(NAME Example-1-n-1-nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-n-1.sh nanomsg)
set_tests_properties(Example-1-n-1-nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ")
if(BUILD_NANOMSG_TRANSPORT)
add_test(NAME Example-1-n-1-nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-n-1.sh nanomsg)
set_tests_properties(Example-1-n-1-nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ")
endif()
add_test(NAME Example-1-n-1-shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-n-1.sh shmem)
set_tests_properties(Example-1-n-1-shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ")

View File

@@ -12,6 +12,8 @@ add_subdirectory(copypush)
add_subdirectory(dds)
add_subdirectory(multipart)
add_subdirectory(multiple-channels)
add_subdirectory(multiple-transports)
if(BUILD_NANOMSG_TRANSPORT)
add_subdirectory(multiple-transports)
endif()
add_subdirectory(region)
add_subdirectory(req-rep)

View File

@@ -35,8 +35,10 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-copypush.sh.in ${CMAKE_CURREN
add_test(NAME Example-CopyPush-zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-copypush.sh zeromq)
set_tests_properties(Example-CopyPush-zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message: ")
add_test(NAME Example-CopyPush-nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-copypush.sh nanomsg)
set_tests_properties(Example-CopyPush-nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message: ")
if(BUILD_NANOMSG_TRANSPORT)
add_test(NAME Example-CopyPush-nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-copypush.sh nanomsg)
set_tests_properties(Example-CopyPush-nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message: ")
endif()
add_test(NAME Example-CopyPush-shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-copypush.sh shmem)
set_tests_properties(Example-CopyPush-shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message: ")

View File

@@ -34,8 +34,10 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-multipart.sh.in ${CMAKE_CURRE
add_test(NAME Example-Multipart-zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh zeromq)
set_tests_properties(Example-Multipart-zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 2 parts")
add_test(NAME Example-Multipart-nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh nanomsg)
set_tests_properties(Example-Multipart-nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 2 parts")
if(BUILD_NANOMSG_TRANSPORT)
add_test(NAME Example-Multipart-nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh nanomsg)
set_tests_properties(Example-Multipart-nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 2 parts")
endif()
add_test(NAME Example-Multipart-shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh shmem)
set_tests_properties(Example-Multipart-shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 2 parts")

View File

@@ -39,8 +39,10 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-multiple-channels.sh.in ${CMA
add_test(NAME Example-Multiple-Channels-zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multiple-channels.sh zeromq)
set_tests_properties(Example-Multiple-Channels-zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received messages from both sources.")
add_test(NAME Example-Multiple-Channels-nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multiple-channels.sh nanomsg)
set_tests_properties(Example-Multiple-Channels-nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received messages from both sources.")
if(BUILD_NANOMSG_TRANSPORT)
add_test(NAME Example-Multiple-Channels-nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multiple-channels.sh nanomsg)
set_tests_properties(Example-Multiple-Channels-nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received messages from both sources.")
endif()
# install

View File

@@ -23,8 +23,6 @@ Sink::Sink()
: fMaxIterations(0)
, fNumIterations1(0)
, fNumIterations2(0)
, fReceived1(false)
, fReceived2(false)
{
// register a handler for data arriving on "data" channel
OnData("data1", &Sink::HandleData1);

View File

@@ -36,8 +36,6 @@ class Sink : public FairMQDevice
uint64_t fMaxIterations;
uint64_t fNumIterations1;
uint64_t fNumIterations2;
bool fReceived1;
bool fReceived2;
};
} // namespace example_multiple_transports

View File

@@ -24,4 +24,4 @@ SINK+=" --transport shmem"
SINK+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:5555"
SINK+=" name=data2,type=pull,method=connect,address=tcp://127.0.0.1:5556,transport=nanomsg"
SINK+=" name=ack,type=pub,method=connect,address=tcp://127.0.0.1:5557,transport=zeromq"
xterm -geometry 80x30+500+0 -hold -e @EX_BIN_DIR@/$SINK &
xterm -geometry 80x30+500+225 -hold -e @EX_BIN_DIR@/$SINK &

View File

@@ -35,8 +35,10 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-region.sh.in ${CMAKE_CURRENT_
add_test(NAME Example-Region-zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-region.sh zeromq)
set_tests_properties(Example-Region-zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received ack")
add_test(NAME Example-Region-nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-region.sh nanomsg)
set_tests_properties(Example-Region-nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received ack")
if(BUILD_NANOMSG_TRANSPORT)
add_test(NAME Example-Region-nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-region.sh nanomsg)
set_tests_properties(Example-Region-nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received ack")
endif()
add_test(NAME Example-Region-shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-region.sh shmem)
set_tests_properties(Example-Region-shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received ack")

View File

@@ -38,7 +38,7 @@ void Sampler::InitTask()
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data",
0,
10000000,
[this](void* data, size_t size, void* hint) { // callback to be called when message buffers no longer needed by transport
[this](void* /*data*/, size_t /*size*/, void* /*hint*/) { // callback to be called when message buffers no longer needed by transport
--fNumUnackedMsgs;
if (fMaxIterations > 0)
{

View File

@@ -35,8 +35,10 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-req-rep.sh.in ${CMAKE_CURRENT
add_test(NAME Example-ReqRep-zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-req-rep.sh zeromq)
set_tests_properties(Example-ReqRep-zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received reply from server: ")
add_test(NAME Example-ReqRep-nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-req-rep.sh nanomsg)
set_tests_properties(Example-ReqRep-nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received reply from server: ")
if(BUILD_NANOMSG_TRANSPORT)
add_test(NAME Example-ReqRep-nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-req-rep.sh nanomsg)
set_tests_properties(Example-ReqRep-nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received reply from server: ")
endif()
add_test(NAME Example-ReqRep-shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-req-rep.sh shmem)
set_tests_properties(Example-ReqRep-shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received reply from server: ")

View File

@@ -37,7 +37,7 @@ endif()
##########################
# libFairMQ header files #
##########################
set(FAIRMQ_HEADER_FILES
set(FAIRMQ_PUBLIC_HEADER_FILES
DeviceRunner.h
EventManager.h
FairMQChannel.h
@@ -52,6 +52,21 @@ set(FAIRMQ_HEADER_FILES
FairMQTransportFactory.h
Tools.h
Transports.h
options/FairMQProgOptions.h
options/FairProgOptions.h
Plugin.h
PluginManager.h
PluginServices.h
runFairMQDevice.h
tools/CppSTL.h
tools/Network.h
tools/Process.h
tools/Strings.h
tools/Unique.h
tools/Version.h
)
set(FAIRMQ_PRIVATE_HEADER_FILES
devices/FairMQBenchmarkSampler.h
devices/FairMQMerger.h
devices/FairMQMultiplier.h
@@ -59,16 +74,10 @@ set(FAIRMQ_HEADER_FILES
devices/FairMQSink.h
devices/FairMQSplitter.h
options/FairMQParser.h
options/FairMQProgOptions.h
options/FairMQSuboptParser.h
options/FairProgOptions.h
options/FairProgOptionsHelper.h
Plugin.h
PluginManager.h
PluginServices.h
plugins/Builtin.h
plugins/Control.h
runFairMQDevice.h
StateMachine.h
shmem/FairMQMessageSHM.h
shmem/FairMQPollerSHM.h
@@ -79,12 +88,6 @@ set(FAIRMQ_HEADER_FILES
shmem/Manager.h
shmem/Monitor.h
shmem/Region.h
tools/CppSTL.h
tools/Network.h
tools/Process.h
tools/Strings.h
tools/Unique.h
tools/Version.h
zeromq/FairMQMessageZMQ.h
zeromq/FairMQPollerZMQ.h
zeromq/FairMQUnmanagedRegionZMQ.h
@@ -93,7 +96,7 @@ set(FAIRMQ_HEADER_FILES
)
if(BUILD_NANOMSG_TRANSPORT)
set(FAIRMQ_HEADER_FILES ${FAIRMQ_HEADER_FILES}
set(FAIRMQ_PRIVATE_HEADER_FILES ${FAIRMQ_PRIVATE_HEADER_FILES}
nanomsg/FairMQMessageNN.h
nanomsg/FairMQPollerNN.h
nanomsg/FairMQUnmanagedRegionNN.h
@@ -103,7 +106,7 @@ if(BUILD_NANOMSG_TRANSPORT)
endif()
if(BUILD_OFI_TRANSPORT)
set(FAIRMQ_HEADER_FILES ${FAIRMQ_HEADER_FILES}
set(FAIRMQ_PRIVATE_HEADER_FILES ${FAIRMQ_PRIVATE_HEADER_FILES}
ofi/Context.h
ofi/Message.h
ofi/Poller.h
@@ -129,12 +132,10 @@ set(FAIRMQ_SOURCE_FILES
devices/FairMQMerger.cxx
devices/FairMQMultiplier.cxx
devices/FairMQProxy.cxx
# devices/FairMQSink.cxx
devices/FairMQSplitter.cxx
options/FairMQParser.cxx
options/FairMQProgOptions.cxx
options/FairMQSuboptParser.cxx
options/FairProgOptions.cxx
Plugin.cxx
PluginManager.cxx
PluginServices.cxx
@@ -192,7 +193,8 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/options/startConfigExample.sh.in ${CM
#################################
add_library(FairMQ SHARED
${FAIRMQ_SOURCE_FILES}
${FAIRMQ_HEADER_FILES} # for IDE integration
${FAIRMQ_PUBLIC_HEADER_FILES} # for IDE integration
${FAIRMQ_PRIVATE_HEADER_FILES} # for IDE integration
)
#######################
@@ -293,7 +295,7 @@ install(
)
# preserve relative path and prepend fairmq
foreach(HEADER ${FAIRMQ_HEADER_FILES})
foreach(HEADER ${FAIRMQ_PUBLIC_HEADER_FILES})
get_filename_component(_path ${HEADER} DIRECTORY)
file(TO_CMAKE_PATH ${PROJECT_INSTALL_INCDIR}/${_path} _destination)
install(FILES ${HEADER}

View File

@@ -56,6 +56,8 @@ auto DeviceRunner::Run() -> int
return 1;
}
fDevice->SetRawCmdLineArgs(fRawCmdLineArgs);
// Handle --print-channels
fDevice->RegisterChannelEndpoints();
if (fConfig.Count("print-channels"))

View File

@@ -28,6 +28,7 @@ FairMQChannel::FairMQChannel()
, fType("unspecified")
, fMethod("unspecified")
, fAddress("unspecified")
, fTransportType(fair::mq::Transport::DEFAULT)
, fSndBufSize(1000)
, fRcvBufSize(1000)
, fSndKernelSize(0)
@@ -35,7 +36,6 @@ FairMQChannel::FairMQChannel()
, fRateLogging(1)
, fName("")
, fIsValid(false)
, fTransportType(fair::mq::Transport::DEFAULT)
, fTransportFactory(nullptr)
, fMultipart(false)
, fModified(true)
@@ -48,6 +48,7 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str
, fType(type)
, fMethod(method)
, fAddress(address)
, fTransportType(fair::mq::Transport::DEFAULT)
, fSndBufSize(1000)
, fRcvBufSize(1000)
, fSndKernelSize(0)
@@ -55,7 +56,6 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str
, fRateLogging(1)
, fName("")
, fIsValid(false)
, fTransportType(fair::mq::Transport::DEFAULT)
, fTransportFactory(nullptr)
, fMultipart(false)
, fModified(true)
@@ -68,6 +68,7 @@ FairMQChannel::FairMQChannel(const string& name, const string& type, std::shared
, fType(type)
, fMethod("unspecified")
, fAddress("unspecified")
, fTransportType(factory->GetType())
, fSndBufSize(1000)
, fRcvBufSize(1000)
, fSndKernelSize(0)
@@ -75,7 +76,6 @@ FairMQChannel::FairMQChannel(const string& name, const string& type, std::shared
, fRateLogging(1)
, fName(name)
, fIsValid(false)
, fTransportType(factory->GetType())
, fTransportFactory(factory)
, fMultipart(false)
, fModified(true)
@@ -88,6 +88,7 @@ FairMQChannel::FairMQChannel(const FairMQChannel& chan)
, fType(chan.fType)
, fMethod(chan.fMethod)
, fAddress(chan.fAddress)
, fTransportType(chan.fTransportType)
, fSndBufSize(chan.fSndBufSize)
, fRcvBufSize(chan.fRcvBufSize)
, fSndKernelSize(chan.fSndKernelSize)
@@ -95,7 +96,6 @@ FairMQChannel::FairMQChannel(const FairMQChannel& chan)
, fRateLogging(chan.fRateLogging)
, fName(chan.fName)
, fIsValid(false)
, fTransportType(chan.fTransportType)
, fTransportFactory(nullptr)
, fMultipart(chan.fMultipart)
, fModified(chan.fModified)
@@ -327,9 +327,7 @@ void FairMQChannel::UpdateTransport(const string& transport)
{
unique_lock<mutex> lock(fChannelMutex);
fIsValid = false;
LOG(WARN) << fName << ": " << transport;
fTransportType = fair::mq::TransportTypes.at(transport);
LOG(WARN) << fName << ": " << fair::mq::TransportNames.at(fTransportType);
fModified = true;
}
catch (exception& e)
@@ -501,7 +499,7 @@ bool FairMQChannel::ValidateChannel()
else
{
vector<string> endpoints;
boost::algorithm::split(endpoints, fAddress, boost::algorithm::is_any_of(","));
boost::algorithm::split(endpoints, fAddress, boost::algorithm::is_any_of(";"));
for (const auto endpoint : endpoints)
{
string address;
@@ -655,13 +653,13 @@ void FairMQChannel::ResetChannel()
int FairMQChannel::Send(unique_ptr<FairMQMessage>& msg) const
{
CheckCompatibility(msg);
CheckSendCompatibility(msg);
return fSocket->Send(msg);
}
int FairMQChannel::Receive(unique_ptr<FairMQMessage>& msg) const
{
CheckCompatibility(msg);
CheckReceiveCompatibility(msg);
return fSocket->Receive(msg);
}
@@ -677,25 +675,25 @@ int FairMQChannel::Receive(unique_ptr<FairMQMessage>& msg, int rcvTimeoutInMs) c
int FairMQChannel::SendAsync(unique_ptr<FairMQMessage>& msg) const
{
CheckCompatibility(msg);
CheckSendCompatibility(msg);
return fSocket->TrySend(msg);
}
int FairMQChannel::ReceiveAsync(unique_ptr<FairMQMessage>& msg) const
{
CheckCompatibility(msg);
CheckReceiveCompatibility(msg);
return fSocket->TryReceive(msg);
}
int64_t FairMQChannel::Send(vector<unique_ptr<FairMQMessage>>& msgVec) const
{
CheckCompatibility(msgVec);
CheckSendCompatibility(msgVec);
return fSocket->Send(msgVec);
}
int64_t FairMQChannel::Receive(vector<unique_ptr<FairMQMessage>>& msgVec) const
{
CheckCompatibility(msgVec);
CheckReceiveCompatibility(msgVec);
return fSocket->Receive(msgVec);
}
@@ -711,7 +709,7 @@ int64_t FairMQChannel::Receive(vector<unique_ptr<FairMQMessage>>& msgVec, int rc
int64_t FairMQChannel::SendAsync(vector<unique_ptr<FairMQMessage>>& msgVec) const
{
CheckCompatibility(msgVec);
CheckSendCompatibility(msgVec);
return fSocket->TrySend(msgVec);
}
@@ -722,7 +720,7 @@ int64_t FairMQChannel::SendAsync(vector<unique_ptr<FairMQMessage>>& msgVec) cons
/// In case of errors, returns -1.
int64_t FairMQChannel::ReceiveAsync(vector<unique_ptr<FairMQMessage>>& msgVec) const
{
CheckCompatibility(msgVec);
CheckReceiveCompatibility(msgVec);
return fSocket->TryReceive(msgVec);
}
@@ -750,44 +748,58 @@ unsigned long FairMQChannel::GetMessagesRx() const
return fSocket->GetMessagesRx();
}
bool FairMQChannel::CheckCompatibility(unique_ptr<FairMQMessage>& msg) const
void FairMQChannel::CheckSendCompatibility(FairMQMessagePtr& msg) const
{
if (fTransportType == msg->GetType())
if (fTransportType != msg->GetType())
{
return true;
}
else
{
// LOG(warn) << "Channel type does not match message type. Copying...";
FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage(msg->GetSize()));
memcpy(msgCopy->GetData(), msg->GetData(), msg->GetSize());
msg = move(msgCopy);
return false;
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr msgWrapper(NewMessage(msg->GetData(),
msg->GetSize(),
[](void* /*data*/, void* msg) { delete static_cast<FairMQMessage*>(msg); },
msg.get()
));
msg.release();
msg = move(msgWrapper);
}
}
bool FairMQChannel::CheckCompatibility(vector<unique_ptr<FairMQMessage>>& msgVec) const
void FairMQChannel::CheckSendCompatibility(vector<FairMQMessagePtr>& msgVec) const
{
bool match = true;
if (msgVec.size() > 0)
for (auto& msg : msgVec)
{
for (unsigned int i = 0; i < msgVec.size(); ++i)
if (fTransportType != msg->GetType())
{
if (fTransportType != msgVec.at(i)->GetType())
{
// LOG(warn) << "Channel type does not match message type. Copying...";
FairMQMessagePtr newMsg(fTransportFactory->CreateMessage(msgVec[i]->GetSize()));
memcpy(newMsg->GetData(), msgVec[i]->GetData(), msgVec[i]->GetSize());
msgVec[i] = move(newMsg);
match = false;
}
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr msgWrapper(NewMessage(msg->GetData(),
msg->GetSize(),
[](void* /*data*/, void* msg) { delete static_cast<FairMQMessage*>(msg); },
msg.get()
));
msg.release();
msg = move(msgWrapper);
}
}
}
void FairMQChannel::CheckReceiveCompatibility(FairMQMessagePtr& msg) const
{
if (fTransportType != msg->GetType())
{
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr newMsg(NewMessage());
msg = move(newMsg);
}
}
void FairMQChannel::CheckReceiveCompatibility(vector<FairMQMessagePtr>& msgVec) const
{
for (auto& msg : msgVec)
{
if (fTransportType != msg->GetType())
{
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr newMsg(NewMessage());
msg = move(newMsg);
}
}
else
{
return true;
}
return match;
}

View File

@@ -20,6 +20,7 @@
#include <fairmq/Transports.h>
#include <FairMQLogger.h>
#include <FairMQParts.h>
#include <FairMQMessage.h>
class FairMQChannel
{
@@ -165,8 +166,8 @@ class FairMQChannel
/// Resets the channel (requires validation to be used again).
void ResetChannel();
int Send(std::unique_ptr<FairMQMessage>& msg) const;
int Receive(std::unique_ptr<FairMQMessage>& msg) const;
int Send(FairMQMessagePtr& msg) const;
int Receive(FairMQMessagePtr& msg) const;
/// Sends a message to the socket queue.
/// @details Send method attempts to send a message by
@@ -176,7 +177,7 @@ class FairMQChannel
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out.
/// In case of errors, returns -1.
int Send(std::unique_ptr<FairMQMessage>& msg, int sndTimeoutInMs) const;
int Send(FairMQMessagePtr& msg, int sndTimeoutInMs) const;
/// Receives a message from the socket queue.
/// @details Receive method attempts to receive a message from the input queue.
@@ -185,7 +186,7 @@ class FairMQChannel
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out.
/// In case of errors, returns -1.
int Receive(std::unique_ptr<FairMQMessage>& msg, int rcvTimeoutInMs) const;
int Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs) const;
/// Sends a message in non-blocking mode.
/// @details SendAsync method attempts to send a message without blocking by
@@ -195,31 +196,31 @@ class FairMQChannel
/// @return Number of bytes that have been queued. If queueing failed due to
/// full queue or no connected peers (when binding), returns -2.
/// In case of errors, returns -1.
int SendAsync(std::unique_ptr<FairMQMessage>& msg) const;
int SendAsync(FairMQMessagePtr& msg) const;
/// Receives a message in non-blocking mode.
///
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @return Number of bytes that have been received. If queue is empty, returns -2.
/// In case of errors, returns -1.
int ReceiveAsync(std::unique_ptr<FairMQMessage>& msg) const;
int ReceiveAsync(FairMQMessagePtr& msg) const;
int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const;
int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const;
int64_t Send(std::vector<FairMQMessagePtr>& msgVec) const;
int64_t Receive(std::vector<FairMQMessagePtr>& msgVec) const;
/// Send a vector of messages
///
/// @param msgVec message vector reference
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out.
/// In case of errors, returns -1.
int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, int sndTimeoutInMs) const;
int64_t Send(std::vector<FairMQMessagePtr>& msgVec, int sndTimeoutInMs) const;
/// Receive a vector of messages
///
/// @param msgVec message vector reference
/// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out.
/// In case of errors, returns -1.
int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, int rcvTimeoutInMs) const;
int64_t Receive(std::vector<FairMQMessagePtr>& msgVec, int rcvTimeoutInMs) const;
/// Sends a vector of message in non-blocking mode.
/// @details SendAsync method attempts to send a vector of messages without blocking by
@@ -228,14 +229,14 @@ class FairMQChannel
/// @param msgVec message vector reference
/// @return Number of bytes that have been queued. If queueing failed due to
/// full queue or no connected peers (when binding), returns -2. In case of errors, returns -1.
int64_t SendAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const;
int64_t SendAsync(std::vector<FairMQMessagePtr>& msgVec) const;
/// Receives a vector of messages in non-blocking mode.
///
/// @param msgVec message vector reference
/// @return Number of bytes that have been received. If queue is empty, returns -2.
/// In case of errors, returns -1.
int64_t ReceiveAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const;
int64_t ReceiveAsync(std::vector<FairMQMessagePtr>& msgVec) const;
int64_t Send(FairMQParts& parts) const
{
@@ -313,8 +314,10 @@ class FairMQChannel
std::shared_ptr<FairMQTransportFactory> fTransportFactory;
bool CheckCompatibility(std::unique_ptr<FairMQMessage>& msg) const;
bool CheckCompatibility(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const;
void CheckSendCompatibility(FairMQMessagePtr& msg) const;
void CheckSendCompatibility(std::vector<FairMQMessagePtr>& msgVec) const;
void CheckReceiveCompatibility(FairMQMessagePtr& msg) const;
void CheckReceiveCompatibility(std::vector<FairMQMessagePtr>& msgVec) const;
void InitTransport(std::shared_ptr<FairMQTransportFactory> factory);

View File

@@ -26,6 +26,8 @@
#include <thread>
#include <functional>
#include <sstream>
#include <iomanip>
#include <algorithm> // std::max
using namespace std;
@@ -57,6 +59,7 @@ FairMQDevice::FairMQDevice()
, fVersion({0, 0, 0})
, fRate(0.)
, fLastTime(0)
, fRawCmdLineArgs()
{
}
@@ -87,6 +90,7 @@ FairMQDevice::FairMQDevice(const fair::mq::tools::Version version)
, fVersion(version)
, fRate(0.)
, fLastTime(0)
, fRawCmdLineArgs()
{
}
@@ -144,7 +148,8 @@ void FairMQDevice::InitWrapper()
else
{
LOG(error) << "Cannot update configuration. Socket method (bind/connect) not specified.";
throw runtime_error("Cannot update configuration. Socket method (bind/connect) not specified.");
ChangeState(ERROR_FOUND);
// throw runtime_error("Cannot update configuration. Socket method (bind/connect) not specified.");
}
// }
}
@@ -157,7 +162,8 @@ void FairMQDevice::InitWrapper()
if (uninitializedBindingChannels.size() > 0)
{
LOG(error) << uninitializedBindingChannels.size() << " of the binding channels could not initialize. Initial configuration incomplete.";
throw runtime_error(fair::mq::tools::ToString(uninitializedBindingChannels.size(), " of the binding channels could not initialize. Initial configuration incomplete."));
ChangeState(ERROR_FOUND);
// throw runtime_error(fair::mq::tools::ToString(uninitializedBindingChannels.size(), " of the binding channels could not initialize. Initial configuration incomplete."));
}
CallStateChangeCallbacks(INITIALIZING_DEVICE);
@@ -196,7 +202,8 @@ void FairMQDevice::InitWrapper()
if (numAttempts++ > maxAttempts)
{
LOG(error) << "could not connect all channels after " << fInitializationTimeoutInS << " attempts";
throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", fInitializationTimeoutInS, " attempts"));
ChangeState(ERROR_FOUND);
// throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", fInitializationTimeoutInS, " attempts"));
}
AttachChannels(uninitializedConnectingChannels);
@@ -267,7 +274,15 @@ bool FairMQDevice::AttachChannel(FairMQChannel& ch)
//(re-)init socket
if (!ch.fSocket)
{
ch.fSocket = ch.fTransportFactory->CreateSocket(ch.fType, ch.fName);
try
{
ch.fSocket = ch.fTransportFactory->CreateSocket(ch.fType, ch.fName);
}
catch (fair::mq::SocketError& se)
{
LOG(error) << se.what();
return false;
}
}
// set high water marks
@@ -501,7 +516,7 @@ void FairMQDevice::RunWrapper()
while (CheckCurrentState(RUNNING) && ConditionalRun())
{
if (fRate > 0.001) {
auto timespan = chrono::duration_cast<TimeScale>(Clock::now() - reftime).count() - fLastTime;
auto timespan = static_cast<TimeScale::rep>(chrono::duration_cast<TimeScale>(Clock::now() - reftime).count() - fLastTime);
if (timespan < period) {
TimeScale sleepfor(period - timespan);
this_thread::sleep_for(sleepfor);
@@ -808,7 +823,7 @@ void FairMQDevice::CreateOwnConfig()
try {
fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue<string>("transport"));
} catch(const exception& e) {
LOG(ERROR) << "invalid transport type provided: " << fConfig->GetValue<string>("transport");
LOG(error) << "invalid transport type provided: " << fConfig->GetValue<string>("transport");
}
}
@@ -837,22 +852,22 @@ void FairMQDevice::SetConfig(FairMQProgOptions& config)
{
fExternalConfig = true;
fConfig = &config;
for (auto& c : config.GetFairMQMap())
for (auto& c : fConfig->GetFairMQMap())
{
if (!fChannels.insert(c).second)
{
LOG(warn) << "did not insert channel '" << c.first << "', it is already in the device.";
}
}
fId = config.GetValue<string>("id");
fNetworkInterface = config.GetValue<string>("network-interface");
fNumIoThreads = config.GetValue<int>("io-threads");
fInitializationTimeoutInS = config.GetValue<int>("initialization-timeout");
fId = fConfig->GetValue<string>("id");
fNetworkInterface = fConfig->GetValue<string>("network-interface");
fNumIoThreads = fConfig->GetValue<int>("io-threads");
fInitializationTimeoutInS = fConfig->GetValue<int>("initialization-timeout");
fRate = fConfig->GetValue<float>("rate");
try {
fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue<string>("transport"));
} catch(const exception& e) {
LOG(ERROR) << "invalid transport type provided: " << fConfig->GetValue<string>("transport");
LOG(error) << "invalid transport type provided: " << fConfig->GetValue<string>("transport");
}
SetTransport(fConfig->GetValue<string>("transport"));
}
@@ -869,6 +884,8 @@ void FairMQDevice::LogSocketRates()
vector<int> logIntervals;
vector<int> intervalCounters;
size_t chanNameLen = 0;
// iterate over the channels map
for (const auto& mi : fChannels)
{
@@ -881,6 +898,7 @@ void FairMQDevice::LogSocketRates()
logIntervals.push_back(vi->fRateLogging);
intervalCounters.push_back(0);
filteredChannelNames.push_back(fair::mq::tools::ToString(mi.first, "[", vi - (mi.second).begin(), "]"));
chanNameLen = max(chanNameLen, filteredChannelNames.back().length());
}
}
}
@@ -949,9 +967,9 @@ void FairMQDevice::LogSocketRates()
bytesOut.at(i) = bytesOutNew.at(i);
msgOut.at(i) = msgOutNew.at(i);
LOG(info) << filteredChannelNames.at(i) << ": "
<< "in: " << msgPerSecIn.at(i) << " (" << mbPerSecIn.at(i) << " MB) "
<< "out: " << msgPerSecOut.at(i) << " (" << mbPerSecOut.at(i) << " MB)";
LOG(info) << setw(chanNameLen) << filteredChannelNames.at(i) << ": "
<< "in: " << msgPerSecIn.at(i) << " (" << mbPerSecIn.at(i) << " MB) "
<< "out: " << msgPerSecOut.at(i) << " (" << mbPerSecOut.at(i) << " MB)";
}
++i;

View File

@@ -208,7 +208,7 @@ class FairMQDevice : public FairMQStateMachine
template<typename... Args>
FairMQMessagePtr NewMessageFor(const std::string& channel, int index, Args&&... args) const
{
return fChannels.at(channel).at(index).Transport()->CreateMessage(std::forward<Args>(args)...);
return fChannels.at(channel).at(index).NewMessage(std::forward<Args>(args)...);
}
template<typename T>
@@ -413,6 +413,9 @@ class FairMQDevice : public FairMQStateMachine
void SetInitializationTimeoutInS(int initializationTimeoutInS) { fInitializationTimeoutInS = initializationTimeoutInS; }
int GetInitializationTimeoutInS() const { return fInitializationTimeoutInS; }
void SetRawCmdLineArgs(const std::vector<std::string>& args) { fRawCmdLineArgs = args; }
std::vector<std::string> GetRawCmdLineArgs() const { return fRawCmdLineArgs; }
protected:
std::shared_ptr<FairMQTransportFactory> fTransportFactory; ///< Default transport factory
std::unordered_map<fair::mq::Transport, std::shared_ptr<FairMQTransportFactory>> fTransports; ///< Container for transports
@@ -532,6 +535,7 @@ class FairMQDevice : public FairMQStateMachine
const fair::mq::tools::Version fVersion;
float fRate; ///< Rate limiting for ConditionalRun
size_t fLastTime; ///< Rate limiting for ConditionalRun
std::vector<std::string> fRawCmdLineArgs;
};
#endif /* FAIRMQDEVICE_H_ */

View File

@@ -24,7 +24,7 @@
#include <boost/msm/back/metafunctions.hpp>
#include <boost/msm/front/state_machine_def.hpp>
#include <boost/msm/front/functor_row.hpp>
#include <boost/core/demangle.hpp>
#include <boost/signals2.hpp> // signal/slot for onStateChange callbacks
#include <atomic>
@@ -72,8 +72,7 @@ struct Machine_ : public msmf::state_machine_def<Machine_>
{
public:
Machine_()
: fState()
, fWork()
: fWork()
, fWorkAvailableCondition()
, fWorkDoneCondition()
, fWorkMutex()
@@ -83,6 +82,7 @@ struct Machine_ : public msmf::state_machine_def<Machine_>
, fStateChangeSignal()
, fStateChangeSignalsMap()
, fTerminationRequested(false)
, fState()
, fWorkerThread()
{}
@@ -386,21 +386,18 @@ struct Machine_ : public msmf::state_machine_def<Machine_>
boost::mpl::for_each<all_states, boost::msm::wrap<boost::mpl::placeholders::_1>>(boost::msm::back::get_state_name<recursive_stt>(stateName, state));
stateName = stateName.substr(24);
size_t pos = stateName.find("_FSME");
stateName.erase(pos);
if (stateName == "1RUNNING" || stateName == "6DEVICE_READY" || stateName == "0PAUSED" || stateName == "8RESETTING_TASK" || stateName == "0RESETTING_DEVICE")
stateName = boost::core::demangle(stateName.c_str());
size_t pos = stateName.rfind(":");
if (pos != string::npos)
{
stateName = stateName.substr(1);
stateName = stateName.substr(pos + 1);
stateName = stateName.substr(0, stateName.size() - 4);
}
if (stateName != "OK")
{
LOG(state) << "No transition from state " << stateName << " on event " << e.name();
}
// LOG(state) << "no transition from state " << GetStateName(state) << " (" << state << ") on event " << e.name();
}
static string GetStateName(const int state)
@@ -521,8 +518,8 @@ _Pragma("GCC diagnostic pop")
using namespace fair::mq::fsm;
FairMQStateMachine::FairMQStateMachine()
: fFsm(new FairMQFSM)
, fChangeStateMutex()
: fChangeStateMutex()
, fFsm(new FairMQFSM)
{
static_pointer_cast<FairMQFSM>(fFsm)->fInitWrapperHandler = bind(&FairMQStateMachine::InitWrapper, this);
static_pointer_cast<FairMQFSM>(fFsm)->fInitTaskWrapperHandler = bind(&FairMQStateMachine::InitTaskWrapper, this);
@@ -775,4 +772,4 @@ int FairMQStateMachine::GetEventNumber(const string& event)
LOG(error) << "Requested number for non-existent event... " << event << endl
<< "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_DEVICE, RESET_TASK, END, ERROR_FOUND";
return -1;
}
}

View File

@@ -6,6 +6,7 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/plugins/Builtin.h>
#include <fairmq/PluginManager.h>
#include <fairmq/Tools.h>
#include <boost/program_options.hpp>

View File

@@ -11,7 +11,6 @@
#include <fairmq/Plugin.h>
#include <fairmq/PluginServices.h>
#include <fairmq/plugins/Builtin.h>
#include <fairmq/Tools.h>
#include <FairMQDevice.h>
#define BOOST_FILESYSTEM_VERSION 3

View File

@@ -57,7 +57,7 @@ void FairMQBenchmarkSampler::Run()
// store the channel reference to avoid traversing the map on every loop iteration
FairMQChannel& dataOutChannel = fChannels.at(fOutChannelName).at(0);
FairMQMessagePtr baseMsg(dataOutChannel.Transport()->CreateMessage(fMsgSize));
FairMQMessagePtr baseMsg(dataOutChannel.NewMessage(fMsgSize));
LOG(info) << "Starting the benchmark with message size of " << fMsgSize << " and " << fMaxIterations << " iterations.";
auto tStart = chrono::high_resolution_clock::now();
@@ -66,7 +66,7 @@ void FairMQBenchmarkSampler::Run()
{
if (fSameMessage)
{
FairMQMessagePtr msg(dataOutChannel.Transport()->CreateMessage());
FairMQMessagePtr msg(dataOutChannel.NewMessage());
msg->Copy(*baseMsg);
if (dataOutChannel.Send(msg) >= 0)
@@ -83,7 +83,7 @@ void FairMQBenchmarkSampler::Run()
}
else
{
FairMQMessagePtr msg(dataOutChannel.Transport()->CreateMessage(fMsgSize));
FairMQMessagePtr msg(dataOutChannel.NewMessage(fMsgSize));
if (dataOutChannel.Send(msg) >= 0)
{

View File

@@ -1,69 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQSink.cxx
*
* @since 2013-01-09
* @author D. Klein, A. Rybalchenko
*/
#include "FairMQSink.h"
#include <chrono>
#include "../FairMQLogger.h"
#include "../options/FairMQProgOptions.h"
using namespace std;
FairMQSink::FairMQSink()
: fNumMsgs(0)
, fInChannelName()
{
}
void FairMQSink::InitTask()
{
fNumMsgs = fConfig->GetValue<uint64_t>("num-msgs");
fInChannelName = fConfig->GetValue<string>("in-channel");
}
void FairMQSink::Run()
{
uint64_t numReceivedMsgs = 0;
// store the channel reference to avoid traversing the map on every loop iteration
FairMQChannel& dataInChannel = fChannels.at(fInChannelName).at(0);
LOG(info) << "Starting the benchmark and expecting to receive " << fNumMsgs << " messages.";
auto tStart = chrono::high_resolution_clock::now();
while (CheckCurrentState(RUNNING))
{
FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage());
if (dataInChannel.Receive(msg) >= 0)
{
if (fNumMsgs > 0)
{
if (numReceivedMsgs >= fNumMsgs)
{
break;
}
}
numReceivedMsgs++;
}
}
auto tEnd = chrono::high_resolution_clock::now();
LOG(info) << "Leaving RUNNING state. Received " << numReceivedMsgs << " messages in " << chrono::duration<double, milli>(tEnd - tStart).count() << "ms.";
}
FairMQSink::~FairMQSink()
{
}

View File

@@ -56,7 +56,7 @@ class FairMQSink : public FairMQDevice//, public OutputPolicy
while (CheckCurrentState(RUNNING))
{
FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage());
FairMQMessagePtr msg(dataInChannel.NewMessage());
if (dataInChannel.Receive(msg) >= 0)
{

View File

@@ -27,11 +27,16 @@ namespace parser
{
// TODO : add key-value map<string,string> parameter for replacing/updating values from keys
// function that convert property tree (given the json structure) to FairMQMap
FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const string& id, const string& rootNode)
// function that convert property tree (given the json structure) to FairMQChannelMap
FairMQChannelMap ptreeToMQMap(const boost::property_tree::ptree& pt, const string& id, const string& rootNode)
{
if (id == "")
{
throw ParserError("no device ID provided. Provide with `--id` cmd option");
}
// Create fair mq map
FairMQMap channelMap;
FairMQChannelMap channelMap;
// boost::property_tree::json_parser::write_json(std::cout, pt);
// Helper::PrintDeviceList(pt.get_child(rootNode));
// Extract value from boost::property_tree
@@ -46,20 +51,13 @@ FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const string& id,
return channelMap;
}
FairMQMap JSON::UserParser(const string& filename, const string& deviceId, const string& rootNode)
FairMQChannelMap JSON::UserParser(const string& filename, const string& deviceId, const string& rootNode)
{
boost::property_tree::ptree pt;
boost::property_tree::read_json(filename, pt);
return ptreeToMQMap(pt, deviceId, rootNode);
}
FairMQMap JSON::UserParser(stringstream& input, const string& deviceId, const string& rootNode)
{
boost::property_tree::ptree pt;
boost::property_tree::read_json(input, pt);
return ptreeToMQMap(pt, deviceId, rootNode);
}
namespace Helper
{
@@ -90,7 +88,7 @@ void PrintDeviceList(const boost::property_tree::ptree& tree)
}
}
void DeviceParser(const boost::property_tree::ptree& tree, FairMQMap& channelMap, const string& deviceId)
void DeviceParser(const boost::property_tree::ptree& tree, FairMQChannelMap& channelMap, const string& deviceId)
{
string deviceIdKey;
@@ -129,7 +127,7 @@ void DeviceParser(const boost::property_tree::ptree& tree, FairMQMap& channelMap
}
}
void ChannelParser(const boost::property_tree::ptree& tree, FairMQMap& channelMap)
void ChannelParser(const boost::property_tree::ptree& tree, FairMQChannelMap& channelMap)
{
string channelKey;

View File

@@ -10,8 +10,8 @@
#include <string>
#include <vector>
#include <map>
#include <unordered_map>
#include <exception>
#include <boost/property_tree/ptree_fwd.hpp>
@@ -24,22 +24,23 @@ namespace mq
namespace parser
{
using FairMQMap = std::unordered_map<std::string, std::vector<FairMQChannel>>;
using FairMQChannelMap = std::unordered_map<std::string, std::vector<FairMQChannel>>;
FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const std::string& deviceId, const std::string& rootNode);
struct ParserError : std::runtime_error { using std::runtime_error::runtime_error; };
FairMQChannelMap ptreeToMQMap(const boost::property_tree::ptree& pt, const std::string& deviceId, const std::string& rootNode);
struct JSON
{
FairMQMap UserParser(const std::string& filename, const std::string& deviceId, const std::string& rootNode = "fairMQOptions");
FairMQMap UserParser(std::stringstream& input, const std::string& deviceId, const std::string& rootNode = "fairMQOptions");
FairMQChannelMap UserParser(const std::string& filename, const std::string& deviceId, const std::string& rootNode = "fairMQOptions");
};
namespace Helper
{
void PrintDeviceList(const boost::property_tree::ptree& tree);
void DeviceParser(const boost::property_tree::ptree& tree, FairMQMap& channelMap, const std::string& deviceId);
void ChannelParser(const boost::property_tree::ptree& tree, FairMQMap& channelMap);
void DeviceParser(const boost::property_tree::ptree& tree, FairMQChannelMap& channelMap, const std::string& deviceId);
void ChannelParser(const boost::property_tree::ptree& tree, FairMQChannelMap& channelMap);
void SocketParser(const boost::property_tree::ptree& tree, std::vector<FairMQChannel>& channelList, const std::string& channelName, const FairMQChannel& commonChannel);
} // Helper namespace

View File

@@ -12,25 +12,70 @@
* Created on March 11, 2015, 10:20 PM
*/
#include "FairMQLogger.h"
#include "FairMQProgOptions.h"
#include "FairProgOptionsHelper.h"
#include "FairMQParser.h"
#include "FairMQSuboptParser.h"
#include "FairMQLogger.h"
#include <boost/algorithm/string.hpp> // join/split
#include <algorithm>
#include <iomanip>
#include <iostream>
#include <exception>
using namespace std;
using namespace fair::mq;
namespace po = boost::program_options;
FairMQProgOptions::FairMQProgOptions()
: FairProgOptions()
, fMQCmdOptions("FairMQ device options")
, fMQParserOptions("FairMQ config parser options")
, fFairMQMap()
: fVarMap()
, fFairMQChannelMap()
, fAllOptions("FairMQ Command Line Options")
, fGeneralOptions("General options")
, fMQOptions("FairMQ device options")
, fParserOptions("FairMQ channel config parser options")
, fConfigMutex()
, fChannelInfo()
, fMQKeyMap()
, fChannelKeyMap()
, fUnregisteredOptions()
, fEvents()
{
InitOptionDescription();
fGeneralOptions.add_options()
("help,h", "Print help")
("version,v", "Print version")
("severity", po::value<string>()->default_value("debug"), "Log severity level: trace, debug, info, state, warn, error, fatal, nolog")
("verbosity", po::value<string>()->default_value("medium"), "Log verbosity level: veryhigh, high, medium, low")
("color", po::value<bool >()->default_value(true), "Log color (true/false)")
("log-to-file", po::value<string>()->default_value(""), "Log output to a file.")
("print-options", po::value<bool >()->implicit_value(true), "Print options in machine-readable format (<option>:<computed-value>:<type>:<description>)");
fMQOptions.add_options()
("id", po::value<string>(), "Device ID (required argument).")
("io-threads", po::value<int >()->default_value(1), "Number of I/O threads.")
("transport", po::value<string>()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg'/'shmem') .")
("network-interface", po::value<string>()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).")
("config-key", po::value<string>(), "Use provided value instead of device id for fetching the configuration from the config file.")
("initialization-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).")
("port-range-min", po::value<int >()->default_value(22000), "Start of the port range for dynamic initialization.")
("port-range-max", po::value<int >()->default_value(32000), "End of the port range for dynamic initialization.")
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t>()->default_value(2000000000), "Shared memory: size of the shared memory segment (in bytes).")
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
("rate", po::value<float >()->default_value(0.), "Rate for conditional run loop (Hz).")
("session", po::value<string>()->default_value("default"), "Session name.");
fParserOptions.add_options()
("mq-config", po::value<string>(), "JSON input as file.")
("channel-config", po::value<vector<string>>()->multitoken()->composing(), "Configuration of single or multiple channel(s) by comma separated key=value list");
fAllOptions.add(fGeneralOptions);
fAllOptions.add(fMQOptions);
fAllOptions.add(fParserOptions);
ParseDefaults();
}
@@ -52,12 +97,20 @@ int FairMQProgOptions::ParseAll(const vector<string>& cmdLineArgs, bool allowUnr
int FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool allowUnregistered)
{
if (FairProgOptions::ParseCmdLine(argc, argv, allowUnregistered))
ParseCmdLine(argc, argv, allowUnregistered);
// if this option is provided, handle them and return stop value
if (fVarMap.count("help"))
{
// ParseCmdLine returns 0 if no immediate switches found.
cout << fAllOptions << endl;
return 1;
}
// if this option is provided, handle them and return stop value
if (fVarMap.count("print-options"))
{
PrintOptionsRaw();
return 1;
}
// if these options are provided, do no further checks and let the device handle them
if (fVarMap.count("print-channels") || fVarMap.count("version"))
{
@@ -90,7 +143,7 @@ int FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool al
{
id = fVarMap["config-key"].as<string>();
}
else
else if (fVarMap.count("id"))
{
id = fVarMap["id"].as<string>();
}
@@ -101,54 +154,84 @@ int FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool al
if (fVarMap.count("mq-config"))
{
LOG(debug) << "mq-config: Using default JSON parser";
Store(fair::mq::parser::JSON().UserParser(fVarMap["mq-config"].as<string>(), id));
}
else if (fVarMap.count("config-json-string"))
{
LOG(debug) << "config-json-string: Parsing JSON string";
string value = fair::mq::ConvertVariableValue<fair::mq::VarInfoToString>()(fVarMap.at("config-json-string"));
stringstream ss;
ss << value;
Store(fair::mq::parser::JSON().UserParser(ss, id));
UpdateChannelMap(parser::JSON().UserParser(fVarMap.at("mq-config").as<string>(), id));
}
else if (fVarMap.count("channel-config"))
{
LOG(debug) << "channel-config: Parsing channel configuration";
Store(fair::mq::parser::SUBOPT().UserParser(fVarMap.at("channel-config").as<vector<string>>(), id));
UpdateChannelMap(parser::SUBOPT().UserParser(fVarMap.at("channel-config").as<vector<string>>(), id));
}
else
{
LOG(warn) << "FairMQProgOptions: no channels configuration provided via neither of:";
for (const auto& p : fMQParserOptions.options())
for (const auto& p : fParserOptions.options())
{
LOG(warn) << "--" << p->canonical_display_name();
}
LOG(warn) << "No channels will be created (You can create them manually).";
}
}
catch (std::exception& e)
catch (exception& e)
{
LOG(error) << e.what();
return 1;
}
FairProgOptions::PrintOptions();
PrintOptions();
return 0;
}
int FairMQProgOptions::Store(const FairMQMap& channels)
void FairMQProgOptions::ParseCmdLine(const int argc, char const* const* argv, bool allowUnregistered)
{
fFairMQMap = channels;
UpdateChannelInfo();
UpdateMQValues();
return 0;
// get options from cmd line and store in variable map
// here we use command_line_parser instead of parse_command_line, to allow unregistered and positional options
if (allowUnregistered)
{
po::command_line_parser parser{argc, argv};
parser.options(fAllOptions).allow_unregistered();
po::parsed_options parsed = parser.run();
fUnregisteredOptions = po::collect_unrecognized(parsed.options, po::include_positional);
po::store(parsed, fVarMap);
}
else
{
po::store(po::parse_command_line(argc, argv, fAllOptions), fVarMap);
}
po::notify(fVarMap);
}
void FairMQProgOptions::ParseDefaults()
{
vector<string> emptyArgs;
emptyArgs.push_back("dummy");
vector<const char*> argv(emptyArgs.size());
transform(emptyArgs.begin(), emptyArgs.end(), argv.begin(), [](const string& str)
{
return str.c_str();
});
po::store(po::parse_command_line(argv.size(), const_cast<char**>(argv.data()), fAllOptions), fVarMap);
}
unordered_map<string, vector<FairMQChannel>> FairMQProgOptions::GetFairMQMap() const
{
return fFairMQChannelMap;
}
unordered_map<string, int> FairMQProgOptions::GetChannelInfo() const
{
return fChannelInfo;
}
// replace FairMQChannelMap, and update variable map accordingly
int FairMQProgOptions::UpdateChannelMap(const FairMQMap& channels)
int FairMQProgOptions::UpdateChannelMap(const unordered_map<string, vector<FairMQChannel>>& channels)
{
fFairMQMap = channels;
fFairMQChannelMap = channels;
UpdateChannelInfo();
UpdateMQValues();
return 0;
@@ -157,7 +240,7 @@ int FairMQProgOptions::UpdateChannelMap(const FairMQMap& channels)
void FairMQProgOptions::UpdateChannelInfo()
{
fChannelInfo.clear();
for (const auto& c : fFairMQMap)
for (const auto& c : fFairMQChannelMap)
{
fChannelInfo.insert(make_pair(c.first, c.second.size()));
}
@@ -167,7 +250,7 @@ void FairMQProgOptions::UpdateChannelInfo()
// create key for variable map as follow : channelName.index.memberName
void FairMQProgOptions::UpdateMQValues()
{
for (const auto& p : fFairMQMap)
for (const auto& p : fFairMQChannelMap)
{
int index = 0;
@@ -183,15 +266,15 @@ void FairMQProgOptions::UpdateMQValues()
string rcvKernelSizeKey = "chans." + p.first + "." + to_string(index) + ".rcvKernelSize";
string rateLoggingKey = "chans." + p.first + "." + to_string(index) + ".rateLogging";
fMQKeyMap[typeKey] = MQKey{p.first, index, "type"};
fMQKeyMap[methodKey] = MQKey{p.first, index, "method"};
fMQKeyMap[addressKey] = MQKey{p.first, index, "address"};
fMQKeyMap[transportKey] = MQKey{p.first, index, "transport"};
fMQKeyMap[sndBufSizeKey] = MQKey{p.first, index, "sndBufSize"};
fMQKeyMap[rcvBufSizeKey] = MQKey{p.first, index, "rcvBufSize"};
fMQKeyMap[sndKernelSizeKey] = MQKey{p.first, index, "sndKernelSize"};
fMQKeyMap[rcvKernelSizeKey] = MQKey{p.first, index, "rcvkernelSize"};
fMQKeyMap[rateLoggingKey] = MQKey{p.first, index, "rateLogging"};
fChannelKeyMap[typeKey] = ChannelKey{p.first, index, "type"};
fChannelKeyMap[methodKey] = ChannelKey{p.first, index, "method"};
fChannelKeyMap[addressKey] = ChannelKey{p.first, index, "address"};
fChannelKeyMap[transportKey] = ChannelKey{p.first, index, "transport"};
fChannelKeyMap[sndBufSizeKey] = ChannelKey{p.first, index, "sndBufSize"};
fChannelKeyMap[rcvBufSizeKey] = ChannelKey{p.first, index, "rcvBufSize"};
fChannelKeyMap[sndKernelSizeKey] = ChannelKey{p.first, index, "sndKernelSize"};
fChannelKeyMap[rcvKernelSizeKey] = ChannelKey{p.first, index, "rcvkernelSize"};
fChannelKeyMap[rateLoggingKey] = ChannelKey{p.first, index, "rateLogging"};
UpdateVarMap<string>(typeKey, channel.GetType());
UpdateVarMap<string>(methodKey, channel.GetMethod());
@@ -209,103 +292,56 @@ void FairMQProgOptions::UpdateMQValues()
}
}
int FairMQProgOptions::ImmediateOptions()
{
if (fVarMap.count("help"))
{
cout << "===== FairMQ Program Options =====" << endl << fAllOptions;
return 1;
}
if (fVarMap.count("print-options"))
{
PrintOptionsRaw();
return 1;
}
return 0;
}
void FairMQProgOptions::InitOptionDescription()
{
fMQCmdOptions.add_options()
("id", po::value<string>(), "Device ID (required argument).")
("io-threads", po::value<int >()->default_value(1), "Number of I/O threads.")
("transport", po::value<string>()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').")
("config", po::value<string>()->default_value("static"), "Config source ('static'/<config library filename>).")
("network-interface", po::value<string>()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).")
("config-key", po::value<string>(), "Use provided value instead of device id for fetching the configuration from the config file.")
("initialization-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).")
("port-range-min", po::value<int >()->default_value(22000), "Start of the port range for dynamic initialization.")
("port-range-max", po::value<int >()->default_value(32000), "End of the port range for dynamic initialization.")
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t>()->default_value(2000000000), "Shared memory: size of the shared memory segment (in bytes).")
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
("rate", po::value<float >()->default_value(0.), "Rate for conditional run loop (Hz).")
("session", po::value<string>()->default_value("default"), "Session name.")
;
fMQParserOptions.add_options()
("config-json-string", po::value<vector<string>>()->multitoken(), "JSON input as command line string.")
("mq-config", po::value<string>(), "JSON/XML input as file. The configuration object will check xml or json file extention and will call the json or xml parser accordingly")
("channel-config", po::value<vector<string>>()->multitoken()->composing(), "Configuration of single or multiple channel(s) by comma separated key=value list")
;
AddToCmdLineOptions(fMQCmdOptions);
AddToCmdLineOptions(fMQParserOptions);
}
int FairMQProgOptions::UpdateChannelMap(const string& channelName, int index, const string& member, const string& val)
int FairMQProgOptions::UpdateChannelValue(const string& channelName, int index, const string& member, const string& val)
{
if (member == "type")
{
fFairMQMap.at(channelName).at(index).UpdateType(val);
fFairMQChannelMap.at(channelName).at(index).UpdateType(val);
return 0;
}
if (member == "method")
{
fFairMQMap.at(channelName).at(index).UpdateMethod(val);
fFairMQChannelMap.at(channelName).at(index).UpdateMethod(val);
return 0;
}
if (member == "address")
{
fFairMQMap.at(channelName).at(index).UpdateAddress(val);
fFairMQChannelMap.at(channelName).at(index).UpdateAddress(val);
return 0;
}
if (member == "transport")
{
fFairMQMap.at(channelName).at(index).UpdateTransport(val);
fFairMQChannelMap.at(channelName).at(index).UpdateTransport(val);
return 0;
}
else
{
//if we get there it means something is wrong
LOG(error) << "update of FairMQChannel map failed for the following key: "
<< channelName << "." << index << "." << member;
LOG(error) << "update of FairMQChannel map failed for the following key: " << channelName << "." << index << "." << member;
return 1;
}
}
int FairMQProgOptions::UpdateChannelMap(const string& channelName, int index, const string& member, int val)
int FairMQProgOptions::UpdateChannelValue(const string& channelName, int index, const string& member, int val)
{
if (member == "sndBufSize")
{
fFairMQMap.at(channelName).at(index).UpdateSndBufSize(val);
fFairMQChannelMap.at(channelName).at(index).UpdateSndBufSize(val);
return 0;
}
if (member == "rcvBufSize")
{
fFairMQMap.at(channelName).at(index).UpdateRcvBufSize(val);
fFairMQChannelMap.at(channelName).at(index).UpdateRcvBufSize(val);
return 0;
}
if (member == "rateLogging")
{
fFairMQMap.at(channelName).at(index).UpdateRateLogging(val);
fFairMQChannelMap.at(channelName).at(index).UpdateRateLogging(val);
return 0;
}
else
@@ -315,3 +351,135 @@ int FairMQProgOptions::UpdateChannelMap(const string& channelName, int index, co
return 1;
}
}
vector<string> FairMQProgOptions::GetPropertyKeys() const
{
lock_guard<mutex> lock{fConfigMutex};
vector<string> result;
for (const auto& it : fVarMap)
{
result.push_back(it.first.c_str());
}
return result;
}
/// Add option descriptions
int FairMQProgOptions::AddToCmdLineOptions(const po::options_description optDesc, bool /* visible */)
{
fAllOptions.add(optDesc);
return 0;
}
po::options_description& FairMQProgOptions::GetCmdLineOptions()
{
return fAllOptions;
}
int FairMQProgOptions::PrintOptions()
{
// -> loop over variable map and print its content
// -> In this example the following types are supported:
// string, int, float, double, short, boost::filesystem::path
// vector<string>, vector<int>, vector<float>, vector<double>, vector<short>
map<string, VarValInfo> mapinfo;
// get string length for formatting and convert varmap values into string
int maxLenKey = 0;
int maxLenValue = 0;
int maxLenType = 0;
int maxLenDefault = 0;
for (const auto& m : fVarMap)
{
maxLenKey = max(maxLenKey, static_cast<int>(m.first.length()));
VarValInfo valinfo = ConvertVariableValue<options::ToVarValInfo>()((m.second));
mapinfo[m.first] = valinfo;
maxLenValue = max(maxLenValue, static_cast<int>(valinfo.value.length()));
maxLenType = max(maxLenType, static_cast<int>(valinfo.type.length()));
maxLenDefault = max(maxLenDefault, static_cast<int>(valinfo.defaulted.length()));
}
// TODO : limit the value len field in a better way
if (maxLenValue > 100)
{
maxLenValue = 100;
}
for (const auto& o : fUnregisteredOptions)
{
LOG(debug) << "detected unregistered option: " << o;
}
stringstream ss;
ss << "Configuration: \n";
for (const auto& p : mapinfo)
{
ss << setfill(' ') << left
<< setw(maxLenKey) << p.first << " = "
<< setw(maxLenValue) << p.second.value
<< setw(maxLenType) << p.second.type
<< setw(maxLenDefault) << p.second.defaulted
<< "\n";
}
LOG(info) << ss.str();
return 0;
}
int FairMQProgOptions::PrintOptionsRaw()
{
const vector<boost::shared_ptr<po::option_description>>& options = fAllOptions.options();
for (const auto& o : options)
{
VarValInfo value;
if (fVarMap.count(o->canonical_display_name()))
{
value = ConvertVariableValue<options::ToVarValInfo>()((fVarMap[o->canonical_display_name()]));
}
string description = o->description();
replace(description.begin(), description.end(), '\n', ' ');
cout << o->long_name() << ":" << value.value << ":" << (value.type == "" ? "<unknown>" : value.type) << ":" << description << endl;
}
return 0;
}
string FairMQProgOptions::GetStringValue(const string& key)
{
unique_lock<mutex> lock(fConfigMutex);
string valueStr;
try
{
if (fVarMap.count(key))
{
valueStr = ConvertVariableValue<options::ToString>()(fVarMap.at(key));
}
}
catch (exception& e)
{
LOG(error) << "Exception thrown for the key '" << key << "'";
LOG(error) << e.what();
}
return valueStr;
}
int FairMQProgOptions::Count(const string& key) const
{
unique_lock<mutex> lock(fConfigMutex);
return fVarMap.count(key);
}

View File

@@ -6,26 +6,22 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/*
* File: FairMQProgOptions.h
* Author: winckler
*
* Created on March 11, 2015, 10:20 PM
*/
#ifndef FAIRMQPROGOPTIONS_H
#define FAIRMQPROGOPTIONS_H
#include <fairmq/EventManager.h>
#include "FairProgOptions.h"
#include "FairMQLogger.h"
#include "FairMQChannel.h"
#include <fairmq/Tools.h>
#include <boost/program_options.hpp>
#include <unordered_map>
#include <functional>
#include <map>
#include <mutex>
#include <string>
#include <vector>
#include <mutex>
#include <sstream>
namespace fair
{
@@ -38,10 +34,10 @@ struct PropertyChangeAsString : Event<std::string> {};
} /* namespace mq */
} /* namespace fair */
class FairMQProgOptions : public FairProgOptions
class FairMQProgOptions
{
protected:
using FairMQMap = std::unordered_map<std::string, std::vector<FairMQChannel>>;
private:
using FairMQChannelMap = std::unordered_map<std::string, std::vector<FairMQChannel>>;
public:
FairMQProgOptions();
@@ -49,52 +45,11 @@ class FairMQProgOptions : public FairProgOptions
int ParseAll(const std::vector<std::string>& cmdLineArgs, bool allowUnregistered);
// parse command line.
// default parser for the mq-configuration file (JSON/XML) is called if command line key mq-config is called
int ParseAll(const int argc, char const* const* argv, bool allowUnregistered = false) override;
// default parser for the mq-configuration file (JSON) is called if command line key mq-config is called
int ParseAll(const int argc, char const* const* argv, bool allowUnregistered = true);
FairMQMap GetFairMQMap() const
{
return fFairMQMap;
}
std::unordered_map<std::string, int> GetChannelInfo() const
{
return fChannelInfo;
}
template<typename T>
int UpdateValue(const std::string& key, T val)
{
std::unique_lock<std::mutex> lock(fConfigMutex);
if (fVarMap.count(key))
{
// update variable map
UpdateVarMap<typename std::decay<T>::type>(key, val);
// update FairMQChannel map, check first if data are int or string
if (std::is_same<T, int>::value || std::is_same<T, std::string>::value)
{
if (fMQKeyMap.count(key))
{
UpdateChannelMap(fMQKeyMap.at(key).channel, fMQKeyMap.at(key).index, fMQKeyMap.at(key).member, val);
}
}
lock.unlock();
//if (std::is_same<T, int>::value || std::is_same<T, std::string>::value)//if one wants to restrict type
fEvents.Emit<fair::mq::PropertyChange, typename std::decay<T>::type>(key, val);
fEvents.Emit<fair::mq::PropertyChangeAsString, std::string>(key, GetStringValue(key));
return 0;
}
else
{
LOG(error) << "UpdateValue failed: key '" << key << "' not found in the variable map";
return 1;
}
return 0;
}
FairMQChannelMap GetFairMQMap() const;
std::unordered_map<std::string, int> GetChannelInfo() const;
template<typename T>
int SetValue(const std::string& key, T val)
@@ -104,12 +59,12 @@ class FairMQProgOptions : public FairProgOptions
// update variable map
UpdateVarMap<typename std::decay<T>::type>(key, val);
// update FairMQChannel map, check first if data are int or string
// update FairMQChannel map if the key is a channel key
if (std::is_same<T, int>::value || std::is_same<T, std::string>::value)
{
if (fMQKeyMap.count(key))
if (fChannelKeyMap.count(key))
{
UpdateChannelMap(fMQKeyMap.at(key).channel, fMQKeyMap.at(key).index, fMQKeyMap.at(key).member, val);
UpdateChannelValue(fChannelKeyMap.at(key).channel, fChannelKeyMap.at(key).index, fChannelKeyMap.at(key).member, val);
}
}
@@ -155,57 +110,114 @@ class FairMQProgOptions : public FairProgOptions
fEvents.Unsubscribe<fair::mq::PropertyChangeAsString, std::string>(subscriber);
}
// replace FairMQChannelMap, and update variable map accordingly
int UpdateChannelMap(const FairMQMap& map);
std::vector<std::string> GetPropertyKeys() const;
protected:
struct MQKey
// get value corresponding to the key
template<typename T>
T GetValue(const std::string& key) const
{
std::unique_lock<std::mutex> lock(fConfigMutex);
T val = T();
if (fVarMap.count(key))
{
val = fVarMap[key].as<T>();
}
else
{
LOG(warn) << "Config has no key: " << key << ". Returning default constructed object.";
}
return val;
}
// Given a key, convert the variable value to string
std::string GetStringValue(const std::string& key);
int Count(const std::string& key) const;
template<typename T>
T ConvertTo(const std::string& strValue)
{
if (std::is_arithmetic<T>::value)
{
std::istringstream iss(strValue);
T val;
iss >> val;
return val;
}
else
{
LOG(error) << "the provided string " << strValue << " cannot be converted to the requested type. The target type must be arithmetic type.";
}
}
// add options_description
int AddToCmdLineOptions(const boost::program_options::options_description optDesc, bool visible = true);
boost::program_options::options_description& GetCmdLineOptions();
const boost::program_options::variables_map& GetVarMap() const { return fVarMap; }
int PrintOptions();
int PrintOptionsRaw();
private:
struct ChannelKey
{
std::string channel;
int index;
std::string member;
};
po::options_description fMQCmdOptions;
po::options_description fMQParserOptions;
FairMQMap fFairMQMap;
boost::program_options::variables_map fVarMap; ///< options container
FairMQChannelMap fFairMQChannelMap;
// map of read channel info - channel name - number of subchannels
std::unordered_map<std::string, int> fChannelInfo;
boost::program_options::options_description fAllOptions; ///< all options descriptions
boost::program_options::options_description fGeneralOptions; ///< general options descriptions
boost::program_options::options_description fMQOptions; ///< MQ options descriptions
boost::program_options::options_description fParserOptions; ///< MQ Parser options descriptions
std::map<std::string, MQKey> fMQKeyMap;// key=full path - val=key info
mutable std::mutex fConfigMutex;
int ImmediateOptions() override; // for custom help & version printing
void InitOptionDescription();
std::unordered_map<std::string, int> fChannelInfo; ///< channel name - number of subchannels
std::unordered_map<std::string, ChannelKey> fChannelKeyMap;// key=full path - val=key info
std::vector<std::string> fUnregisteredOptions; ///< container with unregistered options
fair::mq::EventManager fEvents;
void ParseCmdLine(const int argc, char const* const* argv, bool allowUnregistered = true);
void ParseDefaults();
// read FairMQChannelMap and insert/update corresponding values in variable map
// create key for variable map as follow : channelName.index.memberName
void UpdateMQValues();
int Store(const FairMQMap& channels);
int Store(const FairMQChannelMap& channels);
private:
template<typename T>
void EmitUpdate(const std::string& key, T val)
{
//compile time check whether T is const char* or char*, and in that case a compile time error is thrown.
// compile time check whether T is const char* or char*, and in that case a compile time error is thrown.
static_assert(!std::is_same<T,const char*>::value || !std::is_same<T, char*>::value,
"In template member FairMQProgOptions::EmitUpdate<T>(key,val) the types const char* or char* for the calback signatures are not supported.");
fEvents.Emit<fair::mq::PropertyChange, T>(key, val);
fEvents.Emit<fair::mq::PropertyChangeAsString, std::string>(key, GetStringValue(key));
}
int UpdateChannelMap(const std::string& channelName, int index, const std::string& member, const std::string& val);
int UpdateChannelMap(const std::string& channelName, int index, const std::string& member, int val);
// for cases other than int and string
template<typename T>
int UpdateChannelMap(const std::string& /*channelName*/, int /*index*/, const std::string& /*member*/, T /*val*/)
{
return 0;
}
int UpdateChannelMap(const FairMQChannelMap& map);
int UpdateChannelValue(const std::string& channelName, int index, const std::string& member, const std::string& val);
int UpdateChannelValue(const std::string& channelName, int index, const std::string& member, int val);
void UpdateChannelInfo();
fair::mq::EventManager fEvents;
// helper to modify the value of variable map after calling boost::program_options::store
template<typename T>
void UpdateVarMap(const std::string& key, const T& val)
{
std::map<std::string, boost::program_options::variable_value>& vm = fVarMap;
vm[key].value() = boost::any(val);
}
};
#endif /* FAIRMQPROGOPTIONS_H */

View File

@@ -29,7 +29,7 @@ namespace parser
constexpr const char* SUBOPT::channelOptionKeys[];
FairMQMap SUBOPT::UserParser(const vector<string>& channelConfig, const string& deviceId, const string& rootNode)
FairMQChannelMap SUBOPT::UserParser(const vector<string>& channelConfig, const string& deviceId, const string& rootNode)
{
ptree pt;

View File

@@ -14,14 +14,12 @@
#ifndef FAIRMQPARSER_SUBOPT_H
#define FAIRMQPARSER_SUBOPT_H
#include "FairMQParser.h" // for FairMQMap
#include "FairMQParser.h" // for FairMQChannelMap
#include <boost/program_options.hpp>
#include <cstring>
#include <vector>
#include <string>
namespace po = boost::program_options;
namespace fair
{
namespace mq
@@ -78,7 +76,7 @@ struct SUBOPT
nullptr
};
FairMQMap UserParser(const std::vector<std::string>& channelConfig, const std::string& deviceId, const std::string& rootNode = "fairMQOptions");
FairMQChannelMap UserParser(const std::vector<std::string>& channelConfig, const std::string& deviceId, const std::string& rootNode = "fairMQOptions");
};
}

View File

@@ -1,178 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/*
* File: FairProgOptions.cxx
* Author: winckler
*
* Created on March 11, 2015, 5:38 PM
*/
#include "FairProgOptions.h"
#include <iomanip>
#include <sstream>
#include <algorithm>
using namespace std;
using namespace fair::mq;
FairProgOptions::FairProgOptions()
: fVarMap()
, fGeneralOptions("General options")
, fAllOptions("Command line options")
, fConfigMutex()
{
fGeneralOptions.add_options()
("help,h", "produce help")
("version,v", "print version")
("severity", po::value<string>()->default_value("debug"), "Log severity level: trace, debug, info, state, warn, error, fatal, nolog")
("verbosity", po::value<string>()->default_value("medium"), "Log verbosity level: veryhigh, high, medium, low")
("color", po::value<bool>()->default_value(true), "Log color (true/false)")
("log-to-file", po::value<string>()->default_value(""), "Log output to a file.")
("print-options", po::value<bool>()->implicit_value(true), "print options in machine-readable format (<option>:<computed-value>:<type>:<description>)");
fAllOptions.add(fGeneralOptions);
}
FairProgOptions::~FairProgOptions()
{
}
/// Add option descriptions
int FairProgOptions::AddToCmdLineOptions(const po::options_description optDesc, bool /* visible */)
{
fAllOptions.add(optDesc);
return 0;
}
po::options_description& FairProgOptions::GetCmdLineOptions()
{
return fAllOptions;
}
int FairProgOptions::ParseCmdLine(const int argc, char const* const* argv, bool allowUnregistered)
{
// get options from cmd line and store in variable map
// here we use command_line_parser instead of parse_command_line, to allow unregistered and positional options
if (allowUnregistered)
{
po::command_line_parser parser{argc, argv};
parser.options(fAllOptions).allow_unregistered();
po::parsed_options parsedOptions = parser.run();
po::store(parsedOptions, fVarMap);
}
else
{
po::store(po::parse_command_line(argc, argv, fAllOptions), fVarMap);
}
// Handles options like "--help" or "--version"
// return 1 if switch options found in fVarMap
if (ImmediateOptions())
{
return 1;
}
po::notify(fVarMap);
return 0;
}
void FairProgOptions::ParseDefaults()
{
vector<string> emptyArgs;
emptyArgs.push_back("dummy");
vector<const char*> argv(emptyArgs.size());
transform(emptyArgs.begin(), emptyArgs.end(), argv.begin(), [](const string& str)
{
return str.c_str();
});
po::store(po::parse_command_line(argv.size(), const_cast<char**>(argv.data()), fAllOptions), fVarMap);
}
int FairProgOptions::PrintOptions()
{
// -> loop over variable map and print its content
// -> In this example the following types are supported:
// string, int, float, double, short, boost::filesystem::path
// vector<string>, vector<int>, vector<float>, vector<double>, vector<short>
map<string, VarValInfo> mapinfo;
// get string length for formatting and convert varmap values into string
int maxLen1st = 0;
int maxLen2nd = 0;
int maxLenTypeInfo = 0;
int maxLenDefault = 0;
int maxLenEmpty = 0;
for (const auto& m : fVarMap)
{
Max(maxLen1st, m.first.length());
VarValInfo valinfo = GetVariableValueInfo(m.second);
mapinfo[m.first] = valinfo;
Max(maxLen2nd, valinfo.value.length());
Max(maxLenTypeInfo, valinfo.type.length());
Max(maxLenDefault, valinfo.defaulted.length());
Max(maxLenEmpty, valinfo.empty.length());
}
// TODO : limit the value len field in a better way
if (maxLen2nd > 100)
{
maxLen2nd = 100;
}
stringstream ss;
ss << "Configuration: \n";
for (const auto& p : mapinfo)
{
ss << setfill(' ')
<< setw(maxLen1st) << left << p.first << " = "
<< setw(maxLen2nd) << p.second.value
<< setw(maxLenTypeInfo) << p.second.type
<< setw(maxLenDefault) << p.second.defaulted
<< setw(maxLenEmpty) << p.second.empty
<< "\n";
}
LOG(info) << ss.str();
return 0;
}
int FairProgOptions::PrintOptionsRaw()
{
const std::vector<boost::shared_ptr<po::option_description>>& options = fAllOptions.options();
for (const auto& o : options)
{
VarValInfo value;
if (fVarMap.count(o->canonical_display_name()))
{
value = GetVariableValueInfo(fVarMap[o->canonical_display_name()]);
}
string description = o->description();
replace(description.begin(), description.end(), '\n', ' ');
cout << o->long_name() << ":" << value.value << ":" << (value.type == "" ? "<unknown>" : value.type) << ":" << description << endl;
}
return 0;
}
VarValInfo FairProgOptions::GetVariableValueInfo(const po::variable_value& varValue)
{
return ConvertVariableValue<ToVarValInfo>()(varValue);
}

View File

@@ -1,172 +1 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/*
* File: FairProgOptions.h
* Author: winckler
*
* Created on March 11, 2015, 5:38 PM
*/
#ifndef FAIRPROGOPTIONS_H
#define FAIRPROGOPTIONS_H
#include "FairMQLogger.h"
#include "FairProgOptionsHelper.h"
#include <fairmq/Tools.h>
#include <boost/program_options.hpp>
#include <boost/filesystem.hpp>
#include <string>
#include <vector>
#include <iostream>
#include <fstream>
#include <mutex>
#include <exception>
namespace po = boost::program_options;
namespace fs = boost::filesystem;
class FairProgOptions
{
public:
FairProgOptions();
virtual ~FairProgOptions();
auto GetPropertyKeys() const -> std::vector<std::string>
{
std::lock_guard<std::mutex> lock{fConfigMutex};
std::vector<std::string> result;
for (const auto& it : fVarMap)
{
result.push_back(it.first.c_str());
}
return result;
}
// add options_description
int AddToCmdLineOptions(const po::options_description optDesc, bool visible = true);
po::options_description& GetCmdLineOptions();
// get value corresponding to the key
template<typename T>
T GetValue(const std::string& key) const
{
std::unique_lock<std::mutex> lock(fConfigMutex);
T val = T();
if (fVarMap.count(key))
{
val = fVarMap[key].as<T>();
}
else
{
LOG(warn) << "Config has no key: " << key << ". Returning default constructed object.";
}
return val;
}
// Given a key, convert the variable value to string
std::string GetStringValue(const std::string& key)
{
std::unique_lock<std::mutex> lock(fConfigMutex);
std::string valueStr;
try
{
if (fVarMap.count(key))
{
valueStr = fair::mq::ConvertVariableValue<fair::mq::VarInfoToString>()(fVarMap.at(key));
}
}
catch (std::exception& e)
{
LOG(error) << "Exception thrown for the key '" << key << "'";
LOG(error) << e.what();
}
return valueStr;
}
int Count(const std::string& key) const
{
std::unique_lock<std::mutex> lock(fConfigMutex);
return fVarMap.count(key);
}
//restrict conversion to fundamental types
template<typename T>
T ConvertTo(const std::string& strValue)
{
if (std::is_arithmetic<T>::value)
{
std::istringstream iss(strValue);
T val;
iss >> val;
return val;
}
else
{
LOG(error) << "the provided string " << strValue << " cannot be converted in the requested type. The target types must be arithmetic types";
}
}
const po::variables_map& GetVarMap() const { return fVarMap; }
int ParseCmdLine(const int argc, char const* const* argv, bool allowUnregistered = false);
void ParseDefaults();
virtual int ParseAll(const int argc, char const* const* argv, bool allowUnregistered = false) = 0;
virtual int PrintOptions();
virtual int PrintOptionsRaw();
protected:
// options container
po::variables_map fVarMap;
// options descriptions
po::options_description fGeneralOptions;
po::options_description fAllOptions;
mutable std::mutex fConfigMutex;
virtual int ImmediateOptions() = 0;
// UpdateVarMap() and Replace() --> helper functions to modify the value of variable map after calling po::store
template<typename T>
void UpdateVarMap(const std::string& key, const T& val)
{
Replace(fVarMap, key, val);
}
template<typename T>
void Replace(std::map<std::string, po::variable_value>& vm, const std::string& key, const T& val)
{
vm[key].value() = boost::any(val);
}
private:
fair::mq::VarValInfo GetVariableValueInfo(const po::variable_value& varValue);
static void Max(int& val, const int& comp)
{
if (comp > val)
{
val = comp;
}
}
};
#endif /* FAIRPROGOPTIONS_H */
#warning "This header file is deprecated. Use FairMQProgOptions class directly which now contains all FairProgOptions functionality. Note, that FairMQProgOptions is also available if you include FairMQDevice."

View File

@@ -17,211 +17,171 @@
#include <boost/program_options.hpp>
#include <boost/filesystem.hpp>
#include <boost/spirit/home/support/detail/hold_any.hpp>
#include <string>
#include <vector>
#include <iostream>
#include <ostream>
#include <iterator>
#include <typeinfo>
namespace fair
{
namespace mq
{
template<class T>
std::ostream& operator<<(std::ostream& os, const std::vector<T>& v)
{
for (const auto& i : v)
{
os << i << " ";
}
return os;
}
struct VarValInfo
{
std::string value;
std::string type;
std::string defaulted;
std::string empty;
};
template<class T>
std::ostream& operator<<(std::ostream& os, const std::vector<T>& v)
{
std::copy(v.begin(), v.end(), std::ostream_iterator<T>(os, " "));
return os;
}
template<typename T>
bool typeIs(const boost::program_options::variable_value& varValue)
std::string ConvertVariableValueToString(const boost::program_options::variable_value& varVal)
{
auto& value = varValue.value();
if (auto q = boost::any_cast<T>(&value))
{
return true;
}
else
{
return false;
std::ostringstream oss;
if (auto q = boost::any_cast<T>(&varVal.value())) {
oss << *q;
}
return oss.str();
}
template<typename T>
std::string ConvertVariableValueToString(const boost::program_options::variable_value& varValue)
namespace options
{
auto& value = varValue.value();
std::ostringstream ostr;
if (auto q = boost::any_cast<T>(&value))
{
ostr << *q;
}
std::string valueStr = ostr.str();
return valueStr;
}
// string specialization
template<>
inline std::string ConvertVariableValueToString<std::string>(const boost::program_options::variable_value& varValue)
{
auto& value = varValue.value();
std::string valueStr;
if (auto q = boost::any_cast<std::string>(&value))
{
valueStr = *q;
}
return valueStr;
}
// boost::filesystem::path specialization
template<>
inline std::string ConvertVariableValueToString<boost::filesystem::path>(const boost::program_options::variable_value& varValue)
{
auto& value = varValue.value();
std::string valueStr;
if (auto q = boost::any_cast<boost::filesystem::path>(&value))
{
valueStr = (*q).string();
}
return valueStr;
}
// policy to convert boost variable value into string
struct VarInfoToString
struct ToString
{
using returned_type = std::string;
template<typename T>
std::string Value(const boost::program_options::variable_value& varValue, const std::string&, const std::string&, const std::string&)
std::string Value(const boost::program_options::variable_value& varVal, const std::string&, const std::string&)
{
return ConvertVariableValueToString<T>(varValue);
return ConvertVariableValueToString<T>(varVal);
}
returned_type DefaultValue(const std::string&, const std::string&)
returned_type DefaultValue(const std::string&)
{
return std::string("empty value");
return std::string("[unidentified]");
}
};
// policy to convert variable value content into VarValInfo
struct ToVarValInfo
{
using returned_type = fair::mq::VarValInfo;
using returned_type = VarValInfo;
template<typename T>
returned_type Value(const boost::program_options::variable_value& varValue, const std::string& type, const std::string& defaulted, const std::string& empty)
returned_type Value(const boost::program_options::variable_value& varVal, const std::string& type, const std::string& defaulted)
{
std::string valueStr = ConvertVariableValueToString<T>(varValue);
return fair::mq::VarValInfo{valueStr, type, defaulted, empty};
return VarValInfo{ConvertVariableValueToString<T>(varVal), type, defaulted};
}
returned_type DefaultValue(const std::string& defaulted, const std::string& empty)
returned_type DefaultValue(const std::string& defaulted)
{
return fair::mq::VarValInfo{std::string("Unknown value"), std::string(" [Unknown]"), defaulted, empty};
return VarValInfo{std::string("[unidentified]"), std::string("[unidentified]"), defaulted};
}
};
} // namespace options
// host class that take one of the two policy defined above
template<typename T>
struct ConvertVariableValue : T
{
auto operator()(const boost::program_options::variable_value& varValue) -> typename T::returned_type
auto operator()(const boost::program_options::variable_value& varVal) -> typename T::returned_type
{
std::string defaulted;
std::string empty;
if (varValue.empty())
if (varVal.defaulted())
{
empty = " [empty]";
defaulted = " [default]";
}
else
{
if (varValue.defaulted())
{
defaulted = " [default]";
}
else
{
defaulted = " [provided]";
}
defaulted = " [provided]";
}
if (typeIs<std::string>(varValue))
return T::template Value<std::string>(varValue, std::string("<string>"), defaulted, empty);
if (typeid(std::string) == varVal.value().type())
return T::template Value<std::string>(varVal, std::string("<string>"), defaulted);
if (typeIs<std::vector<std::string>>(varValue))
return T::template Value<std::vector<std::string>>(varValue, std::string("<vector<string>>"), defaulted, empty);
if (typeid(std::vector<std::string>) == varVal.value().type())
return T::template Value<std::vector<std::string>>(varVal, std::string("<vector<string>>"), defaulted);
if (typeIs<int>(varValue))
return T::template Value<int>(varValue, std::string("<int>"), defaulted, empty);
if (typeid(int) == varVal.value().type())
return T::template Value<int>(varVal, std::string("<int>"), defaulted);
if (typeIs<std::vector<int>>(varValue))
return T::template Value<std::vector<int>>(varValue, std::string("<vector<int>>"), defaulted, empty);
if (typeid(std::vector<int>) == varVal.value().type())
return T::template Value<std::vector<int>>(varVal, std::string("<vector<int>>"), defaulted);
if (typeIs<float>(varValue))
return T::template Value<float>(varValue, std::string("<float>"), defaulted, empty);
if (typeid(float) == varVal.value().type())
return T::template Value<float>(varVal, std::string("<float>"), defaulted);
if (typeIs<std::vector<float>>(varValue))
return T::template Value<std::vector<float>>(varValue, std::string("<vector<float>>"), defaulted, empty);
if (typeid(std::vector<float>) == varVal.value().type())
return T::template Value<std::vector<float>>(varVal, std::string("<vector<float>>"), defaulted);
if (typeIs<double>(varValue))
return T::template Value<double>(varValue, std::string("<double>"), defaulted, empty);
if (typeid(double) == varVal.value().type())
return T::template Value<double>(varVal, std::string("<double>"), defaulted);
if (typeIs<std::vector<double>>(varValue))
return T::template Value<std::vector<double>>(varValue, std::string("<vector<double>>"), defaulted, empty);
if (typeid(std::vector<double>) == varVal.value().type())
return T::template Value<std::vector<double>>(varVal, std::string("<vector<double>>"), defaulted);
if (typeIs<short>(varValue))
return T::template Value<short>(varValue, std::string("<short>"), defaulted, empty);
if (typeid(short) == varVal.value().type())
return T::template Value<short>(varVal, std::string("<short>"), defaulted);
if (typeIs<std::vector<short>>(varValue))
return T::template Value<std::vector<short>>(varValue, std::string("<vector<short>>"), defaulted, empty);
if (typeid(std::vector<short>) == varVal.value().type())
return T::template Value<std::vector<short>>(varVal, std::string("<vector<short>>"), defaulted);
if (typeIs<long>(varValue))
return T::template Value<long>(varValue, std::string("<long>"), defaulted, empty);
if (typeid(long) == varVal.value().type())
return T::template Value<long>(varVal, std::string("<long>"), defaulted);
if (typeIs<std::vector<long>>(varValue))
return T::template Value<std::vector<long>>(varValue, std::string("<vector<long>>"), defaulted, empty);
if (typeid(std::vector<long>) == varVal.value().type())
return T::template Value<std::vector<long>>(varVal, std::string("<vector<long>>"), defaulted);
if (typeIs<std::size_t>(varValue))
return T::template Value<std::size_t>(varValue, std::string("<std::size_t>"), defaulted, empty);
if (typeid(std::size_t) == varVal.value().type())
return T::template Value<std::size_t>(varVal, std::string("<std::size_t>"), defaulted);
if (typeIs<std::vector<std::size_t>>(varValue))
return T::template Value<std::vector<std::size_t>>(varValue, std::string("<vector<std::size_t>>"), defaulted, empty);
if (typeid(std::vector<std::size_t>) == varVal.value().type())
return T::template Value<std::vector<std::size_t>>(varVal, std::string("<vector<std::size_t>>"), defaulted);
if (typeIs<std::uint32_t>(varValue))
return T::template Value<std::uint32_t>(varValue, std::string("<std::uint32_t>"), defaulted, empty);
if (typeid(std::uint32_t) == varVal.value().type())
return T::template Value<std::uint32_t>(varVal, std::string("<std::uint32_t>"), defaulted);
if (typeIs<std::vector<std::uint32_t>>(varValue))
return T::template Value<std::vector<std::uint32_t>>(varValue, std::string("<vector<std::uint32_t>>"), defaulted, empty);
if (typeid(std::vector<std::uint32_t>) == varVal.value().type())
return T::template Value<std::vector<std::uint32_t>>(varVal, std::string("<vector<std::uint32_t>>"), defaulted);
if (typeIs<std::uint64_t>(varValue))
return T::template Value<std::uint64_t>(varValue, std::string("<std::uint64_t>"), defaulted, empty);
if (typeid(std::uint64_t) == varVal.value().type())
return T::template Value<std::uint64_t>(varVal, std::string("<std::uint64_t>"), defaulted);
if (typeIs<std::vector<std::uint64_t>>(varValue))
return T::template Value<std::vector<std::uint64_t>>(varValue, std::string("<vector<std::uint64_t>>"), defaulted, empty);
if (typeid(std::vector<std::uint64_t>) == varVal.value().type())
return T::template Value<std::vector<std::uint64_t>>(varVal, std::string("<vector<std::uint64_t>>"), defaulted);
if (typeIs<bool>(varValue))
return T::template Value<bool>(varValue, std::string("<bool>"), defaulted, empty);
if (typeid(bool) == varVal.value().type())
return T::template Value<bool>(varVal, std::string("<bool>"), defaulted);
if (typeIs<std::vector<bool>>(varValue))
return T::template Value<std::vector<bool>>(varValue, std::string("<vector<bool>>"), defaulted, empty);
if (typeid(std::vector<bool>) == varVal.value().type())
return T::template Value<std::vector<bool>>(varVal, std::string("<vector<bool>>"), defaulted);
if (typeIs<boost::filesystem::path>(varValue))
return T::template Value<boost::filesystem::path>(varValue, std::string("<boost::filesystem::path>"), defaulted, empty);
if (typeid(boost::filesystem::path) == varVal.value().type())
return T::template Value<boost::filesystem::path>(varVal, std::string("<boost::filesystem::path>"), defaulted);
if (typeid(std::vector<boost::filesystem::path>) == varVal.value().type())
return T::template Value<std::vector<boost::filesystem::path>>(varVal, std::string("<std::vector<boost::filesystem::path>>"), defaulted);
// if we get here, the type is not supported return unknown info
return T::DefaultValue(defaulted, empty);
return T::DefaultValue(defaulted);
}
};

View File

@@ -1,98 +0,0 @@
################################################################################
# Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
# Create a library called "ProgOptionTest"
# Test fair-prog-opt
# scripts
configure_file( ${CMAKE_SOURCE_DIR}/fairmq/options/ProgOptionTest/macro/testMQOptions1.sh.in ${CMAKE_BINARY_DIR}/bin/testMQOptions1.sh )
configure_file( ${CMAKE_SOURCE_DIR}/fairmq/options/ProgOptionTest/macro/testMQOptions2.sh.in ${CMAKE_BINARY_DIR}/bin/testMQOptions2.sh )
configure_file( ${CMAKE_SOURCE_DIR}/fairmq/options/ProgOptionTest/macro/testMQOptions3.sh.in ${CMAKE_BINARY_DIR}/bin/testMQOptions3.sh )
configure_file( ${CMAKE_SOURCE_DIR}/fairmq/options/ProgOptionTest/macro/testMQOptions4.sh.in ${CMAKE_BINARY_DIR}/bin/testMQOptions4.sh )
configure_file( ${CMAKE_SOURCE_DIR}/fairmq/options/ProgOptionTest/macro/testMQOptions5.sh.in ${CMAKE_BINARY_DIR}/bin/testMQOptions5.sh )
configure_file( ${CMAKE_SOURCE_DIR}/fairmq/options/ProgOptionTest/macro/start-bsampler-sink.sh.in ${CMAKE_BINARY_DIR}/bin/start-bsampler-sink.sh )
EXEC_PROGRAM(/bin/chmod ARGS "u+x ${CMAKE_BINARY_DIR}/bin/testMQOptions1.sh")
EXEC_PROGRAM(/bin/chmod ARGS "u+x ${CMAKE_BINARY_DIR}/bin/testMQOptions2.sh")
EXEC_PROGRAM(/bin/chmod ARGS "u+x ${CMAKE_BINARY_DIR}/bin/testMQOptions3.sh")
EXEC_PROGRAM(/bin/chmod ARGS "u+x ${CMAKE_BINARY_DIR}/bin/testMQOptions4.sh")
EXEC_PROGRAM(/bin/chmod ARGS "u+x ${CMAKE_BINARY_DIR}/bin/testMQOptions5.sh")
EXEC_PROGRAM(/bin/chmod ARGS "u+x ${CMAKE_BINARY_DIR}/bin/start-bsampler-sink.sh")
# Config/xml/json example file
configure_file( ${CMAKE_SOURCE_DIR}/fairmq/options/ProgOptionTest/macro/ConfigFileTest.cfg.in ${CMAKE_BINARY_DIR}/bin/ConfigFileTest.cfg)
configure_file( ${CMAKE_SOURCE_DIR}/fairmq/options/ProgOptionTest/macro/testXML.xml ${CMAKE_BINARY_DIR}/bin/testXML.xml )
configure_file( ${CMAKE_SOURCE_DIR}/fairmq/options/ProgOptionTest/macro/testJSON.json ${CMAKE_BINARY_DIR}/bin/testJSON.json )
configure_file( ${CMAKE_SOURCE_DIR}/fairmq/options/ProgOptionTest/macro/bsampler-sink.json ${CMAKE_BINARY_DIR}/bin/bsampler-sink.json )
set(INCLUDE_DIRECTORIES
${CMAKE_SOURCE_DIR}/fairmq
${CMAKE_SOURCE_DIR}/fairmq/options
${CMAKE_SOURCE_DIR}/fairmq/options/ProgOptionTest/lib
${CMAKE_SOURCE_DIR}/fairmq/options/ProgOptionTest/run
)
Set(SYSTEM_INCLUDE_DIRECTORIES
${SYSTEM_INCLUDE_DIRECTORIES}
)
include_directories(${INCLUDE_DIRECTORIES})
include_directories(${SYSTEM_INCLUDE_DIRECTORIES})
set(LINK_DIRECTORIES
${Boost_LIBRARY_DIRS}
)
link_directories(${LINK_DIRECTORIES})
Set(SRCS
lib/FairMQParserExample.cxx
)
Set(LIBRARY_NAME ProgOptionTest)
If (Boost_FOUND)
Set(DEPENDENCIES
${Boost_SYSTEM_LIBRARY}
${Boost_PROGRAM_OPTIONS_LIBRARY})
FairMQ
)
EndIf (Boost_FOUND)
GENERATE_LIBRARY()
If (Boost_FOUND)
Set(Exe_Names
runtestMQOption1
runtestMQOption2
runOptTestSampler
runOptTestSink
)
set(Exe_Source
run/testMQoptions1.cxx
run/testMQoptions2.cxx
run/runOptTestSampler.cxx
run/runOptTestSink.cxx
)
List(LENGTH Exe_Names _length)
Math(EXPR _length ${_length}-1)
ForEach(_file RANGE 0 ${_length})
List(GET Exe_Names ${_file} _name)
List(GET Exe_Source ${_file} _src)
Set(EXE_NAME ${_name})
Set(SRCS ${_src})
Set(DEPENDENCIES ProgOptionTest)
GENERATE_EXECUTABLE()
EndForEach(_file RANGE 0 ${_length})
EndIf (Boost_FOUND)

View File

@@ -1,74 +0,0 @@
/*
* File: FairMQParserExample.cxx
* Author: winckler
*
* Created on May 14, 2015, 5:01 PM
*/
#include "FairMQParserExample.h"
#include "FairMQLogger.h"
#include <boost/property_tree/xml_parser.hpp>
namespace FairMQParser
{
// other xml examples
////////////////////////////////////////////////////////////////////////////
boost::property_tree::ptree MQXML2::UserParser(const std::string& filename)
{
boost::property_tree::ptree pt;
boost::property_tree::read_xml(filename, pt);
return pt;
}
// TODO : finish implementation
////////////////////////////////////////////////////////////////////////////
boost::property_tree::ptree MQXML3::UserParser(const std::string& filename, const std::string& devicename)
{
// Create an empty property tree object
boost::property_tree::ptree pt;
boost::property_tree::read_xml(filename, pt);
// Create fair mq map
auto xml = pt.get_child("");
std::vector<std::pair<std::string, boost::property_tree::ptree>> match;
std::pair<std::string, boost::property_tree::ptree> device_match;
ProcessTree(xml.begin (), xml.end (), std::back_inserter(match),
[] (const std::string& key) { return key == "device"; });
// for each device
for (const auto& pair: match)
{
if(pair.second.get<std::string>("<xmlattr>.name") == devicename)
{
device_match=pair;
}
else
{
//match.erase(std::remove(match.begin(), match.end(), pair), match.end());
continue;
}
//std::cout << "pair.first " << pair.first << std::endl;//device
//std::cout << "\t node = " << pair.first
// << "\t name = " << pair.second.get<std::string>("<xmlattr>.name")
// << "\t id = " << pair.second.get<std::string>("<xmlattr>.id");
//std::cout<<std::endl;
}
return device_match.second;
}
} // end FairMQParser namespace

View File

@@ -1,86 +0,0 @@
/*
* File: FairMQParserExample.h
* Author: winckler
*
* Created on May 14, 2015, 5:01 PM
*/
#ifndef FAIRMQPARSEREXAMPLE_H
#define FAIRMQPARSEREXAMPLE_H
// FairRoot
#include "FairMQChannel.h"
#include "FairMQParser.h"
// Boost
#include <boost/property_tree/ptree.hpp>
// std
#include <string>
#include <vector>
#include <map>
namespace FairMQParser
{
////////////////////////////////////////////////////////////////////////////
/////////////////////////////////// XML ////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
// xml example 2
////////////////////////////////////////////////////////////////////////////
struct MQXML2
{
boost::property_tree::ptree UserParser(const std::string& filename);
};
// xml example 3
////////////////////////////////////////////////////////////////////////////
struct MQXML3
{
boost::property_tree::ptree UserParser(const std::string& filename, const std::string& root_node);
};
////////////////////////////////////////////////////////////////////////////
// template function iterating over the whole boost property tree
template <typename Input_tree_It, typename Output_tree_It, typename Compare_key>
void ProcessTree(Input_tree_It first, Input_tree_It last, Output_tree_It dest, Compare_key compare)
{
//typedef typename std::iterator_traits<Input_tree_It>::reference reference;
if (first == last)
{
return;
}
auto begin = first->second.begin ();
auto end = first->second.end ();
if (begin != end)
{
ProcessTree (begin, end, dest, compare);
}
if (compare (first->first))
{
dest = *first;
}
ProcessTree (++first, last, dest, compare);
}
class no_id_exception: public std::exception
{
virtual const char* what() const throw()
{
return "Empty string for the device-id in FairMQParser::ptreeToMQMap(...) function";
}
};
} // end FairMQParser namespace
#endif /* FAIRMQPARSEREXAMPLE_H */

View File

@@ -1,38 +0,0 @@
#----------------------------------------------------
# comments :
# brackets [] are used to group options. For example :
#
# [xml.config]
# node.root = fairMQOptions
#
# is equivalent to
# xml.config.node.root = fairMQOptions
#----------------------------------------------------
config-json-file = @CMAKE_BINARY_DIR@/bin/testJSON.json
config-xml-file = @CMAKE_BINARY_DIR@/bin/testXML.xml
#
device-id = merger
#-------------------
[xml.config]
#filename = @CMAKE_BINARY_DIR@/bin/testXML.xml
node.root = fairMQOptions
#-------------------
[input.file]
name = sampler_file_name.root
tree = sampler_tree
branch = sampler_branch
#-------------------
[output.file]
name = sink_filename.root
tree = sink_tree
branch = sink_branch

View File

@@ -1,18 +0,0 @@
#!/bin/bash
JSONFILE="@CMAKE_BINARY_DIR@/bin/bsampler-sink.json"
# Note: device-id value must correspond to the device id given in the json file
BSAMPLER="runOptTestSampler"
BSAMPLER+=" --config-json-file $JSONFILE"
BSAMPLER+=" --id bsampler1"
xterm -geometry 150x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/$BSAMPLER &
SINK="runOptTestSink"
SINK+=" --config-json-file $JSONFILE"
SINK+=" --id sink1"
xterm -geometry 150x23+0+350 -hold -e @CMAKE_BINARY_DIR@/bin/$SINK &

View File

@@ -1,63 +0,0 @@
{
"fairMQOptions": {
"devices": [
{
"id": "merger1",
"channels": [
{
"name": "two_inputs_channel",
"sockets": [
{
"type": "pull",
"method": "",
"address": "tcp://*:5569",
"sndBufSize": 1000,
"rcvBufSize": 1000,
"rateLogging": 1
},
{
"type": "pull",
"method": "bind",
"address": "tcp://*:5570",
"sndBufSize": 1000,
"rcvBufSize": 1000,
"rateLogging": 1
}
]
},
{
"name": "one_output_channel",
"sockets": [
{
"type": "push",
"method": "connect",
"address": "tcp://*:5571",
"sndBufSize": 1000,
"rcvBufSize": 1000,
"rateLogging": 1
}
]
}
]
},
{
"id": "sink1",
"channels": [
{
"name": "one_input",
"sockets": [
{
"type": "pull",
"method": "bind",
"address": "tcp://localhost:5571",
"sndBufSize": 1000,
"rcvBufSize": 1000,
"rateLogging": 1
}
]
}
]
}
]
}
}

View File

@@ -1,11 +0,0 @@
#!/bin/bash
RUN_TEST="runtestMQOption1"
if [ "$#" -gt 0 ]; then
RUN_TEST+=" $*"
fi
RUN_TEST+=" --config-xml-file @CMAKE_BINARY_DIR@/bin/testXML.xml"
@CMAKE_BINARY_DIR@/bin/$RUN_TEST

View File

@@ -1,59 +0,0 @@
#!/bin/bash
RUN_TEST="runtestMQOption1"
if [ "$#" -gt 0 ]; then
RUN_TEST+=" $*"
fi
XML_CMD_LINE="<fairMQOptions>"
XML_CMD_LINE+="<device name=\"merger\" id=\"1234\" >"
XML_CMD_LINE+=" <channel name=\"two_inputs_channel\" >"
XML_CMD_LINE+=" <socket name=\"input1\" >"
XML_CMD_LINE+=" <type>pull</type>"
XML_CMD_LINE+=" <method>bind</method>"
XML_CMD_LINE+=" <address>tcp://*:5569</address>"
XML_CMD_LINE+=" <sndBufSize>1000</sndBufSize>"
XML_CMD_LINE+=" <rcvBufSize>1000</rcvBufSize>"
XML_CMD_LINE+=" <rateLogging>1</rateLogging>"
XML_CMD_LINE+=" </socket>"
XML_CMD_LINE+=" <socket name=\"input2\" >"
XML_CMD_LINE+=" <type>pull</type>"
XML_CMD_LINE+=" <method>bind</method>"
XML_CMD_LINE+=" <address>tcp://*:5570</address>"
XML_CMD_LINE+=" <sndBufSize>1000</sndBufSize>"
XML_CMD_LINE+=" <rcvBufSize>1000</rcvBufSize>"
XML_CMD_LINE+=" <rateLogging>1</rateLogging>"
XML_CMD_LINE+=" </socket>"
XML_CMD_LINE+=" </channel>"
XML_CMD_LINE+=" <channel name=\"one_output_channel\" >"
XML_CMD_LINE+=" <socket name=\"output1\" >"
XML_CMD_LINE+=" <type>push</type>"
XML_CMD_LINE+=" <method>connect</method>"
XML_CMD_LINE+=" <address>tcp://*:5571</address>"
XML_CMD_LINE+=" <sndBufSize>1000</sndBufSize>"
XML_CMD_LINE+=" <rcvBufSize>1000</rcvBufSize>"
XML_CMD_LINE+=" <rateLogging>1</rateLogging>"
XML_CMD_LINE+=" </socket>"
XML_CMD_LINE+=" </channel>"
XML_CMD_LINE+=" </device>"
XML_CMD_LINE+=" <device name=\"sink\" id=\"4567\" >"
XML_CMD_LINE+=" <channel name=\"one_input\" >"
XML_CMD_LINE+=" <socket name=\"input1\" >"
XML_CMD_LINE+=" <type>pull</type>"
XML_CMD_LINE+=" <method>bind</method>"
XML_CMD_LINE+=" <address>tcp://localhost:5571</address>"
XML_CMD_LINE+=" <sndBufSize>1000</sndBufSize>"
XML_CMD_LINE+=" <rcvBufSize>1000</rcvBufSize>"
XML_CMD_LINE+=" <rateLogging>1</rateLogging>"
XML_CMD_LINE+=" </socket>"
XML_CMD_LINE+=" </channel>"
XML_CMD_LINE+=" </device>"
XML_CMD_LINE+="</fairmq_option>"
RUN_TEST+=" --config-xml-string $XML_CMD_LINE"
@CMAKE_BINARY_DIR@/bin/$RUN_TEST

View File

@@ -1,10 +0,0 @@
#!/bin/bash
RUN_TEST="runtestMQOption2"
if [ "$#" -gt 0 ]; then
RUN_TEST+=" $*"
fi
RUN_TEST+=" --config @CMAKE_BINARY_DIR@/bin/ConfigFileTest.cfg"
@CMAKE_BINARY_DIR@/bin/$RUN_TEST

View File

@@ -1,11 +0,0 @@
#!/bin/bash
RUN_TEST="runtestMQOption1"
if [ "$#" -gt 0 ]; then
RUN_TEST+=" $*"
fi
RUN_TEST+=" --config-json-file @CMAKE_BINARY_DIR@/bin/testJSON.json"
@CMAKE_BINARY_DIR@/bin/$RUN_TEST

View File

@@ -1,77 +0,0 @@
#!/bin/bash
RUN_TEST="runtestMQOption1"
if [ "$#" -gt 0 ]; then
RUN_TEST+=" $*"
fi
JSON_CMD_LINE="{"
JSON_CMD_LINE+=" \"fairMQOptions\": {"
JSON_CMD_LINE+=" \"device\": "
JSON_CMD_LINE+=" {"
JSON_CMD_LINE+=" \"name\": \"merger\","
JSON_CMD_LINE+=" \"id\": \"1234\","
JSON_CMD_LINE+=" \"channel\": "
JSON_CMD_LINE+=" {"
JSON_CMD_LINE+=" \"name\": \"two_inputs_channel\","
JSON_CMD_LINE+=" \"socket\": "
JSON_CMD_LINE+=" {"
JSON_CMD_LINE+=" \"name\": \"input1\","
JSON_CMD_LINE+=" \"type\": \"pull\","
JSON_CMD_LINE+=" \"method\": \"bind\","
JSON_CMD_LINE+=" \"address\": \"tcp://*:5569\","
JSON_CMD_LINE+=" \"sndBufSize\": \"1000\","
JSON_CMD_LINE+=" \"rcvBufSize\": \"1000\","
JSON_CMD_LINE+=" \"rateLogging\": \"1\" "
JSON_CMD_LINE+=" },"
JSON_CMD_LINE+=" \"socket\": "
JSON_CMD_LINE+=" {"
JSON_CMD_LINE+=" \"name\": \"input2\","
JSON_CMD_LINE+=" \"type\": \"pull\","
JSON_CMD_LINE+=" \"method\": \"bind\","
JSON_CMD_LINE+=" \"address\": \"tcp://*:5570\","
JSON_CMD_LINE+=" \"sndBufSize\": \"1000\","
JSON_CMD_LINE+=" \"rcvBufSize\": \"1000\","
JSON_CMD_LINE+=" \"rateLogging\": \"1\" "
JSON_CMD_LINE+=" }"
JSON_CMD_LINE+=" },"
JSON_CMD_LINE+=" \"channel\":"
JSON_CMD_LINE+=" {"
JSON_CMD_LINE+=" \"name\": \"one_output_channel\","
JSON_CMD_LINE+=" \"socket\": {"
JSON_CMD_LINE+=" \"name\": \"output1\","
JSON_CMD_LINE+=" \"type\": \"push\","
JSON_CMD_LINE+=" \"method\": \"connect\","
JSON_CMD_LINE+=" \"address\": \"tcp://*:5571\","
JSON_CMD_LINE+=" \"sndBufSize\": \"1000\","
JSON_CMD_LINE+=" \"rcvBufSize\": \"1000\","
JSON_CMD_LINE+=" \"rateLogging\": \"1\" "
JSON_CMD_LINE+=" }"
JSON_CMD_LINE+=" }"
JSON_CMD_LINE+=" },"
JSON_CMD_LINE+=" \"device\":"
JSON_CMD_LINE+=" {"
JSON_CMD_LINE+=" \"name\": \"sink\","
JSON_CMD_LINE+=" \"id\": \"4567\","
JSON_CMD_LINE+=" \"channel\": {"
JSON_CMD_LINE+=" \"name\": \"one_input\","
JSON_CMD_LINE+=" \"socket\": {"
JSON_CMD_LINE+=" \"name\": \"input1\","
JSON_CMD_LINE+=" \"type\": \"pull\","
JSON_CMD_LINE+=" \"method\": \"bind\","
JSON_CMD_LINE+=" \"address\": \"tcp://localhost:5571\","
JSON_CMD_LINE+=" \"sndBufSize\": \"1000\","
JSON_CMD_LINE+=" \"rcvBufSize\": \"1000\","
JSON_CMD_LINE+=" \"rateLogging\": \"1\" "
JSON_CMD_LINE+=" }"
JSON_CMD_LINE+=" }"
JSON_CMD_LINE+=" }"
JSON_CMD_LINE+=" }"
JSON_CMD_LINE+="}"
RUN_TEST+=" --config-json-string $JSON_CMD_LINE"
@CMAKE_BINARY_DIR@/bin/$RUN_TEST

View File

@@ -1,44 +0,0 @@
<fairMQOptions>
<device name="merger" id="merger">
<channel name="two_inputs_channel">
<socket name="input1">
<type>pull</type>
<method>bind</method>
<address>tcp://*:5569</address>
<sndBufSize>1000</sndBufSize>
<rcvBufSize>1000</rcvBufSize>
<rateLogging>1</rateLogging>
</socket>
<socket name="input2">
<type>pull</type>
<method>bind</method>
<address>tcp://*:5570</address>
<sndBufSize>1000</sndBufSize>
<rcvBufSize>1000</rcvBufSize>
<rateLogging>1</rateLogging>
</socket>
</channel>
<channel name="one_output_channel">
<socket name="output1">
<type>push</type>
<method>connect</method>
<address>tcp://*:5571</address>
<sndBufSize>1000</sndBufSize>
<rcvBufSize>1000</rcvBufSize>
<rateLogging>1</rateLogging>
</socket>
</channel>
</device>
<device name="sink" id="sink">
<channel name="one_input">
<socket name="input1">
<type>pull</type>
<method>bind</method>
<address>tcp://localhost:5571</address>
<sndBufSize>1000</sndBufSize>
<rcvBufSize>1000</rcvBufSize>
<rateLogging>1</rateLogging>
</socket>
</channel>
</device>
</fairMQOptions>

View File

@@ -1,83 +0,0 @@
/*
* File: runOptTestSampler.cxx
* Author: winckler
*
* Created on June 10, 2015, 3:34 PM
*/
#include <cstdlib>
/// std
#include <iostream>
#include <string>
/// boost
#include "boost/program_options.hpp"
/// FairRoot/FairMQ
#include "FairMQLogger.h"
#include "FairMQParser.h"
#include "FairMQProgOptions.h"
//////////////////////////////////////////////////////////////
/// main
//////////////////////////////////////////////////////////////
using namespace std;
using namespace FairMQParser;
using namespace boost::program_options;
int main(int argc, char** argv)
{
FairMQProgOptions config;
try
{
// //////////////////////////////////////////////////////////////
// define key specific to the BenchmarkSampler in options_description
int eventSize;
int eventRate;
int ioThreads;
options_description sampler_options("Sampler options");
sampler_options.add_options()
("event-size", value<int>(&eventSize)->default_value(1000), "Event size in bytes")
("event-rate", value<int>(&eventRate)->default_value(0), "Event rate limit in maximum number of events per second")
("io-threads", value<int>(&ioThreads)->default_value(1), "Number of I/O threads");
// and add it to cmdline option of FairMQProgOptions
config.AddToCmdLineOptions(sampler_options);
// //////////////////////////////////////////////////////////////
// enable simple config txt/INI file
//config.EnableCfgFile();
//config.AddToCfgFileOptions(sampler_options,false);
// //////////////////////////////////////////////////////////////
// Parse command line options and store in variable map
if(config.ParseAll(argc,argv,true))
return 0;
// keys defined in FairMQProgOptions
string filename=config.GetValue<string>("config-json-file");
string deviceID=config.GetValue<string>("id");
// //////////////////////////////////////////////////////////////
// User defined parsing method.
config.UserParser<JSON>(filename,deviceID);
FairMQMap channels=config.GetFairMQMap();
//set device here
}
catch (exception& e)
{
LOG(error) << e.what();
LOG(info) << "Command line options are the following : ";
config.PrintHelp();
return 1;
}
return 0;
}

View File

@@ -1,77 +0,0 @@
/*
* File: runOptTestSink.cxx
* Author: winckler
*
* Created on June 10, 2015, 3:34 PM
*/
#include <cstdlib>
/// std
#include <iostream>
#include <string>
/// boost
#include "boost/program_options.hpp"
/// FairRoot/FairMQ
#include "FairMQLogger.h"
#include "FairMQParser.h"
#include "FairMQProgOptions.h"
//////////////////////////////////////////////////////////////
/// main
//////////////////////////////////////////////////////////////
using namespace std;
using namespace FairMQParser;
using namespace boost::program_options;
int main(int argc, char** argv)
{
FairMQProgOptions config;
try
{
// //////////////////////////////////////////////////////////////
// define key specific to the BenchmarkSampler in options_description
int eventSize;
int eventRate;
int ioThreads;
options_description Sink_options("Sink options");
Sink_options.add_options()
("io-threads", value<int>(&ioThreads)->default_value(1), "Number of I/O threads");
// and add it to cmdline option of FairMQProgOptions
config.AddToCmdLineOptions(Sink_options);
// //////////////////////////////////////////////////////////////
// Parse command line options and store in variable map
if(config.ParseAll(argc,argv,true))
return 0;
// keys defined in FairMQProgOptions
string filename=config.GetValue<string>("config-json-file");
string deviceID=config.GetValue<string>("id");
// //////////////////////////////////////////////////////////////
// User defined parsing method.
config.UserParser<JSON>(filename,deviceID);
FairMQMap channels=config.GetFairMQMap();
//set device here
}
catch (exception& e)
{
LOG(error) << e.what();
LOG(info) << "Command line options are the following : ";
config.PrintHelp();
return 1;
}
return 0;
}

View File

@@ -1,141 +0,0 @@
/// std
#include <iostream>
#include <string>
/// boost
#include "boost/program_options.hpp"
/// FairRoot/FairMQ
#include "FairMQLogger.h"
#include "FairMQParser.h"
#include "FairMQProgOptions.h"
// Parse xml from file
int testXML1(FairMQProgOptions* config)
{
LOG(info)<<"--------- test XML1 ---------\n";
std::string filename;
std::string XMLrootNode;
filename=config->GetValue<std::string>("config-xml-file");
XMLrootNode=config->GetValue<std::string>("xml.config.node.root");
std::string id=config->GetValue<std::string>("id");
config->UserParser<FairMQParser::XML>(filename,id,XMLrootNode);
// other xml parser test
config->UserParser<FairMQParser::MQXML2>(filename);
config->UserParser<FairMQParser::MQXML3>(filename,"merger");
LOG(info)<<"--------- test XML1 end ---------\n";
return 0;
}
// Parse xml from command line
int testXML2(FairMQProgOptions* config)
{
LOG(info)<<"--------- test XML2 ---------\n";
std::string XML;
std::string XMLrootNode;
std::string id=config->GetValue<std::string>("id");
XMLrootNode=config->GetValue<std::string>("xml.config.node.root");
// Note: convert the vector<string> into one string with GetStringValue(key)
XML=config->GetStringValue("config-xml-string");
std::stringstream iss;
iss << XML;
config->UserParser<FairMQParser::XML>(iss,id,XMLrootNode);
LOG(info)<<"--------- test XML2 end ---------\n";
return 0;
}
// Parse json from file
int testJSON1(FairMQProgOptions* config)
{
LOG(info)<<"--------- test JSON1 ---------\n";
std::string filename;
std::string JSONrootNode;
std::string id=config->GetValue<std::string>("id");
filename=config->GetValue<std::string>("config-json-file");
JSONrootNode=config->GetValue<std::string>("json.config.node.root");
config->UserParser<FairMQParser::JSON>(filename,id,JSONrootNode);
LOG(info)<<"--------- test JSON1 end ---------\n";
return 0;
}
// Parse json from command line
int testJSON2(FairMQProgOptions* config)
{
LOG(info)<<"--------- test JSON2 ---------\n";
std::string JSON;
std::string JSONrootNode;
std::string id=config->GetValue<std::string>("id");
JSONrootNode=config->GetValue<std::string>("json.config.node.root");
// Note: convert the vector<string> into one string with GetStringValue(key)
JSON=config->GetStringValue("config-json-string");
std::stringstream iss;
iss << JSON;
config->UserParser<FairMQParser::JSON>(iss,id,JSONrootNode);
LOG(info)<<"--------- test JSON2 end ---------\n";
return 0;
}
int main(int argc, char** argv)
{
FairMQProgOptions* config= new FairMQProgOptions();
try
{
po::options_description format_desc("XML or JSON input");
format_desc.add_options()
("xml.config.node.root", po::value<std::string>()->default_value("fairMQOptions"), "xml root node ")
("json.config.node.root", po::value<std::string>()->default_value("fairMQOptions"), "json root node ")
;
config->AddToCmdLineOptions(format_desc);
// Parse command line
if (config->ParseAll(argc,argv))
{
return 0;
}
// Set severity level (Default is 0=DEBUG)
int severity = config->GetValue<int>("severity");
FairMQLogger::Level lvl=static_cast<FairMQLogger::Level>(severity);
SET_LOGGER_LEVEL(lvl);
// Parse xml or json from cmd line or file
if (config->GetVarMap().count("config-xml-file"))
{
testXML1(config);
}
if (config->GetVarMap().count("config-xml-string"))
{
testXML2(config);
}
if (config->GetVarMap().count("config-json-file"))
{
testJSON1(config);
}
if (config->GetVarMap().count("config-json-string"))
{
testJSON2(config);
}
}
catch (std::exception& e)
{
LOG(error) << e.what();
return 1;
}
return 0;
}

View File

@@ -1,69 +0,0 @@
/// std
#include <iostream>
#include <string>
/// boost
#include "boost/program_options.hpp"
/// FairRoot/FairMQ
#include "FairMQLogger.h"
#include "FairMQParser.h"
#include "FairMQProgOptions.h"
//////////////////////////////////////////////////////////////
/// main
//////////////////////////////////////////////////////////////
int main(int argc, char** argv)
{
try
{
FairMQProgOptions config;
po::options_description format_desc("XML input");
format_desc.add_options()
("xml.config.node.root", po::value<std::string>()->default_value("fairMQOptions"), "xml root node ")
;
po::options_description io_file_opt_desc("I/O file Options");
io_file_opt_desc.add_options()
("input.file.name", po::value<std::string>(), "input file name")
("input.file.tree", po::value<std::string>(), "input tree name")
("input.file.branch", po::value<std::string>(), "input branch name")
("output.file.name", po::value<std::string>(), "output file name")
("output.file.tree", po::value<std::string>(), "output tree name")
("output.file.branch", po::value<std::string>(), "output branch name")
;
config.AddToCmdLineOptions(format_desc);
config.AddToCmdLineOptions(io_file_opt_desc);
config.EnableCfgFile();// UseConfigFile (by default config file is not defined)
config.AddToCfgFileOptions(format_desc,false);//false because already added to visible
config.AddToCfgFileOptions(io_file_opt_desc,false);
// Parse command line and config file
if(config.ParseAll(argc,argv))
return 0;
// Set severity level (Default is 0=DEBUG)
int severity = config.GetValue<int>("severity");
FairMQLogger::Level lvl = static_cast<FairMQLogger::Level>(severity);
SET_LOGGER_LEVEL(lvl);
// parse XML file
std::string filename;
std::string XMLrootNode;
filename=config.GetValue<std::string>("config-xml-file");
XMLrootNode=config.GetValue<std::string>("xml.config.node.root");
std::string id=config.GetValue<std::string>("id");
config.UserParser<FairMQParser::XML>(filename,id,XMLrootNode);
}
catch (std::exception& e)
{
LOG(error) << e.what();
return 1;
}
return 0;
}

View File

@@ -63,21 +63,18 @@ int main(int argc, char** argv)
{
try
{
// create option manager object
FairMQProgOptions config;
// add key description to cmd line options
config.GetCmdLineOptions().add_options()
("data-rate", po::value<double>()->default_value(0.5), "Data rate");
("data-rate", boost::program_options::value<double>()->default_value(0.5), "Data rate");
// parse command lines, parse json file and init FairMQMap
config.ParseAll(argc, argv, true);
// // get FairMQMap
// auto map1 = config.GetFairMQMap();
// // update value in variable map, and propagate the update to the FairMQMap
// config.UpdateValue<string>("chans.data.0.address","tcp://localhost:1234");
// config.SetValue<string>("chans.data.0.address","tcp://localhost:1234");
// // get the updated FairMQMap
// auto map2 = config.GetFairMQMap();
@@ -89,7 +86,6 @@ int main(int argc, char** argv)
// config.UpdateChannelMap(map2);
MyDevice device;
// device.CatchSignals();
device.SetConfig(config);
// getting as string and conversion helpers
@@ -130,45 +126,30 @@ int main(int argc, char** argv)
LOG(info) << "Starting value updates...\n";
config.UpdateValue<string>("chans.data.0.address", "tcp://localhost:4321");
config.SetValue<string>("chans.data.0.address", "tcp://localhost:4321");
LOG(info) << "config: " << config.GetValue<string>("chans.data.0.address");
LOG(info) << "device: " << device.fChannels.at("data").at(0).GetAddress() << endl;
config.UpdateValue<int>("chans.data.0.rcvBufSize", 100);
config.SetValue<int>("chans.data.0.rcvBufSize", 100);
LOG(info) << "config: " << config.GetValue<int>("chans.data.0.rcvBufSize");
LOG(info) << "device: " << device.fChannels.at("data").at(0).GetRcvBufSize() << endl;
config.UpdateValue<double>("data-rate", 0.9);
config.SetValue<double>("data-rate", 0.9);
LOG(info) << "config: " << config.GetValue<double>("data-rate");
LOG(info) << "device: " << device.GetRate() << endl;
// device.Print();
device.Print();
LOG(info) << "nase: " << config.GetValue<double>("nase");
config.Unsubscribe<string>("test");
config.Unsubscribe<int>("test");
config.Unsubscribe<double>("test");
// advanced commands
// LOG(info) << "-------------------- start custom 1";
// config.Connect<EventId::Custom, MyDevice&, double>("myNewKey", [](MyDevice& d, double val)
// {
// d.SetRate(val);
// d.Print();
// });
// config.Emit<EventId::Custom, MyDevice&, double>("myNewKey", device, 0.123);
// LOG(info) << "-------------------- start custom 2 with function";
// config.Connect<EventId::Custom, MyDevice&, double>("function example", &MyCallBack);
// config.Emit<EventId::Custom, MyDevice&, double>("function example", device, 6.66);
device.ChangeState("END");
}
catch (exception& e)
{
LOG(error) << "Unhandled Exception reached the top of main: "
<< e.what() << ", application will now exit";
LOG(error) << "Unhandled Exception reached the top of main: " << e.what() << ", application will now exit";
return 1;
}
return 0;

View File

@@ -1,11 +1,5 @@
#!/bin/bash
TRANSPORT="zeromq"
VERBOSE="DEBUG"
JSONCONFIGFILE="@CMAKE_BINARY_DIR@/bin/config/ex1-sampler-sink.json"
########################## start DEVICE
DEVICE="runConfigExample --transport $TRANSPORT --severity $VERBOSE"
DEVICE+=" --id sampler1 --mq-config $JSONCONFIGFILE"
@CMAKE_BINARY_DIR@/bin/$DEVICE
DEVICE="runConfigExample"
DEVICE+=" --id sampler1 --channel-config name=data,type=push,method=bind,address=tcp://*:5555,rateLogging=0"
@CMAKE_CURRENT_BINARY_DIR@/$DEVICE

View File

@@ -110,12 +110,12 @@ auto DDS::HandleControl() -> void
// and propagate addresses of bound channels to DDS.
FillChannelContainers();
LOG(DEBUG) << "$DDS_TASK_PATH: " << getenv("DDS_TASK_PATH");
LOG(DEBUG) << "$DDS_GROUP_NAME: " << getenv("DDS_GROUP_NAME");
LOG(DEBUG) << "$DDS_COLLECTION_NAME: " << getenv("DDS_COLLECTION_NAME");
LOG(DEBUG) << "$DDS_TASK_NAME: " << getenv("DDS_TASK_NAME");
LOG(DEBUG) << "$DDS_TASK_INDEX: " << getenv("DDS_TASK_INDEX");
LOG(DEBUG) << "$DDS_COLLECTION_INDEX: " << getenv("DDS_COLLECTION_INDEX");
LOG(debug) << "$DDS_TASK_PATH: " << getenv("DDS_TASK_PATH");
LOG(debug) << "$DDS_GROUP_NAME: " << getenv("DDS_GROUP_NAME");
LOG(debug) << "$DDS_COLLECTION_NAME: " << getenv("DDS_COLLECTION_NAME");
LOG(debug) << "$DDS_TASK_NAME: " << getenv("DDS_TASK_NAME");
LOG(debug) << "$DDS_TASK_INDEX: " << getenv("DDS_TASK_INDEX");
LOG(debug) << "$DDS_COLLECTION_INDEX: " << getenv("DDS_COLLECTION_INDEX");
// start DDS service - subscriptions will only start firing after this step
fService.start();

View File

@@ -52,8 +52,8 @@ struct IofN
, fEntries()
{}
int fI;
int fN;
unsigned int fI;
unsigned int fN;
std::vector<std::string> fEntries;
};

View File

@@ -15,8 +15,8 @@ void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("out-channel", bpo::value<std::string>()->default_value("data"), "Name of the output channel")
("same-msg", bpo::value<bool>()->default_value(true), "Re-send the same message (default), or recreate for each iteration")
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
("same-msg", bpo::value<bool>()->default_value(false), "Re-send the same message, or recreate for each iteration")
("msg-size", bpo::value<int>()->default_value(1000000), "Message size in bytes")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Number of run iterations (0 - infinite)")
("msg-rate", bpo::value<int>()->default_value(0), "Msg rate limit in maximum number of messages per second");
}

View File

@@ -72,7 +72,7 @@ SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --same-msg $sameMsg"
# SAMPLER+=" --msg-rate 1000"
SAMPLER+=" --max-iterations $maxIterations"
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.01:5555"
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:5555"
xterm -geometry 90x50+0+0 -hold -e $affinitySamp @CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
echo ""
echo "started: xterm -geometry 90x50+0+0 -hold -e $affinitySamp @CMAKE_CURRENT_BINARY_DIR@/$SAMPLER"
@@ -85,7 +85,7 @@ SINK+=" --id sink1"
SINK+=" --transport $transport"
SINK+=" --severity debug"
SINK+=" --max-iterations $maxIterations"
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.01:5555"
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:5555"
xterm -geometry 90x50+550+0 -hold -e $affinitySink @CMAKE_CURRENT_BINARY_DIR@/$SINK &
echo ""
echo "started: xterm -geometry 90x50+550+0 -hold -e $affinitySink @CMAKE_CURRENT_BINARY_DIR@/$SINK"

View File

@@ -14,6 +14,8 @@
#include <zmq.h>
#include <stdexcept>
using namespace std;
using namespace fair::mq::shmem;
@@ -62,12 +64,18 @@ FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const str
LOG(error) << "Failed setting ZMQ_RCVTIMEO socket option, reason: " << zmq_strerror(errno);
}
if (type == "sub")
// if (type == "sub")
// {
// if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nullptr, 0) != 0)
// {
// LOG(error) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno);
// }
// }
if (type == "sub" || type == "pub")
{
if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nullptr, 0) != 0)
{
LOG(error) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno);
}
LOG(error) << "PUB/SUB socket type is not supported for shared memory transport";
throw fair::mq::SocketError("PUB/SUB socket type is not supported for shared memory transport");
}
// LOG(info) << "created socket " << fId;
@@ -188,7 +196,7 @@ int FairMQSocketSHM::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const i
const auto numMsgs = nbytes / sizeof(MetaHeader);
if (numMsgs > 1)
{
LOG(ERROR) << "Receiving SHM multipart with a single message receive call";
LOG(error) << "Receiving SHM multipart with a single message receive call";
}
assert (numMsgs == 1);
@@ -238,7 +246,7 @@ int FairMQSocketSHM::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const i
}
}
int64_t FairMQSocketSHM::SendImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int timeout)
int64_t FairMQSocketSHM::SendImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int /*timeout*/)
{
const unsigned int vecSize = msgVec.size();
int64_t totalSize = 0;
@@ -303,13 +311,13 @@ int64_t FairMQSocketSHM::SendImpl(vector<FairMQMessagePtr>& msgVec, const int fl
else if (zmq_errno() == ETERM)
{
zmq_msg_close (&lZmqMsg);
LOG(INFO) << "terminating socket " << fId;
LOG(info) << "terminating socket " << fId;
return -1;
}
else
{
zmq_msg_close (&lZmqMsg);
LOG(ERROR) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
return nbytes;
}
}
@@ -318,7 +326,7 @@ int64_t FairMQSocketSHM::SendImpl(vector<FairMQMessagePtr>& msgVec, const int fl
}
int64_t FairMQSocketSHM::ReceiveImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int timeout)
int64_t FairMQSocketSHM::ReceiveImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int /*timeout*/)
{
int64_t totalSize = 0;

View File

@@ -69,7 +69,7 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai
}
else
{
LOG(warn) << "FairMQProgOptions not available! Using defaults.";
LOG(debug) << "FairMQProgOptions not available! Using defaults.";
}
fSessionName.resize(8, '_'); // shorten the session name, to accommodate for name size limit on some systems (MacOS)
@@ -149,14 +149,14 @@ void FairMQTransportFactorySHM::StartMonitor()
auto env = boost::this_process::environment();
vector<boost::filesystem::path> ownPath = boost::this_process::path();
vector<bfs::path> ownPath = boost::this_process::path();
if (const char* fmqp = getenv("FAIRMQ_PATH"))
{
ownPath.insert(ownPath.begin(), boost::filesystem::path(fmqp));
ownPath.insert(ownPath.begin(), bfs::path(fmqp));
}
boost::filesystem::path p = boost::process::search_path("fairmq-shmmonitor", ownPath);
bfs::path p = boost::process::search_path("fairmq-shmmonitor", ownPath);
if (!p.empty())
{
@@ -184,7 +184,7 @@ void FairMQTransportFactorySHM::StartMonitor()
}
else
{
LOG(WARN) << "could not find fairmq-shmmonitor in the path";
LOG(warn) << "could not find fairmq-shmmonitor in the path";
}
}

View File

@@ -53,7 +53,7 @@ Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, FairMQ
LOG(debug) << "shmem: created region queue: " << fQueueName;
}
fRegion = bipc::mapped_region(fShmemObject, bipc::read_write); // TODO: add HUGEPAGES flag here
// fRegion = bipc::mapped_region(fShmemObject, bipc::read_write, 0, 0, 0, MAP_HUGETLB | MAP_HUGE_1GB);
// fRegion = bipc::mapped_region(fShmemObject, bipc::read_write, 0, 0, 0, MAP_ANONYMOUS | MAP_HUGETLB);
}
void Region::StartReceivingAcks()

View File

@@ -11,6 +11,7 @@
#include <boost/process.hpp>
#include <iostream>
#include <sstream>
using namespace std;

View File

@@ -39,7 +39,7 @@ FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ(const string& id, const Fai
}
else
{
LOG(warn) << "FairMQProgOptions not available! Using defaults.";
LOG(debug) << "FairMQProgOptions not available! Using defaults.";
}
if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0)

View File

@@ -8,6 +8,7 @@
#include <FairMQDevice.h>
#include <cstddef>
#include <thread>
namespace fair
@@ -53,7 +54,7 @@ class PairLeft : public FairMQDevice
if (ret > 0) {
auto content = std::string{static_cast<char*>(msg6->GetData()), msg6->GetSize()};
LOG(info) << ret << ", " << msg6->GetSize() << ", '" << content << "'";
if (msg6->GetSize() == ret && content == "testdata1234") counter++;
if (msg6->GetSize() == static_cast<std::size_t>(ret) && content == "testdata1234") counter++;
}
if (counter == 6) LOG(info) << "Simple message with short text data successfull";

View File

@@ -7,6 +7,7 @@
********************************************************************************/
#include <FairMQDevice.h>
#include <cstddef>
#include <string>
#include <thread>
@@ -51,7 +52,7 @@ class PairRight : public FairMQDevice
if (ret > 0) {
auto content = std::string{static_cast<char*>(msg5->GetData()), msg5->GetSize()};
LOG(info) << ret << ", " << msg5->GetSize() << ", '" << content << "'";
if (msg5->GetSize() == ret && content == "testdata1234") counter++;
if (msg5->GetSize() == static_cast<std::size_t>(ret) && content == "testdata1234") counter++;
}
auto msg6(NewSimpleMessageFor("data", 0, "testdata1234"));
if (Send(msg6, "data") >= 0) counter++;

View File

@@ -63,7 +63,7 @@ TEST_F(PluginServices, ConfigCallbacks)
if (key == "chans.data.0.address") { ASSERT_EQ(value, "tcp://localhost:4321"); }
});
mServices.SubscribeToPropertyChange<int>("test", [](const string& key, int value) {
mServices.SubscribeToPropertyChange<int>("test", [](const string& key, int /*value*/) {
if(key == "chans.data.0.rcvBufSize") {
FAIL(); // should not be called because we unsubscribed
}

View File

@@ -88,7 +88,7 @@ TEST(PluginManager, LoadPluginStatic)
// program options
auto count = 0;
mgr.ForEachPluginProgOptions([&count](const options_description& d){ ++count; });
mgr.ForEachPluginProgOptions([&count](const options_description&){ ++count; });
ASSERT_EQ(count, 1);
mgr.WaitForPluginsToReleaseDeviceControl();

View File

@@ -38,7 +38,7 @@ TEST(StateMachine, RegularFSM)
ASSERT_NO_THROW(fsm.ChangeState(T::Automatic));
int cnt{0};
fsm.SubscribeToStateQueued("test", [&](S newState, S lastState){
fsm.SubscribeToStateQueued("test", [&](S /*newState*/, S /*lastState*/){
++cnt;
});