Compare commits

..

14 Commits

Author SHA1 Message Date
Alexey Rybalchenko
c60dd9965c Add copyright entry for PicoSHA2 2020-06-05 18:16:13 +02:00
Alexey Rybalchenko
79ca436b74 Zmq: let GetData of an empty message return nullptr 2020-06-05 18:16:13 +02:00
Alexey Rybalchenko
36d4f3c937 Use SHA2 instead of boost::hash to generate shmem id 2020-06-05 18:16:13 +02:00
Alexey Rybalchenko
bdf895ae9e Add PicoSHA2 dependency 2020-06-05 18:16:13 +02:00
Alexey Rybalchenko
42986e664c Add PicoSHA2 submodule 2020-06-05 18:16:13 +02:00
Giulio Eulisse
dd47b34e06 Add ability to retrieve ZMQ_FD 2020-06-03 19:44:00 +02:00
Alexey Rybalchenko
a59c902c74 MemoryResource: propagate alignment 2020-06-03 19:41:40 +02:00
Alexey Rybalchenko
dabc48c21a Shm: fix incorrect ptr range check 2020-05-29 23:34:27 +02:00
Alexey Rybalchenko
236d5a8608 Let default shm segm size be multiple of page size
To allow potential optimizations (e.g. huge pages)
2020-05-28 17:23:18 +02:00
Alexey Rybalchenko
5a782e8726 Add a test for unregisted options 2020-05-28 17:23:18 +02:00
Alexey Rybalchenko
5008fa4732 Fix regression in handling unregistered options 2020-05-28 17:23:18 +02:00
Alexey Rybalchenko
b5bb476b0d Adjust program options style to disallow guessing 2020-05-25 14:03:12 +02:00
Giulio Eulisse
ea7ae04025 Adapt to requests by @rbx 2020-05-25 08:36:57 +02:00
Giulio Eulisse
02692e7002 Initiate termination process on SIGTERM as well 2020-05-25 08:36:57 +02:00
18 changed files with 132 additions and 18 deletions

3
.gitmodules vendored
View File

@@ -4,3 +4,6 @@
[submodule "extern/asio"]
path = extern/asio
url = https://github.com/chriskohlhoff/asio
[submodule "extern/PicoSHA2"]
path = extern/PicoSHA2
url = https://github.com/okdshin/PicoSHA2

View File

@@ -146,6 +146,8 @@ if(BUILD_FAIRMQ)
find_package2(PRIVATE ZeroMQ REQUIRED
VERSION 4.1.4
)
build_bundled(PicoSHA2 extern/PicoSHA2)
find_package2(PRIVATE PicoSHA2 REQUIRED)
endif()
if(BUILD_TESTING)

View File

@@ -23,6 +23,10 @@ Files: extern/asio
Copyright: 2003-2019, Christopher M. Kohlhoff (chris at kohlhoff dot com)
License: BSL-1.0
Files: extern/PicoSHA2
Copyright: 2017 okdshin
License: MIT
License: LGPL-3.0-only
[see LICENSE file]
@@ -102,3 +106,22 @@ License: BSL-1.0
FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
License: MIT
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -498,6 +498,8 @@ function(build_bundled package bundle)
set(${package}_BUILD_INCLUDE_DIR ${${package}_SOURCE_DIR}/asio/include CACHE PATH "Bundled ${package} build-interface include dir")
set(${package}_INSTALL_INCLUDE_DIR ${PROJECT_INSTALL_INCDIR}/bundled CACHE PATH "Bundled ${package} install-interface include dir")
set(${package}_ROOT ${${package}_SOURCE_DIR}/asio)
elseif(${package} STREQUAL PicoSHA2)
set(${package}_ROOT ${${package}_SOURCE_DIR})
endif()
string(TOUPPER ${package} package_upper)

21
cmake/FindPicoSHA2.cmake Normal file
View File

@@ -0,0 +1,21 @@
################################################################################
# Copyright (C) 2020 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
find_path(PicoSHA2_INCLUDE_DIR NAMES picosha2.h)
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(PicoSHA2
REQUIRED_VARS PicoSHA2_INCLUDE_DIR
)
if(PicoSHA2_FOUND)
add_library(PicoSHA2 INTERFACE IMPORTED)
set_target_properties(PicoSHA2 PROPERTIES
INTERFACE_INCLUDE_DIRECTORIES "${PicoSHA2_INCLUDE_DIR}"
)
endif()

