From 5933046ffdd01772a37c30aabafd8e229593fb94 Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Wed, 1 Mar 2023 16:17:04 +0100 Subject: [PATCH] feat(ofi)!: Remove `ofi` transport BREAKING CHANGE Due to a lack of users, we remove the experimental code. The latest implementation can be found in release v1.4.56. This does not mean it will never be picked up again, but for now there are no plans. --- CMakeLists.txt | 7 +- FairMQTest.cmake | 3 - Jenkinsfile | 16 +- README.md | 14 +- cmake/FairMQDependencies.cmake | 19 +- cmake/FairMQSummary.cmake | 8 +- docs/Configuration.md | 1 - examples/multipart/CMakeLists.txt | 7 +- examples/multipart/test-ex-multipart.sh.in | 6 +- examples/readout/README.md | 6 +- .../fairmq-start-ex-readout-processing.sh.in | 2 - .../readout/fairmq-start-ex-readout.sh.in | 2 - fairmq/CMakeLists.txt | 29 - fairmq/TransportFactory.cxx | 13 +- fairmq/Transports.h | 15 +- fairmq/ofi/Context.cxx | 130 ---- fairmq/ofi/Context.h | 92 --- fairmq/ofi/ControlMessages.h | 112 --- fairmq/ofi/Message.cxx | 199 ----- fairmq/ofi/Message.h | 81 --- fairmq/ofi/Socket.cxx | 680 ------------------ fairmq/ofi/Socket.h | 125 ---- fairmq/ofi/TransportFactory.h | 218 ------ fairmq/plugins/config/Config.cxx | 3 +- test/CMakeLists.txt | 4 - test/ci/Containerfile.ubuntu | 5 - test/protocols/_pair.cxx | 14 +- 27 files changed, 24 insertions(+), 1787 deletions(-) delete mode 100644 fairmq/ofi/Context.cxx delete mode 100644 fairmq/ofi/Context.h delete mode 100644 fairmq/ofi/ControlMessages.h delete mode 100644 fairmq/ofi/Message.cxx delete mode 100644 fairmq/ofi/Message.h delete mode 100644 fairmq/ofi/Socket.cxx delete mode 100644 fairmq/ofi/Socket.h delete mode 100644 fairmq/ofi/TransportFactory.h diff --git a/CMakeLists.txt b/CMakeLists.txt index ada71d81..aad6e3d7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,5 @@ ################################################################################ -# Copyright (C) 2018-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # +# Copyright (C) 2018-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # # # This software is distributed under the terms of the # # GNU Lesser General Public Licence (LGPL) version 3, # @@ -27,8 +27,6 @@ fairmq_build_option(BUILD_FAIRMQ "Build FairMQ library and devices." DEFAULT ON) fairmq_build_option(BUILD_TESTING "Build tests." DEFAULT OFF REQUIRES "BUILD_FAIRMQ") -fairmq_build_option(BUILD_OFI_TRANSPORT "Build experimental OFI transport." - DEFAULT OFF REQUIRES "BUILD_FAIRMQ") fairmq_build_option(BUILD_PMIX_PLUGIN "Build PMIx plugin." DEFAULT OFF REQUIRES "BUILD_FAIRMQ") fairmq_build_option(BUILD_EXAMPLES "Build FairMQ examples." @@ -89,9 +87,6 @@ endif() if(BUILD_PMIX_PLUGIN) list(APPEND PROJECT_PACKAGE_COMPONENTS pmix_plugin) endif() -if(BUILD_OFI_TRANSPORT) - list(APPEND PROJECT_PACKAGE_COMPONENTS ofi_transport) -endif() if(BUILD_EXAMPLES) list(APPEND PROJECT_PACKAGE_COMPONENTS examples) endif() diff --git a/FairMQTest.cmake b/FairMQTest.cmake index d8761e30..df2ae6f4 100644 --- a/FairMQTest.cmake +++ b/FairMQTest.cmake @@ -45,9 +45,6 @@ list(APPEND options "-DDISABLE_COLOR=ON" "-DBUILD_EXAMPLES=ON" "-DBUILD_TESTING= if(HAS_PMIX) list(APPEND options "-DBUILD_PMIX_PLUGIN=ON") endif() -if(HAS_ASIO AND HAS_ASIOFI) - list(APPEND options "-DBUILD_OFI_TRANSPORT=ON") -endif() if(RUN_STATIC_ANALYSIS) list(APPEND options "-DRUN_STATIC_ANALYSIS=ON") endif() diff --git a/Jenkinsfile b/Jenkinsfile index f67e77a8..81fef6f3 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -84,22 +84,20 @@ pipeline{ stage("CI") { steps{ script { - def all = '-DHAS_ASIO=ON -DHAS_ASIOFI=ON -DHAS_PMIX=ON' - def builds = jobMatrix('build', [ - [os: 'ubuntu', ver: '20.04', arch: 'x86_64', compiler: 'gcc-9', extra: all], - [os: 'ubuntu', ver: '22.04', arch: 'x86_64', compiler: 'gcc-11', extra: '-DHAS_ASIO=ON -DHAS_ASIOFI=ON'], - [os: 'fedora', ver: '33', arch: 'x86_64', compiler: 'gcc-10', extra: all], - [os: 'fedora', ver: '34', arch: 'x86_64', compiler: 'gcc-11', extra: all], - [os: 'fedora', ver: '35', arch: 'x86_64', compiler: 'gcc-11', extra: all], - [os: 'fedora', ver: '36', arch: 'x86_64', compiler: 'gcc-12', extra: all], + [os: 'ubuntu', ver: '20.04', arch: 'x86_64', compiler: 'gcc-9', extra: '-DHAS_PMIX=ON'], + [os: 'ubuntu', ver: '22.04', arch: 'x86_64', compiler: 'gcc-11'], + [os: 'fedora', ver: '33', arch: 'x86_64', compiler: 'gcc-10', extra: '-DHAS_PMIX=ON'], + [os: 'fedora', ver: '34', arch: 'x86_64', compiler: 'gcc-11', extra: '-DHAS_PMIX=ON'], + [os: 'fedora', ver: '35', arch: 'x86_64', compiler: 'gcc-11', extra: '-DHAS_PMIX=ON'], + [os: 'fedora', ver: '36', arch: 'x86_64', compiler: 'gcc-12', extra: '-DHAS_PMIX=ON'], [os: 'fedora', ver: '37', arch: 'x86_64', compiler: 'gcc-12', extra: '-DHAS_PMIX=ON'], [os: 'fedora', ver: '38', arch: 'x86_64', compiler: 'gcc-13', extra: '-DHAS_PMIX=ON'], [os: 'macos', ver: '12', arch: 'x86_64', compiler: 'apple-clang-13', extra: '-DHAS_ASIO=ON'], [os: 'macos', ver: '12', arch: 'arm64', compiler: 'apple-clang-13', extra: '-DHAS_ASIO=ON'], ]) - def all_debug = "${all} -DCMAKE_BUILD_TYPE=Debug" + def all_debug = "-DCMAKE_BUILD_TYPE=Debug" def checks = jobMatrix('check', [ [name: 'static-analyzers', extra: "${all_debug} -DRUN_STATIC_ANALYSIS=ON"], diff --git a/README.md b/README.md index 11704722..00773699 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,8 @@ FairMQ is designed to help implementing large-scale data processing workflows ne The core of FairMQ provides an abstract asynchronous message passing API with scalability protocols inspired by [ZeroMQ](https://github.com/zeromq/libzmq) (e.g. PUSH/PULL, PUB/SUB). FairMQ provides multiple implementations for its API (so-called "transports", -e.g. `zeromq`, `shmem` and `ofi` (in development)) to cover a variety of use cases +e.g. `zeromq` and `shmem` (latest release of the `ofi` transport in v1.4.56, removed since v1.5+)) to cover +a variety of use cases (e.g. inter-thread, inter-process, inter-node communication) and machines (e.g. Ethernet, Infiniband). In addition to this core functionality FairMQ provides a framework for creating "devices" - actors which are communicating through message passing. FairMQ does not only allow the user to use different transport @@ -73,18 +74,8 @@ If FairMQ is not installed in system directories, you can hint the installation: list(PREPEND CMAKE_PREFIX_PATH /path/to/fairmq_install) ``` -Optionally, you can require certain FairMQ package components and a minimum version: - -```cmake -find_package(FairMQ 1.4.50 COMPONENTS ofi_transport) -``` - -When building FairMQ, CMake will print a summary table of all available package components. - ## Dependencies - * [asio](https://github.com/chriskohlhoff/asio) - * [asiofi](https://github.com/FairRootGroup/asiofi) * [Boost](https://www.boost.org/) * [CMake](https://cmake.org/) * [Doxygen](http://www.doxygen.org/) @@ -105,7 +96,6 @@ On command line: * `-DDISABLE_COLOR=ON` disables coloured console output. * `-DBUILD_TESTING=OFF` disables building of tests. * `-DBUILD_EXAMPLES=OFF` disables building of examples. - * `-DBUILD_OFI_TRANSPORT=ON` enables building of the experimental OFI transport. * `-DBUILD_PMIX_PLUGIN=ON` enables building of the PMIx plugin. * `-DBUILD_DOCS=ON` enables building of API docs. * You can hint non-system installations for dependent packages, see the #installation-from-source section above diff --git a/cmake/FairMQDependencies.cmake b/cmake/FairMQDependencies.cmake index b1b1d5aa..15418120 100644 --- a/cmake/FairMQDependencies.cmake +++ b/cmake/FairMQDependencies.cmake @@ -1,5 +1,5 @@ ################################################################################ -# Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # +# Copyright (C) 2018-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # # # This software is distributed under the terms of the # # GNU Lesser General Public Licence (LGPL) version 3, # @@ -18,11 +18,6 @@ if(BUILD_FAIRMQ) set(Threads_PREFIX "") endif() -if(BUILD_OFI_TRANSPORT) - find_package2(PRIVATE asiofi REQUIRED VERSION 0.5) - find_package2(PRIVATE OFI REQUIRED) -endif() - if(BUILD_PMIX_PLUGIN) find_package2(PRIVATE PMIx REQUIRED VERSION 2.1.4) endif() @@ -34,14 +29,6 @@ if(BUILD_FAIRMQ OR BUILD_TIDY_TOOL) ) endif() -if(BUILD_OFI_TRANSPORT) - set(__old ${CMAKE_FIND_PACKAGE_PREFER_CONFIG}) - set(CMAKE_FIND_PACKAGE_PREFER_CONFIG ON) - find_package2(PUBLIC asio REQUIRED VERSION 1.18) - set(CMAKE_FIND_PACKAGE_PREFER_CONFIG ${__old}) - unset(__old) -endif() - if(BUILD_FAIRMQ) find_package2(PRIVATE ZeroMQ REQUIRED VERSION 4.1.4) if(NOT PicoSHA2_BUNDLED) @@ -87,10 +74,6 @@ if(PROJECT_PACKAGE_DEPENDENCIES) if(NOT FairLogger_PREFIX AND FairLogger_ROOT) set(FairLogger_PREFIX ${FairLogger_ROOT}) endif() - elseif(${dep} STREQUAL asiofi) - if(NOT asiofi_PREFIX AND asiofi_ROOT) - set(asiofi_PREFIX ${asiofi_ROOT}) - endif() elseif(${dep} STREQUAL Boost) if(TARGET Boost::headers) get_target_property(boost_include Boost::headers INTERFACE_INCLUDE_DIRECTORIES) diff --git a/cmake/FairMQSummary.cmake b/cmake/FairMQSummary.cmake index 4a195412..f8777d5a 100644 --- a/cmake/FairMQSummary.cmake +++ b/cmake/FairMQSummary.cmake @@ -1,5 +1,5 @@ ################################################################################ -# Copyright (C) 2018-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # +# Copyright (C) 2018-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # # # This software is distributed under the terms of the # # GNU Lesser General Public Licence (LGPL) version 3, # @@ -27,12 +27,6 @@ macro(fairmq_summary_components) set(tests_summary "${BRed} NO${CR} (default, enable with ${BMagenta}-DBUILD_TESTING=ON${CR})") endif() message(STATUS " ${BWhite}tests${CR} ${tests_summary}") - if(BUILD_OFI_TRANSPORT) - set(ofi_summary "${BGreen}YES${CR} EXPERIMENTAL (disable with ${BMagenta}-DBUILD_OFI_TRANSPORT=OFF${CR})") - else() - set(ofi_summary "${BRed} NO${CR} EXPERIMENTAL (default, enable with ${BMagenta}-DBUILD_OFI_TRANSPORT=ON${CR})") - endif() - message(STATUS " ${BWhite}ofi_transport${CR} ${ofi_summary}") if(BUILD_PMIX_PLUGIN) set(pmix_summary "${BGreen}YES${CR} EXPERIMENTAL (disable with ${BMagenta}-DBUILD_PMIX_PLUGIN=OFF${CR})") else() diff --git a/docs/Configuration.md b/docs/Configuration.md index 733f0c66..d1c56306 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -28,7 +28,6 @@ Here is an overview of the device/channel options and when they are applied: | `init-timeout` | at the end of `fair::mq::State::InitializingDevice` | | `shm-segment-size` | at the end of `fair::mq::State::InitializingDevice` | | `shm-monitor` | at the end of `fair::mq::State::InitializingDevice` | -| `ofi-size-hint` | at the end of `fair::mq::State::InitializingDevice` | | `rate` | at the end of `fair::mq::State::InitializingDevice` | | `session` | at the end of `fair::mq::State::InitializingDevice` | | `chan.*` | at the end of `fair::mq::State::InitializingDevice` (channel addresses can be also applied during `fair::mq::State::Binding`/`fair::mq::State::Connecting`) | diff --git a/examples/multipart/CMakeLists.txt b/examples/multipart/CMakeLists.txt index 163da69f..8ad94f5e 100644 --- a/examples/multipart/CMakeLists.txt +++ b/examples/multipart/CMakeLists.txt @@ -1,5 +1,5 @@ ################################################################################ - # Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # + # Copyright (C) 2014-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # # # This software is distributed under the terms of the # # GNU Lesser General Public Licence (LGPL) version 3, # @@ -28,11 +28,6 @@ set_tests_properties(Example.Multipart.zeromq PROPERTIES TIMEOUT "30") add_test(NAME Example.Multipart.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh shmem) set_tests_properties(Example.Multipart.shmem PROPERTIES TIMEOUT "30") -if(BUILD_OFI_TRANSPORT) - add_test(NAME Example.Multipart.ofi COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh ofi) - set_tests_properties(Example.Multipart.ofi PROPERTIES TIMEOUT "30") -endif() - # install install( diff --git a/examples/multipart/test-ex-multipart.sh.in b/examples/multipart/test-ex-multipart.sh.in index 29380b87..0e5818e6 100755 --- a/examples/multipart/test-ex-multipart.sh.in +++ b/examples/multipart/test-ex-multipart.sh.in @@ -14,11 +14,7 @@ session="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)" chan="data" chanAddr="" chanIpcFile="/tmp/fmq_$session""_""$chan""_""$transport" -if [ $transport = "ofi" ]; then - chanAddr="tcp://127.0.0.1:5656" -else - chanAddr="ipc://""$chanIpcFile" -fi +chanAddr="ipc://""$chanIpcFile" # setup a trap to kill everything if the test fails/timeouts trap 'set +e; kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID; rm $chanIpcFile; exit 0' TERM diff --git a/examples/readout/README.md b/examples/readout/README.md index ac49c19a..b5d7b81f 100644 --- a/examples/readout/README.md +++ b/examples/readout/README.md @@ -7,19 +7,19 @@ This examples shows two possible topologies (out of many) for a node connected t ``` |------------------------------- Readout Node ---------------------------| |- Processing Node -| | Readout --> Builder --> Sender | --> | Receiver | -| [# shared memory segment (unused in this topology) ##################] | ofi | | +| [# shared memory segment (unused in this topology) ##################] | zmq | | | [# shmem unmanaged region (readout writes here, others read) ########] | | | |------------------------------------------------------------------------| |-------------------| ``` -The devices one the Readout Node communicate via shared memory transport. Readout device writes into shared memory unmanaged region. The data is then forwarded through Builder to Sender process, which sends it out via OFI transport. +The devices one the Readout Node communicate via shared memory transport. Readout device writes into shared memory unmanaged region. The data is then forwarded through Builder to Sender process, which sends it out via zeromq transport. ## Setup with generating new data on the Readout node ``` |------------------------------- Readout Node ---------------------------| |- Processing Node -| | Readout --> Builder --> Processor --> Sender | --> | Receiver | -| [# shared memory segment (used between Proccessor and Sender) #######] | ofi | | +| [# shared memory segment (used between Proccessor and Sender) #######] | zmq | | | [# shmem unmanaged region (readout writes here, builder & proc read) ] | | | |------------------------------------------------------------------------| |-------------------| ``` diff --git a/examples/readout/fairmq-start-ex-readout-processing.sh.in b/examples/readout/fairmq-start-ex-readout-processing.sh.in index 9f8a69cb..86e5d9c0 100755 --- a/examples/readout/fairmq-start-ex-readout-processing.sh.in +++ b/examples/readout/fairmq-start-ex-readout-processing.sh.in @@ -31,12 +31,10 @@ SENDER="fairmq-ex-readout-sender" SENDER+=" --id sender1" SENDER+=" --input-name ps" SENDER+=" --channel-config name=ps,type=pair,method=bind,address=tcp://localhost:7779,transport=shmem" -#SENDER+=" name=sr,type=pair,method=connect,address=tcp://localhost:7780,transport=ofi" SENDER+=" name=sr,type=pair,method=connect,address=tcp://localhost:7780,transport=zeromq" xterm -geometry 80x23+1000+0 -hold -e @EX_BIN_DIR@/$SENDER & RECEIVER="fairmq-ex-readout-receiver" RECEIVER+=" --id receiver1" -#RECEIVER+=" --channel-config name=sr,type=pair,method=bind,address=tcp://localhost:7780,transport=ofi" RECEIVER+=" --channel-config name=sr,type=pair,method=bind,address=tcp://localhost:7780,transport=zeromq" xterm -geometry 80x23+1500+0 -hold -e @EX_BIN_DIR@/$RECEIVER & diff --git a/examples/readout/fairmq-start-ex-readout.sh.in b/examples/readout/fairmq-start-ex-readout.sh.in index 01d605cf..ea9c4a28 100755 --- a/examples/readout/fairmq-start-ex-readout.sh.in +++ b/examples/readout/fairmq-start-ex-readout.sh.in @@ -25,12 +25,10 @@ SENDER="fairmq-ex-readout-sender" SENDER+=" --id sender1" SENDER+=" --input-name bs" SENDER+=" --channel-config name=bs,type=pair,method=bind,address=tcp://localhost:7778,transport=shmem" -# SENDER+=" name=sr,type=pair,method=connect,address=tcp://localhost:7779,transport=ofi" SENDER+=" name=sr,type=pair,method=connect,address=tcp://localhost:7779,transport=zeromq" xterm -geometry 80x23+1000+0 -hold -e @EX_BIN_DIR@/$SENDER & RECEIVER="fairmq-ex-readout-receiver" RECEIVER+=" --id receiver1" -# RECEIVER+=" --channel-config name=sr,type=pair,method=bind,address=tcp://localhost:7779,transport=ofi" RECEIVER+=" --channel-config name=sr,type=pair,method=bind,address=tcp://localhost:7779,transport=zeromq" xterm -geometry 80x23+1500+0 -hold -e @EX_BIN_DIR@/$RECEIVER & diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index caa76ad5..6d108794 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -110,16 +110,6 @@ if(BUILD_FAIRMQ) zeromq/TransportFactory.h ) - if(BUILD_OFI_TRANSPORT) - set(FAIRMQ_PRIVATE_HEADER_FILES ${FAIRMQ_PRIVATE_HEADER_FILES} - ofi/Context.h - ofi/ControlMessages.h - ofi/Message.h - ofi/Socket.h - ofi/TransportFactory.h - ) - endif() - ########################## # libFairMQ source files # ########################## @@ -150,14 +140,6 @@ if(BUILD_FAIRMQ) tools/Unique.cxx ) - if(BUILD_OFI_TRANSPORT) - set(FAIRMQ_SOURCE_FILES ${FAIRMQ_SOURCE_FILES} - ofi/Context.cxx - ofi/Message.cxx - ofi/Socket.cxx - ) - endif() - ################### # configure files # @@ -185,9 +167,6 @@ if(BUILD_FAIRMQ) if(FAIRMQ_DEBUG_MODE) target_compile_definitions(${target} PUBLIC FAIRMQ_DEBUG_MODE) endif() - if(BUILD_OFI_TRANSPORT) - target_compile_definitions(${target} PRIVATE BUILD_OFI_TRANSPORT) - endif() target_compile_definitions(${target} PUBLIC FAIRMQ_HAS_STD_FILESYSTEM=${FAIRMQ_HAS_STD_FILESYSTEM} FAIRMQ_HAS_STD_PMR=${FAIRMQ_HAS_STD_PMR} @@ -209,13 +188,6 @@ if(BUILD_FAIRMQ) ################## # link libraries # ################## - if(BUILD_OFI_TRANSPORT) - set(OFI_DEPS - asio::asio - asiofi::asiofi - ) - endif() - target_link_libraries(${target} INTERFACE # only consumers link against interface dependencies Boost::container @@ -233,7 +205,6 @@ if(BUILD_FAIRMQ) PRIVATE # only libFairMQ links against private dependencies libzmq PicoSHA2 - ${OFI_DEPS} ) set_target_properties(${target} PROPERTIES VERSION ${PROJECT_VERSION} diff --git a/fairmq/TransportFactory.cxx b/fairmq/TransportFactory.cxx index 72c8195a..e91754c3 100644 --- a/fairmq/TransportFactory.cxx +++ b/fairmq/TransportFactory.cxx @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2017-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -9,9 +9,6 @@ #include #include #include -#ifdef BUILD_OFI_TRANSPORT -#include -#endif #include #include #include @@ -39,20 +36,12 @@ auto TransportFactory::CreateTransportFactory(const string& type, } else if (type == "shmem") { return make_shared(finalId, config); } -#ifdef BUILD_OFI_TRANSPORT - else if (type == "ofi") { - return make_shared(finalId, config); - } -#endif /* BUILD_OFI_TRANSPORT */ else { LOG(error) << "Unavailable transport requested: " << "\"" << type << "\"" << ". Available are: " << "\"zeromq\"," << "\"shmem\"" -#ifdef BUILD_OFI_TRANSPORT - << ", and \"ofi\"" -#endif /* BUILD_OFI_TRANSPORT */ << ". Exiting."; throw TransportFactoryError(tools::ToString("Unavailable transport requested: ", type)); } diff --git a/fairmq/Transports.h b/fairmq/Transports.h index b15a3887..3725e828 100644 --- a/fairmq/Transports.h +++ b/fairmq/Transports.h @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2014-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -22,8 +22,7 @@ enum class Transport { DEFAULT, ZMQ, - SHM, - OFI + SHM }; struct TransportError : std::runtime_error @@ -34,15 +33,13 @@ struct TransportError : std::runtime_error static const std::unordered_map TransportTypes{ {"default", Transport::DEFAULT}, {"zeromq", Transport::ZMQ}, - {"shmem", Transport::SHM}, - {"ofi", Transport::OFI} + {"shmem", Transport::SHM} }; static const std::unordered_map TransportNames{ {Transport::DEFAULT, "default"}, {Transport::ZMQ, "zeromq"}, - {Transport::SHM, "shmem"}, - {Transport::OFI, "ofi"} + {Transport::SHM, "shmem"} }; inline std::string TransportName(Transport transport) { return TransportNames.at(transport); } @@ -61,11 +58,7 @@ inline std::ostream& operator<<(std::ostream& os, const Transport& transport) inline auto GetEnabledTransports() -> std::vector { -#ifdef BUILD_OFI_TRANSPORT - return {Transport::ZMQ, Transport::SHM, Transport::OFI}; -#else return {Transport::ZMQ, Transport::SHM}; -#endif } } // namespace fair::mq diff --git a/fairmq/ofi/Context.cxx b/fairmq/ofi/Context.cxx deleted file mode 100644 index fd412b24..00000000 --- a/fairmq/ofi/Context.cxx +++ /dev/null @@ -1,130 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2018-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace fair::mq::ofi -{ - -using namespace std; - -Context::Context(mq::TransportFactory& sendFactory, - mq::TransportFactory& receiveFactory, - int numberIoThreads) - : fIoWork(fIoContext) - , fReceiveFactory(receiveFactory) - , fSendFactory(sendFactory) - , fSizeHint(0) -{ - InitThreadPool(numberIoThreads); -} - -auto Context::InitThreadPool(int numberIoThreads) -> void -{ - assert(numberIoThreads > 0); - - for (int i = 1; i <= numberIoThreads; ++i) { - fThreadPool.emplace_back([&, i, numberIoThreads]{ - try { - LOG(debug) << "OFI transport: I/O thread #" << i << " of " << numberIoThreads << " started"; - fIoContext.run(); - LOG(debug) << "OFI transport: I/O thread #" << i << " of " << numberIoThreads << " stopped"; - } catch (const std::exception& e) { - LOG(error) << "OFI transport: Uncaught exception in I/O thread #" << i << ": " << e.what(); - } catch (...) { - LOG(error) << "OFI transport: Uncaught exception in I/O thread #" << i; - } - }); - } -} - -auto Context::Reset() -> void -{ - // TODO "Linger", rethink this - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - - fIoContext.stop(); -} - -Context::~Context() -{ - for (auto& thread : fThreadPool) - thread.join(); -} - -auto Context::GetAsiofiVersion() const -> string -{ - return ASIOFI_VERSION; -} - -auto Context::ConvertAddress(std::string address) -> Address -{ - string protocol, ip; - unsigned int port = 0; - regex address_regex("^([a-z]+)://([0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+):([0-9]+).*"); - smatch address_result; - if (regex_match(address, address_result, address_regex)) { - protocol = address_result[1]; - ip = address_result[2]; - port = stoul(address_result[3]); - // LOG(debug) << "Parsed '" << protocol << "', '" << ip << "', '" << port << "' fields from '" << address << "'"; - } else { - throw ContextError(tools::ToString("Wrong format: Address must be in format prot://ip:port")); - } - - return { protocol, ip, port }; -} - -auto Context::ConvertAddress(Address address) -> sockaddr_in -{ - sockaddr_in sa; - if (inet_pton(AF_INET, address.Ip.c_str(), &(sa.sin_addr)) != 1) - throw ContextError(tools::ToString("Failed to convert given IP '", address.Ip, "' to struct in_addr, reason: ", strerror(errno))); - sa.sin_port = htons(address.Port); - sa.sin_family = AF_INET; - - return sa; -} - -auto Context::ConvertAddress(sockaddr_in address) -> Address -{ - return {"tcp", inet_ntoa(address.sin_addr), ntohs(address.sin_port)}; -} - -auto Context::VerifyAddress(const std::string& address) -> Address -{ - auto addr = ConvertAddress(address); - - if (!(addr.Protocol == "tcp" || addr.Protocol == "verbs")) - throw ContextError("Wrong protocol: Supported protocols are: tcp:// and verbs://"); - - return addr; -} - -auto Context::MakeReceiveMessage(size_t size) -> MessagePtr -{ - return fReceiveFactory.CreateMessage(size); -} - -auto Context::MakeSendMessage(size_t size) -> MessagePtr -{ - return fSendFactory.CreateMessage(size); -} - -} // namespace fair::mq::ofi diff --git a/fairmq/ofi/Context.h b/fairmq/ofi/Context.h deleted file mode 100644 index f9c8d884..00000000 --- a/fairmq/ofi/Context.h +++ /dev/null @@ -1,92 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#ifndef FAIR_MQ_OFI_CONTEXT_H -#define FAIR_MQ_OFI_CONTEXT_H - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace fair::mq::ofi -{ - -enum class ConnectionType : bool { Bind, Connect }; - -struct Address { - std::string Protocol; - std::string Ip; - unsigned int Port; - friend auto operator<<(std::ostream& os, const Address& a) -> std::ostream& - { - return os << a.Protocol << "://" << a.Ip << ":" << a.Port; - } - friend auto operator==(const Address& lhs, const Address& rhs) -> bool - { - return (lhs.Protocol == rhs.Protocol) && (lhs.Ip == rhs.Ip) && (lhs.Port == rhs.Port); - } -}; - -/** - * @class Context Context.h - * @brief Transport-wide context - * - * @todo TODO insert long description - */ -class Context -{ - public: - Context(mq::TransportFactory& sendFactory, - mq::TransportFactory& receiveFactory, - int numberIoThreads = 1); - Context(const Context&) = delete; - Context(Context&&) = delete; - Context& operator=(const Context&) = delete; - Context& operator=(Context&&) = delete; - ~Context(); - - auto GetAsiofiVersion() const -> std::string; - auto GetIoContext() -> asio::io_context& { return fIoContext; } - static auto ConvertAddress(std::string address) -> Address; - static auto ConvertAddress(Address address) -> sockaddr_in; - static auto ConvertAddress(sockaddr_in address) -> Address; - static auto VerifyAddress(const std::string& address) -> Address; - auto Interrupt() -> void { LOG(debug) << "OFI transport: Interrupted (NOOP - not implemented)."; } - auto Resume() -> void { LOG(debug) << "OFI transport: Resumed (NOOP - not implemented)."; } - auto Reset() -> void; - auto MakeReceiveMessage(size_t size) -> MessagePtr; - auto MakeSendMessage(size_t size) -> MessagePtr; - auto GetSizeHint() -> size_t { return fSizeHint; } - auto SetSizeHint(size_t size) -> void { fSizeHint = size; } - - private: - asio::io_context fIoContext; - asio::io_context::work fIoWork; - std::vector fThreadPool; - mq::TransportFactory& fReceiveFactory; - mq::TransportFactory& fSendFactory; - size_t fSizeHint; - - auto InitThreadPool(int numberIoThreads) -> void; -}; /* class Context */ - -struct ContextError : std::runtime_error { using std::runtime_error::runtime_error; }; - -} // namespace fair::mq::ofi - -#endif /* FAIR_MQ_OFI_CONTEXT_H */ diff --git a/fairmq/ofi/ControlMessages.h b/fairmq/ofi/ControlMessages.h deleted file mode 100644 index c68dc366..00000000 --- a/fairmq/ofi/ControlMessages.h +++ /dev/null @@ -1,112 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#ifndef FAIR_MQ_OFI_CONTROLMESSAGES_H -#define FAIR_MQ_OFI_CONTROLMESSAGES_H - -#include -#include -#include -#include -#include -#include -#include - -namespace asio -{ - -template -auto buffer(const PodType& obj) -> asio::const_buffer -{ - return asio::const_buffer(static_cast(&obj), sizeof(PodType)); -} - -} // namespace asio - -namespace fair::mq::ofi -{ - -enum class ControlMessageType -{ - Empty = 1, - PostBuffer, - PostMultiPartStartBuffer -}; - -struct Empty -{}; - -struct PostBuffer -{ - uint64_t size; // buffer size (size_t) -}; - -struct PostMultiPartStartBuffer -{ - uint32_t numParts; // buffer size (size_t) - uint64_t size; // buffer size (size_t) -}; - -union ControlMessageContent -{ - PostBuffer postBuffer; - PostMultiPartStartBuffer postMultiPartStartBuffer; -}; - -struct ControlMessage -{ - ControlMessageType type; - ControlMessageContent msg; -}; - -template -using unique_ptr = std::unique_ptr>; - -template -auto MakeControlMessageWithPmr(std::pmr::memory_resource& pmr, Args&&... args) - -> ofi::unique_ptr -{ - void* mem = pmr.allocate(sizeof(ControlMessage)); - ControlMessage* ctrl = new (mem) ControlMessage(); - - if (std::is_same::value) { - ctrl->type = ControlMessageType::PostBuffer; - ctrl->msg.postBuffer = PostBuffer(std::forward(args)...); - } else if (std::is_same::value) { - ctrl->type = ControlMessageType::PostMultiPartStartBuffer; - ctrl->msg.postMultiPartStartBuffer = PostMultiPartStartBuffer(std::forward(args)...); - } else if (std::is_same::value) { - ctrl->type = ControlMessageType::Empty; - } - - return ofi::unique_ptr(ctrl, [&pmr](ControlMessage* p) { - p->~ControlMessage(); - pmr.deallocate(p, sizeof(T)); - }); -} - -template -auto MakeControlMessage(Args&&... args) -> ControlMessage -{ - ControlMessage ctrl; - - if (std::is_same::value) { - ctrl.type = ControlMessageType::PostBuffer; - } else if (std::is_same::value) { - ctrl.type = ControlMessageType::PostMultiPartStartBuffer; - } else if (std::is_same::value) { - ctrl.type = ControlMessageType::Empty; - } - ctrl.msg = T(std::forward(args)...); - - return ctrl; -} - -} // namespace fair::mq::ofi - -#endif /* FAIR_MQ_OFI_CONTROLMESSAGES_H */ diff --git a/fairmq/ofi/Message.cxx b/fairmq/ofi/Message.cxx deleted file mode 100644 index 6f4b6e23..00000000 --- a/fairmq/ofi/Message.cxx +++ /dev/null @@ -1,199 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2018-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#include -#include -#include -#include -#include -#include - -namespace fair::mq::ofi -{ - -using namespace std; - -Message::Message(pmr::memory_resource* pmr) - : fInitialSize(0) - , fSize(0) - , fData(nullptr) - , fFreeFunction(nullptr) - , fHint(nullptr) - , fPmr(pmr) -{ -} - -Message::Message(pmr::memory_resource* pmr, Alignment /* alignment */) - : fInitialSize(0) - , fSize(0) - , fData(nullptr) - , fFreeFunction(nullptr) - , fHint(nullptr) - , fPmr(pmr) -{ -} - -Message::Message(pmr::memory_resource* pmr, const size_t size) - : fInitialSize(size) - , fSize(size) - , fData(nullptr) - , fFreeFunction(nullptr) - , fHint(nullptr) - , fPmr(pmr) -{ - if (size) { - fData = fPmr->allocate(size); - assert(fData); - } -} - -Message::Message(pmr::memory_resource* pmr, const size_t size, Alignment /* alignment */) - : fInitialSize(size) - , fSize(size) - , fData(nullptr) - , fFreeFunction(nullptr) - , fHint(nullptr) - , fPmr(pmr) -{ - if (size) { - fData = fPmr->allocate(size); - assert(fData); - } -} - -Message::Message(pmr::memory_resource* pmr, - void* data, - const size_t size, - FreeFn* ffn, - void* hint) - : fInitialSize(size) - , fSize(size) - , fData(data) - , fFreeFunction(ffn) - , fHint(hint) - , fPmr(pmr) -{} - -Message::Message(pmr::memory_resource* /*pmr*/, - fair::mq::UnmanagedRegionPtr& /*region*/, - void* /*data*/, - const size_t /*size*/, - void* /*hint*/) -{ - throw MessageError{"Not yet implemented."}; -} - -auto Message::Rebuild() -> void -{ - if (fFreeFunction) { - fFreeFunction(fData, fHint); - } else { - if (fData) { - fPmr->deallocate(fData, fSize); - } - } - fData = nullptr; - fInitialSize = 0; - fSize = 0; - fFreeFunction = nullptr; - fHint = nullptr; -} - -auto Message::Rebuild(Alignment /* alignment */) -> void -{ - // TODO: implement alignment - Rebuild(); -} - -auto Message::Rebuild(size_t size) -> void -{ - if (fFreeFunction) { - fFreeFunction(fData, fHint); - } else { - if (fData) { - fPmr->deallocate(fData, fSize); - } - } - if (size) { - fData = fPmr->allocate(size); - assert(fData); - } else { - fData = nullptr; - } - fInitialSize = size; - fSize = size; - fFreeFunction = nullptr; - fHint = nullptr; -} - -auto Message::Rebuild(size_t size, Alignment /* alignment */) -> void -{ - // TODO: implement alignment - Rebuild(size); -} - -auto Message::Rebuild(void* /*data*/, size_t size, FreeFn* ffn, void* hint) -> void -{ - if (fFreeFunction) { - fFreeFunction(fData, fHint); - } else { - if (fData) { - fPmr->deallocate(fData, fSize); - } - } - if (size) { - fData = fPmr->allocate(size); - assert(fData); - } else { - fData = nullptr; - } - assert(fData); - fInitialSize = size; - fSize = size; - fFreeFunction = ffn; - fHint = hint; -} - -auto Message::GetData() const -> void* -{ - return fData; -} - -auto Message::GetSize() const -> size_t -{ - return fSize; -} - -auto Message::SetUsedSize(size_t size) -> bool -{ - if (size == fSize) { - return true; - } else if (size <= fSize) { - throw MessageError{"Message size shrinking not yet implemented."}; - } else { - throw MessageError{"Cannot grow message size."}; - } -} - -auto Message::Copy(const fair::mq::Message& /*msg*/) -> void -{ - throw MessageError{"Not yet implemented."}; -} - -Message::~Message() -{ - if (fFreeFunction) { - fFreeFunction(fData, fHint); - } else { - if (fData) { - fPmr->deallocate(fData, fSize); - } - } -} - -} // namespace fair::mq::ofi diff --git a/fairmq/ofi/Message.h b/fairmq/ofi/Message.h deleted file mode 100644 index 9f47dd84..00000000 --- a/fairmq/ofi/Message.h +++ /dev/null @@ -1,81 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2018-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#ifndef FAIR_MQ_OFI_MESSAGE_H -#define FAIR_MQ_OFI_MESSAGE_H - -#include -#include -#include // size_t -#include -#include -#include -#include -#include - -namespace fair::mq::ofi -{ - -/** - * @class Message Message.h - * @brief - * - * @todo TODO insert long description - */ -class Message final : public fair::mq::Message -{ - public: - Message(std::pmr::memory_resource* pmr); - Message(std::pmr::memory_resource* pmr, Alignment alignment); - Message(std::pmr::memory_resource* pmr, size_t size); - Message(std::pmr::memory_resource* pmr, size_t size, Alignment alignment); - Message(std::pmr::memory_resource* pmr, - void* data, - size_t size, - FreeFn* ffn, - void* hint = nullptr); - Message(std::pmr::memory_resource* pmr, - fair::mq::UnmanagedRegionPtr& region, - void* data, - size_t size, - void* hint = 0); - - Message(const Message&) = delete; - Message(Message&&) = delete; - Message& operator=(const Message&) = delete; - Message& operator=(Message&&) = delete; - - auto Rebuild() -> void override; - auto Rebuild(Alignment alignment) -> void override; - auto Rebuild(size_t size) -> void override; - auto Rebuild(size_t size, Alignment alignment) -> void override; - auto Rebuild(void* data, size_t size, FreeFn* ffn, void* hint = nullptr) -> void override; - - auto GetData() const -> void* override; - auto GetSize() const -> size_t override; - - auto SetUsedSize(size_t size) -> bool override; - - auto GetType() const -> fair::mq::Transport override { return fair::mq::Transport::OFI; } - - auto Copy(const fair::mq::Message& msg) -> void override; - - ~Message() override; - - private: - size_t fInitialSize; - size_t fSize; - void* fData; - FreeFn* fFreeFunction; - void* fHint; - std::pmr::memory_resource* fPmr; -}; /* class Message */ - -} // namespace fair::mq::ofi - -#endif /* FAIR_MQ_OFI_MESSAGE_H */ diff --git a/fairmq/ofi/Socket.cxx b/fairmq/ofi/Socket.cxx deleted file mode 100644 index 5569b37b..00000000 --- a/fairmq/ofi/Socket.cxx +++ /dev/null @@ -1,680 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2018-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace fair::mq::ofi -{ - -using namespace std; - -Socket::Socket(Context& context, const string& type, const string& name, const string& id /*= ""*/) - : fContext(context) - , fOfiInfo(nullptr) - , fOfiFabric(nullptr) - , fOfiDomain(nullptr) - , fPassiveEndpoint(nullptr) - , fDataEndpoint(nullptr) - , fControlEndpoint(nullptr) - , fId(id + "." + name + "." + type) - , fBytesTx(0) - , fBytesRx(0) - , fMessagesTx(0) - , fMessagesRx(0) - , fMultiPartRecvCounter(-1) - , fSendPushSem(fContext.GetIoContext(), 384) - , fSendPopSem(fContext.GetIoContext(), 0) - , fRecvPushSem(fContext.GetIoContext(), 384) - , fRecvPopSem(fContext.GetIoContext(), 0) - , fNeedOfiMemoryRegistration(false) -{ - if (type != "pair") { - throw SocketError{tools::ToString("Socket type '", type, "' not implemented for ofi transport.")}; - } -} - -auto Socket::InitOfi(Address addr) -> void -{ - if (!fOfiInfo) { - assert(!fOfiFabric); - assert(!fOfiDomain); - - asiofi::hints hints; - if (addr.Protocol == "tcp") { - hints.set_provider("sockets"); - } else if (addr.Protocol == "verbs") { - hints.set_provider("verbs"); - } - if (fRemoteAddr == addr) { - fOfiInfo = make_unique(addr.Ip.c_str(), std::to_string(addr.Port).c_str(), 0, hints); - } else { - fOfiInfo = make_unique(addr.Ip.c_str(), std::to_string(addr.Port).c_str(), FI_SOURCE, hints); - } - - LOG(debug) << "OFI transport (" << fId << "): " << *fOfiInfo; - - fOfiFabric = make_unique(*fOfiInfo); - - fOfiDomain = make_unique(*fOfiFabric); - } -} - -auto Socket::Bind(const string& addr) -> bool -try { - fLocalAddr = Context::VerifyAddress(addr); - if (fLocalAddr.Protocol == "verbs") { - fNeedOfiMemoryRegistration = true; - } - - InitOfi(fLocalAddr); - - fPassiveEndpoint = make_unique(fContext.GetIoContext(), *fOfiFabric); - //fPassiveEndpoint->set_local_address(Context::ConvertAddress(fLocalAddr)); - - BindControlEndpoint(); - - return true; -} -// TODO catch the correct ofi error -catch (const SilentSocketError& e) -{ - // do not print error in this case, this is handled by fair::mq::Device - // in case no connection could be established after trying a number of random ports from a range. - return false; -} -catch (const std::exception& e) -{ - LOG(error) << "OFI transport: " << e.what(); - return false; -} -catch (...) -{ - LOG(error) << "OFI transport: Unknown exception in ofi::Socket::Bind"; - return false; -} - -auto Socket::BindControlEndpoint() -> void -{ - assert(!fControlEndpoint); - - fPassiveEndpoint->listen([&](asiofi::info&& info) { - LOG(debug) << "OFI transport (" << fId - << "): control band connection request received. Accepting ..."; - fControlEndpoint = make_unique( - fContext.GetIoContext(), *fOfiDomain, info); - fControlEndpoint->enable(); - fControlEndpoint->accept([&]() { - LOG(debug) << "OFI transport (" << fId << "): control band connection accepted."; - - BindDataEndpoint(); - }); - }); - - LOG(debug) << "OFI transport (" << fId << "): control band bound to " << fLocalAddr; -} - -auto Socket::BindDataEndpoint() -> void -{ - assert(!fDataEndpoint); - - fPassiveEndpoint->listen([&](asiofi::info&& info) { - LOG(debug) << "OFI transport (" << fId - << "): data band connection request received. Accepting ..."; - fDataEndpoint = make_unique( - fContext.GetIoContext(), *fOfiDomain, info); - fDataEndpoint->enable(); - fDataEndpoint->accept([&]() { - LOG(debug) << "OFI transport (" << fId << "): data band connection accepted."; - - if (fContext.GetSizeHint()) { - asio::post(fContext.GetIoContext(), - std::bind(&Socket::SendQueueReaderStatic, this)); - asio::post(fContext.GetIoContext(), - std::bind(&Socket::RecvQueueReaderStatic, this)); - } else { - asio::post(fContext.GetIoContext(), - std::bind(&Socket::SendQueueReader, this)); - asio::post(fContext.GetIoContext(), - std::bind(&Socket::RecvControlQueueReader, this)); - } - }); - }); - - LOG(debug) << "OFI transport (" << fId << "): data band bound to " << fLocalAddr; -} - -auto Socket::Connect(const string& address) -> bool -try { - fRemoteAddr = Context::VerifyAddress(address); - if (fRemoteAddr.Protocol == "verbs") { - fNeedOfiMemoryRegistration = true; - } - - InitOfi(fRemoteAddr); - - ConnectEndpoint(fControlEndpoint, Band::Control); - ConnectEndpoint(fDataEndpoint, Band::Data); - - if (fContext.GetSizeHint()) { - asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReaderStatic, this)); - asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvQueueReaderStatic, this)); - } else { - asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this)); - asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this)); - } - - return true; -} -catch (const SilentSocketError& e) -{ - // do not print error in this case, this is handled by fair::mq::Device - return false; -} -catch (const std::exception& e) -{ - LOG(error) << "OFI transport: " << e.what(); - return false; -} -catch (...) -{ - LOG(error) << "OFI transport: Unknown exception in ofi::Socket::Connect"; - return false; -} - -auto Socket::ConnectEndpoint(std::unique_ptr& endpoint, Band type) -> void -{ - assert(!endpoint); - - std::string band(type == Band::Control ? "control" : "data"); - - endpoint = make_unique(fContext.GetIoContext(), *fOfiDomain); - endpoint->enable(); - - LOG(debug) << "OFI transport (" << fId << "): Sending " << band << " band connection request to " << fRemoteAddr; - - std::mutex mtx; - std::condition_variable cv; - bool notified(false), connected(false); - - while (true) { - endpoint->connect(Context::ConvertAddress(fRemoteAddr), [&, band](asiofi::eq::event event) { - // LOG(debug) << "OFI transport (" << fId << "): " << band << " band conn event happened"; - std::unique_lock lk2(mtx); - notified = true; - if (event == asiofi::eq::event::connected) { - LOG(debug) << "OFI transport (" << fId << "): " << band << " band connected."; - connected = true; - } else { - // LOG(debug) << "OFI transport (" << fId << "): " << band << " band connection refused. Trying again."; - } - lk2.unlock(); - cv.notify_one(); - }); - - { - std::unique_lock lk(mtx); - cv.wait(lk, [&] { return notified; }); - - if (connected) { - break; - } else { - notified = false; - lk.unlock(); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - } - } -} - -auto Socket::Send(MessagePtr& msg, int /*timeout*/) -> int64_t -{ - // timeout argument not yet implemented - - std::vector msgVec; - msgVec.reserve(1); - msgVec.emplace_back(std::move(msg)); - - return Send(msgVec); -} - -auto Socket::Send(std::vector& msgVec, int /*timeout*/) -> int64_t -try { - // timeout argument not yet implemented - - int size(0); - for (auto& msg : msgVec) { - size += msg->GetSize(); - } - - fSendPushSem.wait(); - { - std::lock_guard lk(fSendQueueMutex); - fSendQueue.emplace(std::move(msgVec)); - } - fSendPopSem.signal(); - - return size; -} catch (const std::exception& e) { - LOG(error) << e.what(); - return static_cast(TransferCode::error); -} - -auto Socket::SendQueueReader() -> void -{ - fSendPopSem.async_wait([&] { - // Read msg from send queue - std::unique_lock lk(fSendQueueMutex); - std::vector msgVec(std::move(fSendQueue.front())); - fSendQueue.pop(); - lk.unlock(); - - bool postMultiPartStartBuffer = msgVec.size() > 1; - for (auto& msg : msgVec) { - // Create control message - ofi::unique_ptr ctrl(nullptr); - if (postMultiPartStartBuffer) { - postMultiPartStartBuffer = false; - ctrl = MakeControlMessageWithPmr(fControlMemPool); - ctrl->msg.postMultiPartStartBuffer.numParts = msgVec.size(); - ctrl->msg.postMultiPartStartBuffer.size = msg->GetSize(); - } else { - ctrl = MakeControlMessageWithPmr(fControlMemPool); - ctrl->msg.postBuffer.size = msg->GetSize(); - } - - // Send control message - asio::mutable_buffer ctrlMsg(ctrl.get(), sizeof(ControlMessage)); - - if (fNeedOfiMemoryRegistration) { - asiofi::memory_region mr(*fOfiDomain, ctrlMsg, asiofi::mr::access::send); - auto desc = mr.desc(); - fControlEndpoint->send(ctrlMsg, - desc, - [&, ctrl2 = std::move(ctrlMsg), mr2 = std::move(mr)]( - asio::mutable_buffer) mutable {}); - } else { - fControlEndpoint->send( - ctrlMsg, [&, ctrl2 = std::move(ctrl)](asio::mutable_buffer) mutable {}); - } - - // Send data message - const auto size = msg->GetSize(); - - if (size) { - asio::mutable_buffer buffer(msg->GetData(), size); - - if (fNeedOfiMemoryRegistration) { - asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::send); - auto desc = mr.desc(); - - fDataEndpoint->send(buffer, - desc, - [&, size, msg2 = std::move(msg), mr2 = std::move(mr)]( - asio::mutable_buffer) mutable { - fBytesTx += size; - fMessagesTx++; - fSendPushSem.signal(); - }); - } else { - fDataEndpoint->send( - buffer, [&, size, msg2 = std::move(msg)](asio::mutable_buffer) mutable { - fBytesTx += size; - fMessagesTx++; - fSendPushSem.signal(); - }); - } - } else { - ++fMessagesTx; - fSendPushSem.signal(); - } - } - - asio::dispatch(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this)); - }); -} - -auto Socket::SendQueueReaderStatic() -> void -{ - fSendPopSem.async_wait([&] { - // Read msg from send queue - std::unique_lock lk(fSendQueueMutex); - std::vector msgVec(std::move(fSendQueue.front())); - fSendQueue.pop(); - lk.unlock(); - - bool postMultiPartStartBuffer = msgVec.size() > 1; - if (postMultiPartStartBuffer) { - throw SocketError{tools::ToString("Multipart API not supported in static size mode.")}; - } - - MessagePtr& msg = msgVec[0]; - - // Send data message - const auto size = msg->GetSize(); - - if (size) { - asio::mutable_buffer buffer(msg->GetData(), size); - - if (fNeedOfiMemoryRegistration) { - asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::send); - auto desc = mr.desc(); - - fDataEndpoint->send(buffer, - desc, - [&, size, msg2 = std::move(msg), mr2 = std::move(mr)]( - asio::mutable_buffer) mutable { - fBytesTx += size; - fMessagesTx++; - fSendPushSem.signal(); - }); - } else { - fDataEndpoint->send( - buffer, [&, size, msg2 = std::move(msg)](asio::mutable_buffer) mutable { - fBytesTx += size; - fMessagesTx++; - fSendPushSem.signal(); - }); - } - } else { - ++fMessagesTx; - fSendPushSem.signal(); - } - - asio::dispatch(fContext.GetIoContext(), std::bind(&Socket::SendQueueReaderStatic, this)); - }); -} - -auto Socket::Receive(MessagePtr& msg, int /*timeout*/) -> int64_t -try { - // timeout argument not yet implemented - - fRecvPopSem.wait(); - { - std::lock_guard lk(fRecvQueueMutex); - msg = std::move(fRecvQueue.front().front()); - fRecvQueue.pop(); - } - fRecvPushSem.signal(); - - int size(msg->GetSize()); - fBytesRx += size; - ++fMessagesRx; - - return size; -} catch (const std::exception& e) { - LOG(error) << e.what(); - return static_cast(TransferCode::error); -} - -auto Socket::Receive(std::vector& msgVec, int /*timeout*/) -> int64_t -try { - // timeout argument not yet implemented - - fRecvPopSem.wait(); - { - std::lock_guard lk(fRecvQueueMutex); - msgVec = std::move(fRecvQueue.front()); - fRecvQueue.pop(); - } - fRecvPushSem.signal(); - - int64_t size(0); - for (auto& msg : msgVec) { - size += msg->GetSize(); - } - fBytesRx += size; - ++fMessagesRx; - - return size; -} catch (const std::exception& e) { - LOG(error) << e.what(); - return static_cast(TransferCode::error); -} - -auto Socket::RecvControlQueueReader() -> void -{ - fRecvPushSem.async_wait([&] { - // Receive control message - ofi::unique_ptr ctrl(MakeControlMessageWithPmr(fControlMemPool)); - asio::mutable_buffer ctrlMsg(ctrl.get(), sizeof(ControlMessage)); - - if (fNeedOfiMemoryRegistration) { - asiofi::memory_region mr(*fOfiDomain, ctrlMsg, asiofi::mr::access::recv); - auto desc = mr.desc(); - - fControlEndpoint->recv( - ctrlMsg, - desc, - [&, ctrl2 = std::move(ctrl), mr2 = std::move(mr)]( - asio::mutable_buffer) mutable { OnRecvControl(std::move(ctrl2)); }); - } else { - fControlEndpoint->recv( - ctrlMsg, [&, ctrl2 = std::move(ctrl)](asio::mutable_buffer) mutable { - OnRecvControl(std::move(ctrl2)); - }); - } - }); -} - -auto Socket::OnRecvControl(ofi::unique_ptr ctrl) -> void -{ - // Check control message type - auto size(0); - if (ctrl->type == ControlMessageType::PostMultiPartStartBuffer) { - size = ctrl->msg.postMultiPartStartBuffer.size; - if (fMultiPartRecvCounter == -1) { - fMultiPartRecvCounter = ctrl->msg.postMultiPartStartBuffer.numParts; - assert(fInflightMultiPartMessage.empty()); - fInflightMultiPartMessage.reserve(ctrl->msg.postMultiPartStartBuffer.numParts); - } else { - throw SocketError{tools::ToString( - "OFI transport: Received control start of new multi part message without completed " - "reception of previous multi part message. Number of parts missing: ", - fMultiPartRecvCounter)}; - } - } else if (ctrl->type == ControlMessageType::PostBuffer) { - size = ctrl->msg.postBuffer.size; - } else { - throw SocketError{tools::ToString("OFI transport: Unknown control message type: '", - static_cast(ctrl->type))}; - } - - // Receive data - auto msg = fContext.MakeReceiveMessage(size); - - if (size) { - asio::mutable_buffer buffer(msg->GetData(), size); - - if (fNeedOfiMemoryRegistration) { - asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::recv); - auto desc = mr.desc(); - - fDataEndpoint->recv( - buffer, - desc, - [&, msg2 = std::move(msg), mr2 = std::move(mr)]( - asio::mutable_buffer) mutable { DataMessageReceived(std::move(msg2)); }); - - } else { - fDataEndpoint->recv(buffer, - [&, msg2 = std::move(msg)](asio::mutable_buffer) mutable { - DataMessageReceived(std::move(msg2)); - }); - } - } else { - DataMessageReceived(std::move(msg)); - } - - asio::dispatch(fContext.GetIoContext(), - std::bind(&Socket::RecvControlQueueReader, this)); -} - -auto Socket::RecvQueueReaderStatic() -> void -{ - fRecvPushSem.async_wait([&] { - static size_t size = fContext.GetSizeHint(); - // Receive data - auto msg = fContext.MakeReceiveMessage(size); - - if (size) { - asio::mutable_buffer buffer(msg->GetData(), size); - - if (fNeedOfiMemoryRegistration) { - asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::recv); - auto desc = mr.desc(); - - fDataEndpoint->recv(buffer, - desc, - [&, msg2 = std::move(msg), mr2 = std::move(mr)]( - asio::mutable_buffer) mutable { - DataMessageReceived(std::move(msg2)); - }); - - } else { - fDataEndpoint->recv( - buffer, [&, msg2 = std::move(msg)](asio::mutable_buffer) mutable { - DataMessageReceived(std::move(msg2)); - }); - } - } else { - DataMessageReceived(std::move(msg)); - } - - asio::dispatch(fContext.GetIoContext(), - std::bind(&Socket::RecvQueueReaderStatic, this)); - }); -} - -auto Socket::DataMessageReceived(MessagePtr msg) -> void -{ - if (fMultiPartRecvCounter > 0) { - --fMultiPartRecvCounter; - fInflightMultiPartMessage.push_back(std::move(msg)); - } - - if (fMultiPartRecvCounter == 0) { - std::unique_lock lk(fRecvQueueMutex); - fRecvQueue.push(std::move(fInflightMultiPartMessage)); - lk.unlock(); - fMultiPartRecvCounter = -1; - fRecvPopSem.signal(); - } else if (fMultiPartRecvCounter == -1) { - std::vector msgVec; - msgVec.push_back(std::move(msg)); - std::unique_lock lk(fRecvQueueMutex); - fRecvQueue.push(std::move(msgVec)); - lk.unlock(); - fRecvPopSem.signal(); - } - -} - -auto Socket::Close() -> void {} - -auto Socket::SetOption(const string& /*option*/, const void* /*value*/, size_t /*valueSize*/) -> void -{ - // if (zmq_setsockopt(fControlSocket, GetConstant(option), value, valueSize) < 0) { - // throw SocketError{tools::ToString("Failed setting socket option, reason: ", zmq_strerror(errno))}; - // } -} - -auto Socket::GetOption(const string& /*option*/, void* /*value*/, size_t* /*valueSize*/) -> void -{ - // if (zmq_getsockopt(fControlSocket, GetConstant(option), value, valueSize) < 0) { - // throw SocketError{tools::ToString("Failed getting socket option, reason: ", zmq_strerror(errno))}; - // } -} - -void Socket::SetLinger(int /*value*/) -{ - LOG(debug) << "OFI transport (" << fId << "): Not yet implemented."; -} - -int Socket::GetLinger() const -{ - LOG(debug) << "OFI transport (" << fId << "): Not yet implemented."; - return 0; -} - -void Socket::SetSndBufSize(int /*value*/) -{ - LOG(debug) << "OFI transport (" << fId << "): Not yet implemented."; -} - -int Socket::GetSndBufSize() const -{ - LOG(debug) << "OFI transport (" << fId << "): Not yet implemented."; - return 0; -} - -void Socket::SetRcvBufSize(int /*value*/) -{ - LOG(debug) << "OFI transport (" << fId << "): Not yet implemented."; -} - -int Socket::GetRcvBufSize() const -{ - LOG(debug) << "OFI transport (" << fId << "): Not yet implemented."; - return 0; -} - -void Socket::SetSndKernelSize(int /*value*/) -{ - LOG(debug) << "OFI transport (" << fId << "): Not yet implemented."; -} - -int Socket::GetSndKernelSize() const -{ - LOG(debug) << "OFI transport (" << fId << "): Not yet implemented."; - return 0; -} - -void Socket::SetRcvKernelSize(int /*value*/) -{ - LOG(debug) << "OFI transport (" << fId << "): Not yet implemented."; -} - -int Socket::GetRcvKernelSize() const -{ - LOG(debug) << "OFI transport (" << fId << "): Not yet implemented."; - return 0; -} - -auto Socket::GetConstant(const string& /*constant*/) -> int -{ - LOG(debug) << "OFI transport: Not yet implemented."; - return -1; -} - -Socket::~Socket() -{ - try { - Close(); // NOLINT(clang-analyzer-optin.cplusplus.VirtualCall) - } catch (SocketError& e) { - LOG(error) << e.what(); - } -} - -} // namespace fair::mq::ofi diff --git a/fairmq/ofi/Socket.h b/fairmq/ofi/Socket.h deleted file mode 100644 index 7b36b0fc..00000000 --- a/fairmq/ofi/Socket.h +++ /dev/null @@ -1,125 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2018-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#ifndef FAIR_MQ_OFI_SOCKET_H -#define FAIR_MQ_OFI_SOCKET_H - -#include -#include -#include -#include - -#include -#include -#include -#include -#include // unique_ptr -#include - - -namespace fair::mq::ofi -{ - -/** - * @class Socket Socket.h - * @brief - * - * @todo TODO insert long description - */ -class Socket final : public fair::mq::Socket -{ - public: - Socket(Context& context, const std::string& type, const std::string& name, const std::string& id = ""); - Socket(const Socket&) = delete; - Socket(Socket&&) = delete; - Socket& operator=(const Socket&) = delete; - Socket& operator=(Socket&&) = delete; - - auto GetId() const -> std::string override { return fId; } - - auto Events(uint32_t *events) -> int override { *events = 0; return -1; } - auto Bind(const std::string& address) -> bool override; - auto Connect(const std::string& address) -> bool override; - - auto Send(MessagePtr& msg, int timeout = 0) -> int64_t override; - auto Receive(MessagePtr& msg, int timeout = 0) -> int64_t override; - auto Send(std::vector& msgVec, int timeout = 0) -> int64_t override; - auto Receive(std::vector& msgVec, int timeout = 0) -> int64_t override; - - auto GetSocket() const -> void* { return nullptr; } - - void SetLinger(int value) override; - int GetLinger() const override; - void SetSndBufSize(int value) override; - int GetSndBufSize() const override; - void SetRcvBufSize(int value) override; - int GetRcvBufSize() const override; - void SetSndKernelSize(int value) override; - int GetSndKernelSize() const override; - void SetRcvKernelSize(int value) override; - int GetRcvKernelSize() const override; - - auto Close() -> void override; - - auto SetOption(const std::string& option, const void* value, size_t valueSize) -> void override; - auto GetOption(const std::string& option, void* value, size_t* valueSize) -> void override; - - auto GetBytesTx() const -> unsigned long override { return fBytesTx; } - auto GetBytesRx() const -> unsigned long override { return fBytesRx; } - auto GetMessagesTx() const -> unsigned long override { return fMessagesTx; } - auto GetMessagesRx() const -> unsigned long override { return fMessagesRx; } - - auto GetNumberOfConnectedPeers() const -> unsigned long override - { - throw SocketError("not yet implemented"); - } - - static auto GetConstant(const std::string& constant) -> int; - - ~Socket() override; - - private: - Context& fContext; - asiofi::allocated_pool_resource fControlMemPool; - std::unique_ptr fOfiInfo; - std::unique_ptr fOfiFabric; - std::unique_ptr fOfiDomain; - std::unique_ptr fPassiveEndpoint; - std::unique_ptr fDataEndpoint, fControlEndpoint; - std::string fId; - std::atomic fBytesTx; - std::atomic fBytesRx; - std::atomic fMessagesTx; - std::atomic fMessagesRx; - Address fRemoteAddr; - Address fLocalAddr; - std::mutex fSendQueueMutex, fRecvQueueMutex; - std::queue> fSendQueue, fRecvQueue; - std::vector fInflightMultiPartMessage; - int64_t fMultiPartRecvCounter; - asiofi::synchronized_semaphore fSendPushSem, fSendPopSem, fRecvPushSem, fRecvPopSem; - std::atomic fNeedOfiMemoryRegistration; - - auto InitOfi(Address addr) -> void; - auto BindControlEndpoint() -> void; - auto BindDataEndpoint() -> void; - enum class Band { Control, Data }; - auto ConnectEndpoint(std::unique_ptr& endpoint, Band type) -> void; - auto SendQueueReader() -> void; - auto SendQueueReaderStatic() -> void; - auto RecvControlQueueReader() -> void; - auto RecvQueueReaderStatic() -> void; - auto OnRecvControl(ofi::unique_ptr ctrl) -> void; - auto DataMessageReceived(MessagePtr msg) -> void; -}; /* class Socket */ - -struct SilentSocketError : SocketError { using SocketError::SocketError; }; - -} // namespace fair::mq::ofi - -#endif /* FAIR_MQ_OFI_SOCKET_H */ diff --git a/fairmq/ofi/TransportFactory.h b/fairmq/ofi/TransportFactory.h deleted file mode 100644 index 5656ce7d..00000000 --- a/fairmq/ofi/TransportFactory.h +++ /dev/null @@ -1,218 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2018-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#ifndef FAIR_MQ_OFI_TRANSPORTFACTORY_H -#define FAIR_MQ_OFI_TRANSPORTFACTORY_H - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace fair::mq::ofi { - -/** - * @class TransportFactory TransportFactory.h - * @brief FairMQ transport factory for the ofi transport - * - * @todo TODO insert long description - */ -struct TransportFactory final : mq::TransportFactory -{ - TransportFactory(std::string const& id = "", ProgOptions const* config = nullptr) - : mq::TransportFactory(id) - , fContext(*this, *this, 1) - { - try { - LOG(debug) << "OFI transport: asiofi (" << fContext.GetAsiofiVersion() << ")"; - - if (config) { - fContext.SetSizeHint(config->GetProperty("ofi-size-hint", 0)); - } - } catch (ContextError& e) { - throw TransportFactoryError(e.what()); - } - } - - TransportFactory(const TransportFactory&) = delete; - TransportFactory(TransportFactory&&) = delete; - TransportFactory& operator=(const TransportFactory&) = delete; - TransportFactory& operator=(TransportFactory&&) = delete; - ~TransportFactory() override = default; - - auto CreateMessage() -> std::unique_ptr override - { - return std::make_unique(&fMemoryResource); - } - - auto CreateMessage(Alignment /*alignment*/) -> std::unique_ptr override - { - // TODO Do not ignore alignment - return std::make_unique(&fMemoryResource); - } - - auto CreateMessage(std::size_t size) -> std::unique_ptr override - { - return std::make_unique(&fMemoryResource, size); - } - - auto CreateMessage(std::size_t size, Alignment /*alignment*/) - -> std::unique_ptr override - { - // TODO Do not ignore alignment - return std::make_unique(&fMemoryResource, size); - } - - auto CreateMessage(void* data, std::size_t size, FreeFn* ffn, void* hint = nullptr) - -> std::unique_ptr override - { - return std::make_unique(&fMemoryResource, data, size, ffn, hint); - } - - auto CreateMessage(std::unique_ptr& region, - void* data, - std::size_t size, - void* hint = nullptr) -> std::unique_ptr override - { - return std::make_unique(&fMemoryResource, region, data, size, hint); - } - - auto CreateSocket(std::string const& type, std::string const& name) - -> std::unique_ptr override - { - return std::make_unique(fContext, type, name, GetId()); - } - - auto CreatePoller(std::vector const& /*channels*/) const - -> std::unique_ptr override - { - throw std::runtime_error("Not yet implemented (Poller)."); - } - - auto CreatePoller(std::vector const& /*channels*/) const - -> std::unique_ptr override - { - throw std::runtime_error("Not yet implemented (Poller)."); - } - - auto CreatePoller( - std::unordered_map> const& /*channelsMap*/, - std::vector const& /*channelList*/) const - -> std::unique_ptr override - { - throw std::runtime_error("Not yet implemented (Poller)."); - } - - auto CreateUnmanagedRegion(std::size_t /*size*/, - RegionCallback /*callback = nullptr*/, - std::string const& /*path = ""*/, - int /*flags = 0*/, - RegionConfig /*cfg = RegionConfig()*/) - -> std::unique_ptr override - { - throw std::runtime_error("Not yet implemented UMR."); - } - - auto CreateUnmanagedRegion(std::size_t /*size*/, - RegionBulkCallback /*callback = nullptr*/, - std::string const& /*path = ""*/, - int /*flags = 0*/, - RegionConfig /*cfg = RegionConfig()*/) - -> std::unique_ptr override - { - throw std::runtime_error("Not yet implemented UMR."); - } - - auto CreateUnmanagedRegion(std::size_t /*size*/, - int64_t /*userFlags*/, - RegionCallback /*callback = nullptr*/, - std::string const& /*path = ""*/, - int /*flags = 0*/, - RegionConfig /*cfg = RegionConfig()*/) - -> std::unique_ptr override - { - throw std::runtime_error("Not yet implemented UMR."); - } - - auto CreateUnmanagedRegion(std::size_t /*size*/, - int64_t /*userFlags*/, - RegionBulkCallback /*callback = nullptr*/, - std::string const& /*path = ""*/, - int /*flags = 0*/, - RegionConfig /*cfg = RegionConfig()*/) - -> std::unique_ptr override - { - throw std::runtime_error("Not yet implemented UMR."); - } - - auto CreateUnmanagedRegion(std::size_t /*size*/, - RegionCallback /*callback*/, - RegionConfig /*cfg*/) - -> std::unique_ptr override - { - throw std::runtime_error("Not yet implemented UMR."); - } - - auto CreateUnmanagedRegion(std::size_t /*size*/, - RegionBulkCallback /*callback*/, - RegionConfig /*cfg*/) - -> std::unique_ptr override - { - throw std::runtime_error("Not yet implemented UMR."); - } - - auto SubscribeToRegionEvents(RegionEventCallback /*callback*/) -> void override - { - throw std::runtime_error("Not yet implemented."); - } - - auto SubscribedToRegionEvents() -> bool override - { - throw std::runtime_error("Not yet implemented."); - } - - auto UnsubscribeFromRegionEvents() -> void override - { - throw std::runtime_error("Not yet implemented."); - } - - auto GetRegionInfo() -> std::vector override - { - LOG(error) << "GetRegionInfo not yet implemented for OFI, returning empty vector"; - return std::vector(); - } - - auto GetType() const -> Transport override { return Transport::OFI; } - - void Interrupt() override { fContext.Interrupt(); } - void Resume() override { fContext.Resume(); } - void Reset() override { fContext.Reset(); } - - private: - mutable Context fContext; - asiofi::allocated_pool_resource fMemoryResource; -}; /* class TransportFactory */ - -} // namespace fair::mq::ofi - -#endif /* FAIR_MQ_OFI_TRANSPORTFACTORY_H */ diff --git a/fairmq/plugins/config/Config.cxx b/fairmq/plugins/config/Config.cxx index 1abe2495..5d470825 100644 --- a/fairmq/plugins/config/Config.cxx +++ b/fairmq/plugins/config/Config.cxx @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2017-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2017-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -76,7 +76,6 @@ Plugin::ProgOptions ConfigPluginProgramOptions() ("bad-alloc-attempt-interval", po::value()->default_value(50), "Interval between attempts if cannot allocate a message (in ms).") ("shm-monitor", po::value()->default_value(false), "Shared memory: run monitor daemon.") ("shm-no-cleanup", po::value()->default_value(false), "Shared memory: do not cleanup the memory when last device leaves.") - ("ofi-size-hint", po::value()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.") ("rate", po::value()->default_value(0.), "Rate for conditional run loop (Hz).") ("session", po::value()->default_value("default"), "Session name.") ("config-key", po::value(), "Use provided value instead of device id for fetching the configuration from JSON file.") diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 56953234..d02ddef8 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -16,10 +16,6 @@ if(FairLogger_VERSION VERSION_LESS 1.9.0 AND FairLogger_VERSION VERSION_GREATER_ LIST(APPEND definitions FAIR_MIN_SEVERITY=trace) endif() -if(BUILD_OFI_TRANSPORT) - LIST(APPEND definitions BUILD_OFI_TRANSPORT) -endif() - if(definitions) set(definitions DEFINITIONS ${definitions}) endif() diff --git a/test/ci/Containerfile.ubuntu b/test/ci/Containerfile.ubuntu index 8fa3543e..6ab79fe1 100644 --- a/test/ci/Containerfile.ubuntu +++ b/test/ci/Containerfile.ubuntu @@ -22,11 +22,6 @@ RUN cmake -GNinja -S FairCMakeModules -B FairCMakeModules_build -DCMAKE_INSTALL_ RUN cmake --build FairCMakeModules_build --target install RUN rm -rf FairCMakeModules FairCMakeModules_build -RUN git clone -b v0.5.1 --recurse-submodules https://github.com/FairRootGroup/asiofi -RUN cmake -GNinja -S asiofi -B asiofi_build -DCMAKE_INSTALL_PREFIX=/usr -DCMAKE_BUILD_TYPE=Release -RUN cmake --build asiofi_build --target install -RUN rm -rf asiofi asiofi_build - RUN git clone -b v1.11.0 https://github.com/FairRootGroup/FairLogger RUN cmake -GNinja -S FairLogger -B FairLogger_build -DCMAKE_INSTALL_PREFIX=/usr -DCMAKE_BUILD_TYPE=Release -DUSE_EXTERNAL_FMT=ON RUN cmake --build FairLogger_build --target install diff --git a/test/protocols/_pair.cxx b/test/protocols/_pair.cxx index 4ce84954..835fb3d3 100644 --- a/test/protocols/_pair.cxx +++ b/test/protocols/_pair.cxx @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2018-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -28,11 +28,6 @@ auto RunPair(string transport) -> void string ipcFile("/tmp/fmq_" + to_string(session) + "_data_" + transport); string address("ipc://" + ipcFile); - // ofi does not run with ipc:// - if (transport == "ofi") { - address = "tcp://127.0.0.1:5957"; - } - auto pairleft = execute_result{"", 100}; thread pairleft_thread([&]() { stringstream cmd; @@ -80,11 +75,4 @@ TEST(Pair, SingleMsg_MP_tcp_shmem) EXPECT_EXIT(RunPair("shmem"), ::testing::ExitedWithCode(0), "PAIR test successfull"); } -#ifdef BUILD_OFI_TRANSPORT -TEST(Pair, SingleMsg_MP_tcp_ofi) -{ - EXPECT_EXIT(RunPair("ofi"), ::testing::ExitedWithCode(0), "PAIR test successfull"); -} -#endif /* BUILD_OFI_TRANSPORT */ - } // namespace