mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 17:41:45 +00:00
Compare commits
14 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
c60dd9965c | ||
|
79ca436b74 | ||
|
36d4f3c937 | ||
|
bdf895ae9e | ||
|
42986e664c | ||
|
dd47b34e06 | ||
|
a59c902c74 | ||
|
dabc48c21a | ||
|
236d5a8608 | ||
|
5a782e8726 | ||
|
5008fa4732 | ||
|
b5bb476b0d | ||
|
ea7ae04025 | ||
|
02692e7002 |
3
.gitmodules
vendored
3
.gitmodules
vendored
@@ -4,3 +4,6 @@
|
|||||||
[submodule "extern/asio"]
|
[submodule "extern/asio"]
|
||||||
path = extern/asio
|
path = extern/asio
|
||||||
url = https://github.com/chriskohlhoff/asio
|
url = https://github.com/chriskohlhoff/asio
|
||||||
|
[submodule "extern/PicoSHA2"]
|
||||||
|
path = extern/PicoSHA2
|
||||||
|
url = https://github.com/okdshin/PicoSHA2
|
||||||
|
@@ -146,6 +146,8 @@ if(BUILD_FAIRMQ)
|
|||||||
find_package2(PRIVATE ZeroMQ REQUIRED
|
find_package2(PRIVATE ZeroMQ REQUIRED
|
||||||
VERSION 4.1.4
|
VERSION 4.1.4
|
||||||
)
|
)
|
||||||
|
build_bundled(PicoSHA2 extern/PicoSHA2)
|
||||||
|
find_package2(PRIVATE PicoSHA2 REQUIRED)
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
if(BUILD_TESTING)
|
if(BUILD_TESTING)
|
||||||
|
23
COPYRIGHT
23
COPYRIGHT
@@ -23,6 +23,10 @@ Files: extern/asio
|
|||||||
Copyright: 2003-2019, Christopher M. Kohlhoff (chris at kohlhoff dot com)
|
Copyright: 2003-2019, Christopher M. Kohlhoff (chris at kohlhoff dot com)
|
||||||
License: BSL-1.0
|
License: BSL-1.0
|
||||||
|
|
||||||
|
Files: extern/PicoSHA2
|
||||||
|
Copyright: 2017 okdshin
|
||||||
|
License: MIT
|
||||||
|
|
||||||
License: LGPL-3.0-only
|
License: LGPL-3.0-only
|
||||||
[see LICENSE file]
|
[see LICENSE file]
|
||||||
|
|
||||||
@@ -102,3 +106,22 @@ License: BSL-1.0
|
|||||||
FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
|
FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
|
||||||
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||||
DEALINGS IN THE SOFTWARE.
|
DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
|
License: MIT
|
||||||
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
of this software and associated documentation files (the "Software"), to deal
|
||||||
|
in the Software without restriction, including without limitation the rights
|
||||||
|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
copies of the Software, and to permit persons to whom the Software is
|
||||||
|
furnished to do so, subject to the following conditions:
|
||||||
|
|
||||||
|
The above copyright notice and this permission notice shall be included in all
|
||||||
|
copies or substantial portions of the Software.
|
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
|
SOFTWARE.
|
||||||
|
@@ -498,6 +498,8 @@ function(build_bundled package bundle)
|
|||||||
set(${package}_BUILD_INCLUDE_DIR ${${package}_SOURCE_DIR}/asio/include CACHE PATH "Bundled ${package} build-interface include dir")
|
set(${package}_BUILD_INCLUDE_DIR ${${package}_SOURCE_DIR}/asio/include CACHE PATH "Bundled ${package} build-interface include dir")
|
||||||
set(${package}_INSTALL_INCLUDE_DIR ${PROJECT_INSTALL_INCDIR}/bundled CACHE PATH "Bundled ${package} install-interface include dir")
|
set(${package}_INSTALL_INCLUDE_DIR ${PROJECT_INSTALL_INCDIR}/bundled CACHE PATH "Bundled ${package} install-interface include dir")
|
||||||
set(${package}_ROOT ${${package}_SOURCE_DIR}/asio)
|
set(${package}_ROOT ${${package}_SOURCE_DIR}/asio)
|
||||||
|
elseif(${package} STREQUAL PicoSHA2)
|
||||||
|
set(${package}_ROOT ${${package}_SOURCE_DIR})
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
string(TOUPPER ${package} package_upper)
|
string(TOUPPER ${package} package_upper)
|
||||||
|
21
cmake/FindPicoSHA2.cmake
Normal file
21
cmake/FindPicoSHA2.cmake
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
################################################################################
|
||||||
|
# Copyright (C) 2020 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||||
|
# #
|
||||||
|
# This software is distributed under the terms of the #
|
||||||
|
# GNU Lesser General Public Licence (LGPL) version 3, #
|
||||||
|
# copied verbatim in the file "LICENSE" #
|
||||||
|
################################################################################
|
||||||
|
|
||||||
|
find_path(PicoSHA2_INCLUDE_DIR NAMES picosha2.h)
|
||||||
|
|
||||||
|
include(FindPackageHandleStandardArgs)
|
||||||
|
find_package_handle_standard_args(PicoSHA2
|
||||||
|
REQUIRED_VARS PicoSHA2_INCLUDE_DIR
|
||||||
|
)
|
||||||
|
|
||||||
|
if(PicoSHA2_FOUND)
|
||||||
|
add_library(PicoSHA2 INTERFACE IMPORTED)
|
||||||
|
set_target_properties(PicoSHA2 PROPERTIES
|
||||||
|
INTERFACE_INCLUDE_DIRECTORIES "${PicoSHA2_INCLUDE_DIR}"
|
||||||
|
)
|
||||||
|
endif()
|
1
extern/PicoSHA2
vendored
Submodule
1
extern/PicoSHA2
vendored
Submodule
Submodule extern/PicoSHA2 added at 599843c396
@@ -325,6 +325,7 @@ if(BUILD_FAIRMQ)
|
|||||||
|
|
||||||
PRIVATE # only libFairMQ links against private dependencies
|
PRIVATE # only libFairMQ links against private dependencies
|
||||||
libzmq
|
libzmq
|
||||||
|
PicoSHA2
|
||||||
${OFI_DEPS}
|
${OFI_DEPS}
|
||||||
)
|
)
|
||||||
set_target_properties(${_target} PROPERTIES
|
set_target_properties(${_target} PROPERTIES
|
||||||
@@ -377,6 +378,7 @@ if(BUILD_FAIRMQ)
|
|||||||
Boost::boost
|
Boost::boost
|
||||||
Boost::date_time
|
Boost::date_time
|
||||||
Boost::program_options
|
Boost::program_options
|
||||||
|
PicoSHA2
|
||||||
)
|
)
|
||||||
target_include_directories(fairmq-shmmonitor PUBLIC
|
target_include_directories(fairmq-shmmonitor PUBLIC
|
||||||
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}>
|
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}>
|
||||||
|
@@ -15,8 +15,7 @@
|
|||||||
#include <fairmq/FairMQTransportFactory.h>
|
#include <fairmq/FairMQTransportFactory.h>
|
||||||
#include <fairmq/MemoryResources.h>
|
#include <fairmq/MemoryResources.h>
|
||||||
|
|
||||||
void *fair::mq::ChannelResource::do_allocate(std::size_t bytes, std::size_t /*alignment*/)
|
void *fair::mq::ChannelResource::do_allocate(std::size_t bytes, std::size_t alignment)
|
||||||
{
|
{
|
||||||
return setMessage(factory->CreateMessage(bytes));
|
return setMessage(factory->CreateMessage(bytes, fair::mq::Alignment{alignment}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -117,16 +117,24 @@ void ProgOptions::ParseAll(const int argc, char const* const* argv, bool allowUn
|
|||||||
// clear the container because it was filled with default values and subsequent calls to store() do not overwrite the existing values
|
// clear the container because it was filled with default values and subsequent calls to store() do not overwrite the existing values
|
||||||
fVarMap.clear();
|
fVarMap.clear();
|
||||||
|
|
||||||
if (allowUnregistered) {
|
|
||||||
po::command_line_parser parser(argc, argv);
|
|
||||||
parser.options(fAllOptions).allow_unregistered();
|
|
||||||
po::parsed_options parsed = parser.run();
|
|
||||||
fUnregisteredOptions = po::collect_unrecognized(parsed.options, po::include_positional);
|
|
||||||
|
|
||||||
po::store(parsed, fVarMap);
|
po::command_line_parser parser(argc, argv);
|
||||||
} else {
|
|
||||||
po::store(po::parse_command_line(argc, argv, fAllOptions), fVarMap);
|
parser.options(fAllOptions);
|
||||||
|
|
||||||
|
if (allowUnregistered) {
|
||||||
|
parser.allow_unregistered();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
using namespace po::command_line_style;
|
||||||
|
style_t style = style_t(allow_short | short_allow_adjacent | short_allow_next | allow_long | long_allow_adjacent | long_allow_next | allow_sticky | allow_dash_for_short);
|
||||||
|
parser.style(style);
|
||||||
|
|
||||||
|
po::parsed_options parsed = parser.run();
|
||||||
|
|
||||||
|
fUnregisteredOptions = po::collect_unrecognized(parsed.options, po::include_positional);
|
||||||
|
|
||||||
|
po::store(parsed, fVarMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ProgOptions::Notify()
|
void ProgOptions::Notify()
|
||||||
|
@@ -36,6 +36,7 @@ namespace
|
|||||||
|
|
||||||
extern "C" auto sigterm_handler(int signal) -> void
|
extern "C" auto sigterm_handler(int signal) -> void
|
||||||
{
|
{
|
||||||
|
++gSignalCount;
|
||||||
gLastSignal = signal;
|
gLastSignal = signal;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -68,7 +68,7 @@ Plugin::ProgOptions ConfigPluginProgramOptions()
|
|||||||
("init-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).")
|
("init-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).")
|
||||||
("max-run-time", po::value<uint64_t >()->default_value(0), "Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit).")
|
("max-run-time", po::value<uint64_t >()->default_value(0), "Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit).")
|
||||||
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
|
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
|
||||||
("shm-segment-size", po::value<size_t >()->default_value(2000000000), "Shared memory: size of the shared memory segment (in bytes).")
|
("shm-segment-size", po::value<size_t >()->default_value(2ULL << 30), "Shared memory: size of the shared memory segment (in bytes).")
|
||||||
("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Throw a fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).")
|
("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Throw a fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).")
|
||||||
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
|
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
|
||||||
("ofi-size-hint", po::value<size_t >()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.")
|
("ofi-size-hint", po::value<size_t >()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.")
|
||||||
|
@@ -8,6 +8,8 @@
|
|||||||
#ifndef FAIR_MQ_SHMEM_COMMON_H_
|
#ifndef FAIR_MQ_SHMEM_COMMON_H_
|
||||||
#define FAIR_MQ_SHMEM_COMMON_H_
|
#define FAIR_MQ_SHMEM_COMMON_H_
|
||||||
|
|
||||||
|
#include <picosha2.h>
|
||||||
|
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
@@ -110,9 +112,9 @@ struct RegionBlock
|
|||||||
// a hash of user id + session id, truncated to 8 characters (to accommodate for name size limit on some systems (MacOS)).
|
// a hash of user id + session id, truncated to 8 characters (to accommodate for name size limit on some systems (MacOS)).
|
||||||
inline std::string buildShmIdFromSessionIdAndUserId(const std::string& sessionId)
|
inline std::string buildShmIdFromSessionIdAndUserId(const std::string& sessionId)
|
||||||
{
|
{
|
||||||
boost::hash<std::string> stringHash;
|
std::string seed((std::to_string(geteuid()) + sessionId));
|
||||||
std::string shmId(std::to_string(stringHash(std::string((std::to_string(geteuid()) + sessionId)))));
|
std::string shmId = picosha2::hash256_hex_string(seed);
|
||||||
shmId.resize(8, '_');
|
shmId.resize(10, '_');
|
||||||
return shmId;
|
return shmId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -109,7 +109,7 @@ class Message final : public fair::mq::Message
|
|||||||
, fRegionPtr(nullptr)
|
, fRegionPtr(nullptr)
|
||||||
, fLocalPtr(static_cast<char*>(data))
|
, fLocalPtr(static_cast<char*>(data))
|
||||||
{
|
{
|
||||||
if (reinterpret_cast<const char*>(data) >= reinterpret_cast<const char*>(region->GetData()) ||
|
if (reinterpret_cast<const char*>(data) >= reinterpret_cast<const char*>(region->GetData()) &&
|
||||||
reinterpret_cast<const char*>(data) <= reinterpret_cast<const char*>(region->GetData()) + region->GetSize()) {
|
reinterpret_cast<const char*>(data) <= reinterpret_cast<const char*>(region->GetData()) + region->GetSize()) {
|
||||||
fMeta.fHandle = (boost::interprocess::managed_shared_memory::handle_t)(reinterpret_cast<const char*>(data) - reinterpret_cast<const char*>(region->GetData()));
|
fMeta.fHandle = (boost::interprocess::managed_shared_memory::handle_t)(reinterpret_cast<const char*>(data) - reinterpret_cast<const char*>(region->GetData()));
|
||||||
} else {
|
} else {
|
||||||
|
@@ -505,6 +505,8 @@ class Socket final : public fair::mq::Socket
|
|||||||
if (constant == "no-block") return ZMQ_DONTWAIT;
|
if (constant == "no-block") return ZMQ_DONTWAIT;
|
||||||
if (constant == "snd-more no-block") return ZMQ_DONTWAIT|ZMQ_SNDMORE;
|
if (constant == "snd-more no-block") return ZMQ_DONTWAIT|ZMQ_SNDMORE;
|
||||||
|
|
||||||
|
if (constant == "fd") return ZMQ_FD;
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -155,9 +155,17 @@ class Message final : public fair::mq::Message
|
|||||||
void* GetData() const override
|
void* GetData() const override
|
||||||
{
|
{
|
||||||
if (!fViewMsg) {
|
if (!fViewMsg) {
|
||||||
return zmq_msg_data(fMsg.get());
|
if (zmq_msg_size(fMsg.get()) > 0) {
|
||||||
|
return zmq_msg_data(fMsg.get());
|
||||||
|
} else {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
return zmq_msg_data(fViewMsg.get());
|
if (zmq_msg_size(fViewMsg.get()) > 0) {
|
||||||
|
return zmq_msg_data(fViewMsg.get());
|
||||||
|
} else {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -459,6 +459,8 @@ class Socket final : public fair::mq::Socket
|
|||||||
|
|
||||||
if (constant == "linger") return ZMQ_LINGER;
|
if (constant == "linger") return ZMQ_LINGER;
|
||||||
|
|
||||||
|
if (constant == "fd") return ZMQ_FD;
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -91,6 +91,31 @@ void Alignment(const string& transport)
|
|||||||
ASSERT_EQ(reinterpret_cast<uintptr_t>(msg->GetData()) % 64, 0);
|
ASSERT_EQ(reinterpret_cast<uintptr_t>(msg->GetData()) % 64, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void EmptyMessage(const string& transport, const string& _address)
|
||||||
|
{
|
||||||
|
size_t session{fair::mq::tools::UuidHash()};
|
||||||
|
std::string address(fair::mq::tools::ToString(_address, "_", transport));
|
||||||
|
|
||||||
|
fair::mq::ProgOptions config;
|
||||||
|
config.SetProperty<string>("session", to_string(session));
|
||||||
|
|
||||||
|
auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config);
|
||||||
|
|
||||||
|
FairMQChannel push{"Push", "push", factory};
|
||||||
|
push.Bind(address);
|
||||||
|
|
||||||
|
FairMQChannel pull{"Pull", "pull", factory};
|
||||||
|
pull.Connect(address);
|
||||||
|
|
||||||
|
FairMQMessagePtr outMsg(push.NewMessage());
|
||||||
|
ASSERT_EQ(outMsg->GetData(), nullptr);
|
||||||
|
ASSERT_EQ(push.Send(outMsg), 0);
|
||||||
|
|
||||||
|
FairMQMessagePtr inMsg(pull.NewMessage());
|
||||||
|
ASSERT_EQ(pull.Receive(inMsg), 0);
|
||||||
|
ASSERT_EQ(inMsg->GetData(), nullptr);
|
||||||
|
}
|
||||||
|
|
||||||
TEST(Resize, zeromq)
|
TEST(Resize, zeromq)
|
||||||
{
|
{
|
||||||
RunPushPullWithMsgResize("zeromq", "ipc://test_message_resize");
|
RunPushPullWithMsgResize("zeromq", "ipc://test_message_resize");
|
||||||
@@ -116,4 +141,14 @@ TEST(Alignment, shmem) // TODO: add test for ZeroMQ once it is implemented
|
|||||||
Alignment("shmem");
|
Alignment("shmem");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(EmptyMessage, zeromq)
|
||||||
|
{
|
||||||
|
EmptyMessage("zeromq", "ipc://test_empty_message");
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(EmptyMessage, shmem)
|
||||||
|
{
|
||||||
|
EmptyMessage("shmem", "ipc://test_empty_message");
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
@@ -70,6 +70,9 @@ TEST(ProgOptions, SetAndGet)
|
|||||||
set_and_get<vector<double>>(o, "_vector<double>", { 33.22, 33.23, 33.24 });
|
set_and_get<vector<double>>(o, "_vector<double>", { 33.22, 33.23, 33.24 });
|
||||||
set_and_get<vector<long double>>(o, "_vector<long double>", { 333.222, 333.223, 333.224 });
|
set_and_get<vector<long double>>(o, "_vector<long double>", { 333.222, 333.223, 333.224 });
|
||||||
set_and_get<vector<boost::filesystem::path>>(o, "_vector<boost::filesystem::path>", { boost::filesystem::path("C:\\Windows"), boost::filesystem::path("C:\\Windows\\System32") });
|
set_and_get<vector<boost::filesystem::path>>(o, "_vector<boost::filesystem::path>", { boost::filesystem::path("C:\\Windows"), boost::filesystem::path("C:\\Windows\\System32") });
|
||||||
|
|
||||||
|
ASSERT_THROW(o.ParseAll({"cmd", "--unregistered", "option"}, false), boost::program_options::unknown_option);
|
||||||
|
ASSERT_NO_THROW(o.ParseAll({"cmd", "--unregistered", "option"}, true));
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename T>
|
template<typename T>
|
||||||
|
Reference in New Issue
Block a user