1
extern/PicoSHA2 vendored Submodule

Submodule extern/PicoSHA2 added at 599843c396

View File

@@ -325,6 +325,7 @@ if(BUILD_FAIRMQ)
PRIVATE # only libFairMQ links against private dependencies
libzmq
PicoSHA2
${OFI_DEPS}
)
set_target_properties(${_target} PROPERTIES
@@ -377,6 +378,7 @@ if(BUILD_FAIRMQ)
Boost::boost
Boost::date_time
Boost::program_options
PicoSHA2
)
target_include_directories(fairmq-shmmonitor PUBLIC
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}>

View File

@@ -15,8 +15,7 @@
#include <fairmq/FairMQTransportFactory.h>
#include <fairmq/MemoryResources.h>
void *fair::mq::ChannelResource::do_allocate(std::size_t bytes, std::size_t /*alignment*/)
void *fair::mq::ChannelResource::do_allocate(std::size_t bytes, std::size_t alignment)
{
return setMessage(factory->CreateMessage(bytes));
return setMessage(factory->CreateMessage(bytes, fair::mq::Alignment{alignment}));
}

View File

@@ -117,16 +117,24 @@ void ProgOptions::ParseAll(const int argc, char const* const* argv, bool allowUn
// clear the container because it was filled with default values and subsequent calls to store() do not overwrite the existing values
fVarMap.clear();
if (allowUnregistered) {
po::command_line_parser parser(argc, argv);
parser.options(fAllOptions).allow_unregistered();
po::parsed_options parsed = parser.run();
fUnregisteredOptions = po::collect_unrecognized(parsed.options, po::include_positional);
po::store(parsed, fVarMap);
} else {
po::store(po::parse_command_line(argc, argv, fAllOptions), fVarMap);
po::command_line_parser parser(argc, argv);
parser.options(fAllOptions);
if (allowUnregistered) {
parser.allow_unregistered();
}
using namespace po::command_line_style;
style_t style = style_t(allow_short | short_allow_adjacent | short_allow_next | allow_long | long_allow_adjacent | long_allow_next | allow_sticky | allow_dash_for_short);
parser.style(style);
po::parsed_options parsed = parser.run();
fUnregisteredOptions = po::collect_unrecognized(parsed.options, po::include_positional);
po::store(parsed, fVarMap);
}
void ProgOptions::Notify()

View File

@@ -36,6 +36,7 @@ namespace
extern "C" auto sigterm_handler(int signal) -> void
{
++gSignalCount;
gLastSignal = signal;
}
}

View File

@@ -68,7 +68,7 @@ Plugin::ProgOptions ConfigPluginProgramOptions()
("init-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).")
("max-run-time", po::value<uint64_t >()->default_value(0), "Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit).")
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t >()->default_value(2000000000), "Shared memory: size of the shared memory segment (in bytes).")
("shm-segment-size", po::value<size_t >()->default_value(2ULL << 30), "Shared memory: size of the shared memory segment (in bytes).")
("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Throw a fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).")
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
("ofi-size-hint", po::value<size_t >()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.")

View File

@@ -8,6 +8,8 @@
#ifndef FAIR_MQ_SHMEM_COMMON_H_
#define FAIR_MQ_SHMEM_COMMON_H_
#include <picosha2.h>
#include <atomic>
#include <string>
#include <unordered_map>
@@ -110,9 +112,9 @@ struct RegionBlock
// a hash of user id + session id, truncated to 8 characters (to accommodate for name size limit on some systems (MacOS)).
inline std::string buildShmIdFromSessionIdAndUserId(const std::string& sessionId)
{
boost::hash<std::string> stringHash;
std::string shmId(std::to_string(stringHash(std::string((std::to_string(geteuid()) + sessionId)))));
shmId.resize(8, '_');
std::string seed((std::to_string(geteuid()) + sessionId));
std::string shmId = picosha2::hash256_hex_string(seed);
shmId.resize(10, '_');
return shmId;
}

View File

@@ -109,7 +109,7 @@ class Message final : public fair::mq::Message
, fRegionPtr(nullptr)
, fLocalPtr(static_cast<char*>(data))
{
if (reinterpret_cast<const char*>(data) >= reinterpret_cast<const char*>(region->GetData()) ||
if (reinterpret_cast<const char*>(data) >= reinterpret_cast<const char*>(region->GetData()) &&
reinterpret_cast<const char*>(data) <= reinterpret_cast<const char*>(region->GetData()) + region->GetSize()) {
fMeta.fHandle = (boost::interprocess::managed_shared_memory::handle_t)(reinterpret_cast<const char*>(data) - reinterpret_cast<const char*>(region->GetData()));
} else {

View File

@@ -505,6 +505,8 @@ class Socket final : public fair::mq::Socket
if (constant == "no-block") return ZMQ_DONTWAIT;
if (constant == "snd-more no-block") return ZMQ_DONTWAIT|ZMQ_SNDMORE;
if (constant == "fd") return ZMQ_FD;
return -1;
}

View File

@@ -155,9 +155,17 @@ class Message final : public fair::mq::Message
void* GetData() const override
{
if (!fViewMsg) {
return zmq_msg_data(fMsg.get());
if (zmq_msg_size(fMsg.get()) > 0) {
return zmq_msg_data(fMsg.get());
} else {
return nullptr;
}
} else {
return zmq_msg_data(fViewMsg.get());
if (zmq_msg_size(fViewMsg.get()) > 0) {
return zmq_msg_data(fViewMsg.get());
} else {
return nullptr;
}
}
}

View File

@@ -459,6 +459,8 @@ class Socket final : public fair::mq::Socket
if (constant == "linger") return ZMQ_LINGER;
if (constant == "fd") return ZMQ_FD;
return -1;
}

View File

@@ -91,6 +91,31 @@ void Alignment(const string& transport)
ASSERT_EQ(reinterpret_cast<uintptr_t>(msg->GetData()) % 64, 0);
}
void EmptyMessage(const string& transport, const string& _address)
{
size_t session{fair::mq::tools::UuidHash()};
std::string address(fair::mq::tools::ToString(_address, "_", transport));
fair::mq::ProgOptions config;
config.SetProperty<string>("session", to_string(session));
auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config);
FairMQChannel push{"Push", "push", factory};
push.Bind(address);
FairMQChannel pull{"Pull", "pull", factory};
pull.Connect(address);
FairMQMessagePtr outMsg(push.NewMessage());
ASSERT_EQ(outMsg->GetData(), nullptr);
ASSERT_EQ(push.Send(outMsg), 0);
FairMQMessagePtr inMsg(pull.NewMessage());
ASSERT_EQ(pull.Receive(inMsg), 0);
ASSERT_EQ(inMsg->GetData(), nullptr);
}
TEST(Resize, zeromq)
{
RunPushPullWithMsgResize("zeromq", "ipc://test_message_resize");
@@ -116,4 +141,14 @@ TEST(Alignment, shmem) // TODO: add test for ZeroMQ once it is implemented
Alignment("shmem");
}
TEST(EmptyMessage, zeromq)
{
EmptyMessage("zeromq", "ipc://test_empty_message");
}
TEST(EmptyMessage, shmem)
{
EmptyMessage("shmem", "ipc://test_empty_message");
}
} // namespace

View File

@@ -70,6 +70,9 @@ TEST(ProgOptions, SetAndGet)
set_and_get<vector<double>>(o, "_vector<double>", { 33.22, 33.23, 33.24 });
set_and_get<vector<long double>>(o, "_vector<long double>", { 333.222, 333.223, 333.224 });
set_and_get<vector<boost::filesystem::path>>(o, "_vector<boost::filesystem::path>", { boost::filesystem::path("C:\\Windows"), boost::filesystem::path("C:\\Windows\\System32") });
ASSERT_THROW(o.ParseAll({"cmd", "--unregistered", "option"}, false), boost::program_options::unknown_option);
ASSERT_NO_THROW(o.ParseAll({"cmd", "--unregistered", "option"}, true));
}
template<typename T>