fix: First round of using new non-namespaced typenames

This commit is contained in:
Dennis Klein 2021-07-13 20:52:17 +02:00
parent 0bf765e6ba
commit 4e8f247a0d
14 changed files with 336 additions and 327 deletions

View File

@ -27,34 +27,34 @@ The next table shows the supported address types for each transport implementati
## 2.1 Message
Devices transport data between each other in form of `FairMQMessage`s. These can be filled with arbitrary content. Message can be initialized in three different ways by calling `NewMessage()`:
Devices transport data between each other in form of `fair::mq::Message`s. These can be filled with arbitrary content. Message can be initialized in three different ways by calling `NewMessage()`:
```cpp
FairMQMessagePtr NewMessage() const;
fair::mq::MessagePtr NewMessage() const;
```
**with no parameters**: Initializes an empty message (typically used for receiving).
```cpp
FairMQMessagePtr NewMessage(const size_t size) const;
fair::mq::MessagePtr NewMessage(const size_t size) const;
```
**given message size**: Initializes message body with a given size. Fill the created contents via buffer pointer.
```cpp
using fairmq_free_fn = void(void* data, void* hint);
FairMQMessagePtr NewMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const;
fair::mq::MessagePtr NewMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const;
```
**given existing buffer and a size**: Initialize the message from an existing buffer. In case of ZeroMQ this is a zero-copy operation.
Additionally, FairMQ provides two more message factories for convenience:
```cpp
template<typename T>
FairMQMessagePtr NewSimpleMessage(const T& data) const
fair::mq::MessagePtr NewSimpleMessage(const T& data) const
```
**copy and own**: Copy the `data` argument into the returned message and take ownership (free memory after message is sent). This interface is useful for small, [trivially copyable](http://en.cppreference.com/w/cpp/concept/TriviallyCopyable) data.
```cpp
template<typename T>
FairMQMessagePtr NewStaticMessage(const T& data) const
fair::mq::MessagePtr NewStaticMessage(const T& data) const
```
**point to existing memory**: The returned message will point to the `data` argument, but not take ownership (someone else must destruct this variable). Make sure that `data` lives long enough to be successfully sent. This interface is most useful for third party managed, contiguous memory (Be aware of shallow types with internal pointer references! These will not be sent.)
@ -65,19 +65,19 @@ The component of a program, that is reponsible for the allocation or destruction
After queuing a message for sending in FairMQ, the transport takes ownership over the message body and will free it with `free()` after it is no longer used. A callback can be passed to the message object, to be called instead of the destruction with `free()` (for initialization via buffer+size).
```cpp
static void FairMQNoCleanup(void* /*data*/, void* /*obj*/) {}
static void fair::mq::NoCleanup(void* /*data*/, void* /*obj*/) {}
template<typename T>
static void FairMQSimpleMsgCleanup(void* /*data*/, void* obj) { delete static_cast<T*>(obj); }
static void fair::mq::SimpleMsgCleanup(void* /*data*/, void* obj) { delete static_cast<T*>(obj); }
```
For convenience, two common deleter callbacks are already defined in the `FairMQTransportFactory` class to aid the user in controlling ownership of the data.
For convenience, two common deleter callbacks are already defined in the `fair::mq::TransportFactory` class to aid the user in controlling ownership of the data.
## 2.2 Channel
A channel represents a communication endpoint in FairMQ. Usage is similar to a traditional Unix network socket. A device usually contains a number of channels that can either listen for incoming connections from channels of other devices or they can connect to other listening channels. Channels are organized by a channel name and a subchannel index.
```cpp
const FairMQChannel& GetChannel(const std::string& channelName, const int index = 0) const;
const fair::mq::Channel& GetChannel(const std::string& channelName, const int index = 0) const;
```
All subchannels with a common channel name need to be of the same transport type.
@ -87,7 +87,7 @@ All subchannels with a common channel name need to be of the same transport type
A poller allows to wait on multiple channels either to receive or send a message.
```cpp
FairMQPollerPtr NewPoller(const std::vector<const FairMQChannel*>& channels)
fair::mq::PollerPtr NewPoller(const std::vector<const fair::mq::Channel*>& channels)
```
**list channels**: This poller waits on all supplied channels. Currently, it is limited to channels of the same transport type only.

View File

@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@ -8,10 +8,15 @@
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <memory>
using namespace std;
using namespace fair::mq;
namespace bpo = boost::program_options;
struct Receiver : fair::mq::Device
namespace {
struct Receiver : Device
{
void InitTask() override
{
@ -21,15 +26,14 @@ struct Receiver : fair::mq::Device
void Run() override
{
FairMQChannel& dataInChannel = fChannels.at("sr").at(0);
Channel& dataInChannel = fChannels.at("sr").at(0);
while (!NewStatePending()) {
FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage());
auto msg(dataInChannel.NewMessage());
dataInChannel.Receive(msg);
// void* ptr = msg->GetData();
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
LOG(info) << "Configured max number of iterations reached. Leaving RUNNING state.";
break;
}
}
@ -40,13 +44,14 @@ struct Receiver : fair::mq::Device
uint64_t fNumIterations = 0;
};
} // namespace
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
options.add_options()(
"max-iterations",
bpo::value<uint64_t>()->default_value(0),
"Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Receiver>();
}
unique_ptr<Device> getDevice(ProgOptions& /*config*/) { return make_unique<Receiver>(); }

View File

@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@ -8,30 +8,34 @@
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <memory>
namespace bpo = boost::program_options;
using namespace std;
using namespace fair::mq;
struct Sink : fair::mq::Device
namespace {
struct Sink : Device
{
void InitTask() override
{
// Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](RegionInfo info) {
LOG(info) << "Region event: " << info.event << ": "
<< (info.managed ? "managed" : "unmanaged")
<< ", id: " << info.id
<< ", ptr: " << info.ptr
<< ", size: " << info.size
<< ", flags: " << info.flags;
<< (info.managed ? "managed" : "unmanaged") << ", id: " << info.id
<< ", ptr: " << info.ptr << ", size: " << info.size
<< ", flags: " << info.flags;
});
}
void Run() override
{
FairMQChannel& dataInChannel = fChannels.at("data").at(0);
Channel& dataInChannel = fChannels.at("data").at(0);
while (!NewStatePending()) {
FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage());
auto msg(dataInChannel.Transport()->CreateMessage());
dataInChannel.Receive(msg);
// void* ptr = msg->GetData();
@ -39,11 +43,12 @@ struct Sink : fair::mq::Device
// LOG(info) << "check: " << cptr[3];
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
LOG(info) << "Configured max number of iterations reached. Leaving RUNNING state.";
break;
}
}
}
void ResetTask() override
{
fChannels.at("data").at(0).Transport()->UnsubscribeFromRegionEvents();
@ -54,14 +59,14 @@ struct Sink : fair::mq::Device
uint64_t fNumIterations = 0;
};
} // namespace
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
options.add_options()(
"max-iterations",
bpo::value<uint64_t>()->default_value(0),
"Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sink>();
}
unique_ptr<Device> getDevice(ProgOptions& /*config*/) { return make_unique<Sink>(); }

View File

@ -13,18 +13,19 @@
*/
#include "JSONParser.h"
#include "FairMQChannel.h"
#include <fairmq/PropertyOutput.h>
#include <fairmq/tools/Strings.h>
#include <fairlogger/Logger.h>
#include <boost/any.hpp>
#define BOOST_BIND_GLOBAL_PLACEHOLDERS
#include <boost/property_tree/json_parser.hpp>
#undef BOOST_BIND_GLOBAL_PLACEHOLDERS
#include <boost/property_tree/ptree.hpp>
#include <boost/any.hpp>
#include <fairlogger/Logger.h>
#include <fairmq/Channel.h>
#include <fairmq/JSONParser.h>
#include <fairmq/PropertyOutput.h>
#include <fairmq/tools/Strings.h>
#include <iomanip>
using namespace std;

View File

@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@ -9,8 +9,8 @@
#ifndef FAIR_MQ_PLUGINSERVICES_H
#define FAIR_MQ_PLUGINSERVICES_H
#include <fairmq/Device.h>
#include <fairmq/States.h>
#include <FairMQDevice.h>
#include <fairmq/ProgOptions.h>
#include <fairmq/Properties.h>
@ -40,7 +40,7 @@ class PluginServices
{
public:
PluginServices() = delete;
PluginServices(ProgOptions& config, FairMQDevice& device)
PluginServices(ProgOptions& config, Device& device)
: fConfig(config)
, fDevice(device)
{}
@ -117,7 +117,7 @@ class PluginServices
/// The state transition may not happen immediately, but when the current state evaluates the
/// pending transition event and terminates. In other words, the device states are scheduled cooperatively.
/// If the device control role has not been taken yet, calling this function will take over control implicitely.
auto ChangeDeviceState(const std::string& controller, const DeviceStateTransition next) -> bool;
auto ChangeDeviceState(const std::string& controller, DeviceStateTransition next) -> bool;
/// @brief Subscribe with a callback to device state changes
/// @param subscriber id
@ -269,7 +269,7 @@ class PluginServices
private:
fair::mq::ProgOptions& fConfig;
FairMQDevice& fDevice;
Device& fDevice;
boost::optional<std::string> fDeviceController;
mutable std::mutex fDeviceControllerMutex;
std::condition_variable fReleaseDeviceControlCondition;

View File

@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@ -9,22 +9,20 @@
#ifndef FAIR_MQ_PROGOPTIONS_H
#define FAIR_MQ_PROGOPTIONS_H
#include "FairMQChannel.h"
#include "FairMQLogger.h"
#include <boost/program_options.hpp>
#include <fairlogger/Logger.h>
#include <fairmq/Channel.h>
#include <fairmq/EventManager.h>
#include <fairmq/ProgOptionsFwd.h>
#include <fairmq/Properties.h>
#include <fairmq/tools/Strings.h>
#include <boost/program_options.hpp>
#include <functional>
#include <map>
#include <mutex>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <vector>
#include <stdexcept>
namespace fair::mq
{
@ -38,10 +36,10 @@ class ProgOptions
virtual ~ProgOptions() = default;
void ParseAll(const std::vector<std::string>& cmdArgs, bool allowUnregistered);
void ParseAll(const int argc, char const* const* argv, bool allowUnregistered = true);
void ParseAll(int argc, char const* const* argv, bool allowUnregistered = true);
void Notify();
void AddToCmdLineOptions(const boost::program_options::options_description optDesc, bool visible = true);
void AddToCmdLineOptions(boost::program_options::options_description optDesc, bool visible = true);
boost::program_options::options_description& GetCmdLineOptions();
/// @brief Checks a property with the given key exist in the configuration
@ -174,7 +172,7 @@ class ProgOptions
/// @brief Takes the provided channel and creates properties based on it
/// @param name channel name
/// @param channel FairMQChannel reference
void AddChannel(const std::string& name, const FairMQChannel& channel);
void AddChannel(const std::string& name, const Channel& channel);
/// @brief Subscribe to property updates of type T
/// @param subscriber

View File

@ -59,12 +59,12 @@ class TransportFactory
/// @brief Create new Message of specified size
/// @param size message size
/// @return pointer to Message
virtual MessagePtr CreateMessage(const size_t size) = 0;
virtual MessagePtr CreateMessage(size_t size) = 0;
/// @brief Create new Message of specified size and alignment
/// @param size message size
/// @param alignment message alignment
/// @return pointer to Message
virtual MessagePtr CreateMessage(const size_t size, Alignment alignment) = 0;
virtual MessagePtr CreateMessage(size_t size, Alignment alignment) = 0;
/// @brief Create new Message with user provided buffer and size
/// @param data pointer to user provided buffer
/// @param size size of the user provided buffer
@ -72,8 +72,8 @@ class TransportFactory
/// @param obj optional helper pointer that can be used in the callback
/// @return pointer to Message
virtual MessagePtr CreateMessage(void* data,
const size_t size,
fairmq_free_fn* ffn,
size_t size,
FreeFn* ffn,
void* hint = nullptr) = 0;
/// @brief create a message with the buffer located within the corresponding unmanaged region
/// @param unmanagedRegion the unmanaged region that this message buffer belongs to
@ -82,8 +82,8 @@ class TransportFactory
/// @param hint optional parameter, returned to the user in the RegionCallback
virtual MessagePtr CreateMessage(UnmanagedRegionPtr& unmanagedRegion,
void* data,
const size_t size,
void* hint = 0) = 0;
size_t size,
void* hint = nullptr) = 0;
/// @brief Create a socket
virtual SocketPtr CreateSocket(const std::string& type, const std::string& name) = 0;

View File

@ -5,19 +5,18 @@
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_SHMEM_POLLER_H_
#define FAIR_MQ_SHMEM_POLLER_H_
#include "Socket.h"
#include <fairlogger/Logger.h>
#include <fairmq/Channel.h>
#include <fairmq/Poller.h>
#include <fairmq/shmem/Socket.h>
#include <fairmq/tools/Strings.h>
#include <FairMQChannel.h>
#include <FairMQLogger.h>
#include <FairMQPoller.h>
#include <zmq.h>
#include <unordered_map>
#include <vector>
#include <zmq.h>
namespace fair::mq::shmem
{
@ -25,7 +24,7 @@ namespace fair::mq::shmem
class Poller final : public fair::mq::Poller
{
public:
Poller(const std::vector<FairMQChannel>& channels)
Poller(const std::vector<Channel>& channels)
: fItems()
, fNumItems(0)
{
@ -45,7 +44,7 @@ class Poller final : public fair::mq::Poller
}
}
Poller(const std::vector<FairMQChannel*>& channels)
Poller(const std::vector<Channel*>& channels)
: fItems()
, fNumItems(0)
{
@ -65,7 +64,7 @@ class Poller final : public fair::mq::Poller
}
}
Poller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList)
Poller(const std::unordered_map<std::string, std::vector<Channel>>& channelsMap, const std::vector<std::string>& channelList)
: fItems()
, fNumItems(0)
{

View File

@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@ -9,16 +9,14 @@
#ifndef FAIR_MQ_ZMQ_POLLER_H
#define FAIR_MQ_ZMQ_POLLER_H
#include <fairmq/zeromq/Socket.h>
#include <fairlogger/Logger.h>
#include <fairmq/Channel.h>
#include <fairmq/Poller.h>
#include <fairmq/tools/Strings.h>
#include <FairMQChannel.h>
#include <FairMQLogger.h>
#include <FairMQPoller.h>
#include <zmq.h>
#include <fairmq/zeromq/Socket.h>
#include <unordered_map>
#include <vector>
#include <zmq.h>
namespace fair::mq::zmq
{
@ -26,7 +24,7 @@ namespace fair::mq::zmq
class Poller final : public fair::mq::Poller
{
public:
Poller(const std::vector<FairMQChannel>& channels)
Poller(const std::vector<Channel>& channels)
: fItems()
, fNumItems(0)
{
@ -46,7 +44,7 @@ class Poller final : public fair::mq::Poller
}
}
Poller(const std::vector<FairMQChannel*>& channels)
Poller(const std::vector<Channel*>& channels)
: fItems()
, fNumItems(0)
{
@ -66,14 +64,14 @@ class Poller final : public fair::mq::Poller
}
}
Poller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList)
Poller(const std::unordered_map<std::string, std::vector<Channel>>& channelsMap, const std::vector<std::string>& channelList)
: fItems()
, fNumItems(0)
{
try {
int offset = 0;
// calculate offsets and the total size of the poll item set
for (std::string channel : channelList) {
for (std::string const & channel : channelList) {
fOffsetMap[channel] = offset;
offset += channelsMap.at(channel).size();
fNumItems += channelsMap.at(channel).size();

View File

@ -1,15 +1,13 @@
/********************************************************************************
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <FairMQChannel.h>
#include <fairmq/Channel.h>
#include <gtest/gtest.h>
#include <string>
namespace
@ -20,15 +18,15 @@ using namespace fair::mq;
TEST(Channel, Validation)
{
FairMQChannel channel;
ASSERT_THROW(channel.Validate(), FairMQChannel::ChannelConfigurationError);
Channel channel;
ASSERT_THROW(channel.Validate(), Channel::ChannelConfigurationError);
channel.UpdateType("pair");
ASSERT_EQ(channel.Validate(), false);
ASSERT_EQ(channel.IsValid(), false);
channel.UpdateAddress("bla");
ASSERT_THROW(channel.Validate(), FairMQChannel::ChannelConfigurationError);
ASSERT_THROW(channel.Validate(), Channel::ChannelConfigurationError);
channel.UpdateMethod("connect");
ASSERT_EQ(channel.Validate(), false);
@ -55,31 +53,31 @@ TEST(Channel, Validation)
ASSERT_EQ(channel.IsValid(), true);
channel.UpdateSndBufSize(-1);
ASSERT_THROW(channel.Validate(), FairMQChannel::ChannelConfigurationError);
ASSERT_THROW(channel.Validate(), Channel::ChannelConfigurationError);
channel.UpdateSndBufSize(1000);
ASSERT_NO_THROW(channel.Validate());
channel.UpdateRcvBufSize(-1);
ASSERT_THROW(channel.Validate(), FairMQChannel::ChannelConfigurationError);
ASSERT_THROW(channel.Validate(), Channel::ChannelConfigurationError);
channel.UpdateRcvBufSize(1000);
ASSERT_NO_THROW(channel.Validate());
channel.UpdateSndKernelSize(-1);
ASSERT_THROW(channel.Validate(), FairMQChannel::ChannelConfigurationError);
ASSERT_THROW(channel.Validate(), Channel::ChannelConfigurationError);
channel.UpdateSndKernelSize(1000);
ASSERT_NO_THROW(channel.Validate());
channel.UpdateRcvKernelSize(-1);
ASSERT_THROW(channel.Validate(), FairMQChannel::ChannelConfigurationError);
ASSERT_THROW(channel.Validate(), Channel::ChannelConfigurationError);
channel.UpdateRcvKernelSize(1000);
ASSERT_NO_THROW(channel.Validate());
channel.UpdateRateLogging(-1);
ASSERT_THROW(channel.Validate(), FairMQChannel::ChannelConfigurationError);
ASSERT_THROW(channel.Validate(), Channel::ChannelConfigurationError);
channel.UpdateRateLogging(1);
ASSERT_NO_THROW(channel.Validate());
FairMQChannel channel2 = channel;
Channel channel2 = channel;
ASSERT_NO_THROW(channel2.Validate());
ASSERT_EQ(channel2.Validate(), true);
ASSERT_EQ(channel2.IsValid(), true);

View File

@ -73,9 +73,10 @@ int TestData::ndeallocations = 0;
TEST(MemoryResources, transportAllocatorMap)
{
size_t session{tools::UuidHash()};
// size_t session{tools::UuidHash()};
ProgOptions config;
config.SetProperty<string>("session", to_string(session));
// config.SetProperty<string>("session", to_string(session));
config.SetProperty<string>("session", "default");
FactoryType factoryZMQ = FairMQTransportFactory::CreateTransportFactory("zeromq", fair::mq::tools::Uuid(), &config);
FactoryType factorySHM = FairMQTransportFactory::CreateTransportFactory("shmem", fair::mq::tools::Uuid(), &config);

View File

@ -1,223 +1,232 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <FairMQChannel.h>
#include <FairMQLogger.h>
#include <FairMQTransportFactory.h>
#include <fairmq/ProgOptions.h>
#include <fairmq/tools/Unique.h>
#include <fairmq/tools/Strings.h>
#include <gtest/gtest.h>
#include <string>
#include <array>
#include <cassert>
#include <cstdint>
#include <fairlogger/Logger.h>
#include <fairmq/Channel.h>
#include <fairmq/ProgOptions.h>
#include <fairmq/TransportFactory.h>
#include <fairmq/tools/Strings.h>
#include <fairmq/tools/Unique.h>
#include <gtest/gtest.h>
#include <memory>
#include <string>
#include <string_view>
#include <utility>
namespace
{
using namespace std;
using namespace fair::mq;
void RunPushPullWithMsgResize(const string& transport, const string& _address)
auto AsStringView(Message const& msg) -> string_view
{
size_t session{fair::mq::tools::UuidHash()};
std::string address(fair::mq::tools::ToString(_address, "_", transport));
return {static_cast<char const*>(msg.GetData()), msg.GetSize()};
}
fair::mq::ProgOptions config;
config.SetProperty<string>("session", to_string(session));
auto RunPushPullWithMsgResize(string const & transport, string const & _address) -> void
{
ProgOptions config;
config.SetProperty<string>("session", tools::Uuid());
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));
auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config);
FairMQChannel push{"Push", "push", factory};
Channel push{"Push", "push", factory};
Channel pull{"Pull", "pull", factory};
auto const address(tools::ToString(_address, "_", transport));
push.Bind(address);
FairMQChannel pull{"Pull", "pull", factory};
pull.Connect(address);
{
FairMQMessagePtr outMsg(push.NewMessage(6));
ASSERT_EQ(outMsg->GetSize(), 6);
memcpy(outMsg->GetData(), "ABCDEF", 6);
ASSERT_EQ(outMsg->SetUsedSize(5), true);
ASSERT_EQ(outMsg->SetUsedSize(5), true);
ASSERT_EQ(outMsg->SetUsedSize(7), false);
size_t const size{6};
auto outMsg(push.NewMessage(size));
ASSERT_EQ(outMsg->GetSize(), size);
memcpy(outMsg->GetData(), "ABCDEF", size);
ASSERT_TRUE(outMsg->SetUsedSize(5));
ASSERT_TRUE(outMsg->SetUsedSize(5));
ASSERT_FALSE(outMsg->SetUsedSize(7));
ASSERT_EQ(outMsg->GetSize(), 5);
// check if the data is still intact
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[0], 'A');
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[1], 'B');
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[2], 'C');
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[3], 'D');
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[4], 'E');
ASSERT_EQ(outMsg->SetUsedSize(2), true);
ASSERT_EQ(AsStringView(*outMsg)[0], 'A');
ASSERT_EQ(AsStringView(*outMsg)[1], 'B');
ASSERT_EQ(AsStringView(*outMsg)[2], 'C');
ASSERT_EQ(AsStringView(*outMsg)[3], 'D');
ASSERT_EQ(AsStringView(*outMsg)[4], 'E');
ASSERT_TRUE(outMsg->SetUsedSize(2));
ASSERT_EQ(outMsg->GetSize(), 2);
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[0], 'A');
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[1], 'B');
FairMQMessagePtr msgCopy(push.NewMessage());
ASSERT_EQ(AsStringView(*outMsg)[0], 'A');
ASSERT_EQ(AsStringView(*outMsg)[1], 'B');
auto msgCopy(push.NewMessage());
msgCopy->Copy(*outMsg);
ASSERT_EQ(msgCopy->GetSize(), 2);
ASSERT_EQ(push.Send(outMsg), 2);
FairMQMessagePtr inMsg(pull.NewMessage());
auto inMsg(pull.NewMessage());
ASSERT_EQ(pull.Receive(inMsg), 2);
ASSERT_EQ(inMsg->GetSize(), 2);
ASSERT_EQ(static_cast<char*>(inMsg->GetData())[0], 'A');
ASSERT_EQ(static_cast<char*>(inMsg->GetData())[1], 'B');
ASSERT_EQ(AsStringView(*inMsg)[0], 'A');
ASSERT_EQ(AsStringView(*inMsg)[1], 'B');
}
{
FairMQMessagePtr outMsg(push.NewMessage(1000));
ASSERT_EQ(outMsg->SetUsedSize(0), true);
size_t const size{1000};
auto outMsg(push.NewMessage(size));
ASSERT_TRUE(outMsg->SetUsedSize(0));
ASSERT_EQ(outMsg->GetSize(), 0);
FairMQMessagePtr msgCopy(push.NewMessage());
auto msgCopy(push.NewMessage());
msgCopy->Copy(*outMsg);
ASSERT_EQ(msgCopy->GetSize(), 0);
ASSERT_EQ(push.Send(outMsg), 0);
FairMQMessagePtr inMsg(pull.NewMessage());
auto inMsg(pull.NewMessage());
ASSERT_EQ(pull.Receive(inMsg), 0);
ASSERT_EQ(inMsg->GetSize(), 0);
}
}
void RunMsgRebuild(const string& transport)
auto RunMsgRebuild(const string& transport) -> void
{
size_t session{fair::mq::tools::UuidHash()};
ProgOptions config;
config.SetProperty<string>("session", tools::Uuid());
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));
fair::mq::ProgOptions config;
config.SetProperty<string>("session", to_string(session));
auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config);
FairMQMessagePtr msg(factory->CreateMessage());
size_t const msgSize{100};
string const expectedStr{"asdf"};
auto msg(factory->CreateMessage());
EXPECT_EQ(msg->GetSize(), 0);
msg->Rebuild(100);
EXPECT_EQ(msg->GetSize(), 100);
string* str = new string("asdf");
msg->Rebuild(const_cast<char*>(str->c_str()),
str->length(),
[](void* /*data*/, void* obj) { delete static_cast<string*>(obj); },
str);
EXPECT_NE(msg->GetSize(), 100);
EXPECT_EQ(msg->GetSize(), string("asdf").length());
EXPECT_EQ(string(static_cast<char*>(msg->GetData()), msg->GetSize()), string("asdf"));
msg->Rebuild(msgSize);
EXPECT_EQ(msg->GetSize(), msgSize);
auto str(make_unique<string>(expectedStr));
void* data(str->data());
auto const size(str->length());
msg->Rebuild(
data,
size,
[](void* /*data*/, void* obj) { delete static_cast<string*>(obj); }, // NOLINT
str.release());
EXPECT_NE(msg->GetSize(), msgSize);
EXPECT_EQ(msg->GetSize(), expectedStr.length());
EXPECT_EQ(AsStringView(*msg), expectedStr);
}
void Alignment(const string& transport, const string& _address)
auto CheckMsgAlignment(Message const& msg, fair::mq::Alignment alignment) -> bool
{
size_t session{fair::mq::tools::UuidHash()};
std::string address(fair::mq::tools::ToString(_address, "_", transport));
fair::mq::ProgOptions config;
config.SetProperty<string>("session", to_string(session));
auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config);
FairMQChannel push{"Push", "push", factory};
push.Bind(address);
FairMQChannel pull{"Pull", "pull", factory};
pull.Connect(address);
size_t alignment = 64;
FairMQMessagePtr outMsg1(push.NewMessage(100, fair::mq::Alignment{alignment}));
ASSERT_EQ(reinterpret_cast<uintptr_t>(outMsg1->GetData()) % alignment, 0);
ASSERT_EQ(push.Send(outMsg1), 100);
FairMQMessagePtr inMsg1(pull.NewMessage(fair::mq::Alignment{alignment}));
ASSERT_EQ(pull.Receive(inMsg1), 100);
ASSERT_EQ(reinterpret_cast<uintptr_t>(inMsg1->GetData()) % alignment, 0);
FairMQMessagePtr outMsg2(push.NewMessage(32, fair::mq::Alignment{alignment}));
ASSERT_EQ(reinterpret_cast<uintptr_t>(outMsg2->GetData()) % alignment, 0);
ASSERT_EQ(push.Send(outMsg2), 32);
FairMQMessagePtr inMsg2(pull.NewMessage(fair::mq::Alignment{alignment}));
ASSERT_EQ(pull.Receive(inMsg2), 32);
ASSERT_EQ(reinterpret_cast<uintptr_t>(inMsg2->GetData()) % alignment, 0);
FairMQMessagePtr outMsg3(push.NewMessage(100, fair::mq::Alignment{0}));
ASSERT_EQ(push.Send(outMsg3), 100);
FairMQMessagePtr inMsg3(pull.NewMessage(fair::mq::Alignment{0}));
ASSERT_EQ(pull.Receive(inMsg3), 100);
FairMQMessagePtr msg1(push.NewMessage(25));
msg1->Rebuild(50, fair::mq::Alignment{alignment});
ASSERT_EQ(reinterpret_cast<uintptr_t>(msg1->GetData()) % alignment, 0);
size_t alignment2 = 32;
FairMQMessagePtr msg2(push.NewMessage(25, fair::mq::Alignment{alignment}));
msg2->Rebuild(50, fair::mq::Alignment{alignment2});
ASSERT_EQ(reinterpret_cast<uintptr_t>(msg2->GetData()) % alignment2, 0);
assert(static_cast<size_t>(alignment) > 0); // NOLINT
return (reinterpret_cast<uintptr_t>(msg.GetData()) % static_cast<size_t>(alignment)) == 0; // NOLINT
}
void EmptyMessage(const string& transport, const string& _address)
auto RunPushPullWithAlignment(string const& transport, string const& _address) -> void
{
size_t session{fair::mq::tools::UuidHash()};
std::string address(fair::mq::tools::ToString(_address, "_", transport));
ProgOptions config;
config.SetProperty<string>("session", tools::Uuid());
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));
fair::mq::ProgOptions config;
config.SetProperty<string>("session", to_string(session));
auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config);
FairMQChannel push{"Push", "push", factory};
Channel push{"Push", "push", factory};
Channel pull{"Pull", "pull", factory};
auto const address(tools::ToString(_address, "_", transport));
push.Bind(address);
FairMQChannel pull{"Pull", "pull", factory};
pull.Connect(address);
FairMQMessagePtr outMsg(push.NewMessage());
{
Alignment const align{64};
for (size_t const size : {100, 32}) {
auto outMsg(push.NewMessage(size, align));
auto inMsg(pull.NewMessage(align));
ASSERT_TRUE(CheckMsgAlignment(*outMsg, align));
ASSERT_EQ(push.Send(outMsg), size);
ASSERT_EQ(pull.Receive(inMsg), size);
ASSERT_TRUE(CheckMsgAlignment(*inMsg, align));
}
}
{
Alignment const align{0};
size_t const size{100};
auto outMsg(push.NewMessage(size, align));
auto inMsg(pull.NewMessage(align));
ASSERT_EQ(push.Send(outMsg), size);
ASSERT_EQ(pull.Receive(inMsg), size);
}
for (auto const align : {Alignment{64}, Alignment{32}}) {
size_t const size25{25};
size_t const size50{50};
auto msg(push.NewMessage(size25));
msg->Rebuild(size50, align);
ASSERT_TRUE(CheckMsgAlignment(*msg, align));
}
}
auto EmptyMessage(string const& transport, string const& _address) -> void
{
ProgOptions config;
config.SetProperty<string>("session", tools::Uuid());
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));
Channel push{"Push", "push", factory};
Channel pull{"Pull", "pull", factory};
auto const address(tools::ToString(_address, "_", transport));
push.Bind(address);
pull.Connect(address);
auto outMsg(push.NewMessage());
ASSERT_EQ(outMsg->GetData(), nullptr);
ASSERT_EQ(push.Send(outMsg), 0);
FairMQMessagePtr inMsg(pull.NewMessage());
auto inMsg(pull.NewMessage());
ASSERT_EQ(pull.Receive(inMsg), 0);
ASSERT_EQ(inMsg->GetData(), nullptr);
}
TEST(Resize, zeromq)
TEST(Resize, zeromq) // NOLINT
{
RunPushPullWithMsgResize("zeromq", "ipc://test_message_resize");
}
TEST(Resize, shmem)
TEST(Resize, shmem) // NOLINT
{
RunPushPullWithMsgResize("shmem", "ipc://test_message_resize");
}
TEST(Rebuild, zeromq)
TEST(Rebuild, zeromq) // NOLINT
{
RunMsgRebuild("zeromq");
}
TEST(Rebuild, shmem)
TEST(Rebuild, shmem) // NOLINT
{
RunMsgRebuild("shmem");
}
TEST(Alignment, shmem)
TEST(Alignment, shmem) // NOLINT
{
Alignment("shmem", "ipc://test_message_alignment");
RunPushPullWithAlignment("shmem", "ipc://test_message_alignment");
}
TEST(Alignment, zeromq)
TEST(Alignment, zeromq) // NOLINT
{
Alignment("zeromq", "ipc://test_message_alignment");
RunPushPullWithAlignment("zeromq", "ipc://test_message_alignment");
}
TEST(EmptyMessage, zeromq)
TEST(EmptyMessage, zeromq) // NOLINT
{
EmptyMessage("zeromq", "ipc://test_empty_message");
}
TEST(EmptyMessage, shmem)
TEST(EmptyMessage, shmem) // NOLINT
{
EmptyMessage("shmem", "ipc://test_empty_message");
}

View File

@ -1,20 +1,19 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <gtest/gtest.h>
#include <FairMQChannel.h>
#include <FairMQParts.h>
#include <FairMQLogger.h>
#include <FairMQTransportFactory.h>
#include <fairmq/tools/Unique.h>
#include <fairmq/ProgOptions.h>
#include <algorithm>
#include <fairlogger/Logger.h>
#include <fairmq/Channel.h>
#include <fairmq/Parts.h>
#include <fairmq/ProgOptions.h>
#include <fairmq/TransportFactory.h>
#include <fairmq/tools/Unique.h>
#include <gtest/gtest.h>
#include <memory>
#include <sstream>
#include <string>
@ -24,23 +23,22 @@ namespace
{
using namespace std;
using namespace fair::mq;
auto RunSingleThreadedMultipart(string transport, string address1, string address2) -> void {
size_t session{fair::mq::tools::UuidHash()};
fair::mq::ProgOptions config;
config.SetProperty<string>("session", std::to_string(session));
config.SetProperty<string>("session", tools::Uuid());
auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config);
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
FairMQChannel push1("Push1", "push", factory);
Channel push1("Push1", "push", factory);
ASSERT_TRUE(push1.Bind(address1));
FairMQChannel pull1("Pull1", "pull", factory);
Channel pull1("Pull1", "pull", factory);
ASSERT_TRUE(pull1.Connect(address1));
FairMQChannel push2("Push2", "push", factory);
Channel push2("Push2", "push", factory);
ASSERT_TRUE(push2.Bind(address2));
FairMQChannel pull2("Pull2", "pull", factory);
Channel pull2("Pull2", "pull", factory);
ASSERT_TRUE(pull2.Connect(address2));
// TODO validate that fTransportFactory is not nullptr
@ -51,32 +49,32 @@ auto RunSingleThreadedMultipart(string transport, string address1, string addres
ASSERT_TRUE(pull2.Validate());
{
FairMQParts multiplePartsOut;
Parts multiplePartsOut;
multiplePartsOut.AddPart(push1.NewSimpleMessage("1"));
multiplePartsOut.AddPart(push1.NewSimpleMessage("2"));
multiplePartsOut.AddPart(push1.NewSimpleMessage("3"));
ASSERT_GE(push1.Send(multiplePartsOut), 0);
FairMQParts singlePartOut;
Parts singlePartOut;
singlePartOut.AddPart(push1.NewSimpleMessage("4"));
ASSERT_GE(push1.Send(singlePartOut), 0);
}
FairMQParts multipleParts;
Parts multipleParts;
ASSERT_GE(pull1.Receive(multipleParts), 0);
stringstream multiple;
for_each(multipleParts.cbegin(), multipleParts.cend(), [&multiple, &factory](const FairMQMessagePtr& part) {
for_each(multipleParts.cbegin(), multipleParts.cend(), [&multiple, &factory](const MessagePtr& part) {
multiple << string{static_cast<char*>(part->GetData()), part->GetSize()};
ASSERT_EQ(part->GetTransport(), factory.get());
});
ASSERT_EQ(multiple.str(), "123");
FairMQParts singlePart;
Parts singlePart;
ASSERT_GE(pull1.Receive(singlePart), 0);
stringstream single;
for_each(singlePart.cbegin(), singlePart.cend(), [&single](const FairMQMessagePtr& part) {
for_each(singlePart.cbegin(), singlePart.cend(), [&single](const MessagePtr& part) {
single << string{static_cast<char*>(part->GetData()), part->GetSize()};
});
ASSERT_EQ(single.str(), "4");
@ -85,18 +83,18 @@ auto RunSingleThreadedMultipart(string transport, string address1, string addres
ASSERT_GE(push2.Send(multipleParts), 0);
{
FairMQParts singlePartIn;
Parts singlePartIn;
ASSERT_GE(pull2.Receive(singlePartIn), 0);
stringstream singleIn;
for_each(singlePartIn.cbegin(), singlePartIn.cend(), [&singleIn](const FairMQMessagePtr& part) {
for_each(singlePartIn.cbegin(), singlePartIn.cend(), [&singleIn](const MessagePtr& part) {
singleIn << string{static_cast<char*>(part->GetData()), part->GetSize()};
});
ASSERT_EQ(singleIn.str(), "4");
FairMQParts multiplePartsIn;
Parts multiplePartsIn;
ASSERT_GE(pull2.Receive(multiplePartsIn), 0);
stringstream multipleIn;
for_each(multiplePartsIn.cbegin(), multiplePartsIn.cend(), [&multipleIn, &factory](const FairMQMessagePtr& part) {
for_each(multiplePartsIn.cbegin(), multiplePartsIn.cend(), [&multipleIn, &factory](const MessagePtr& part) {
multipleIn << string{static_cast<char*>(part->GetData()), part->GetSize()};
ASSERT_EQ(part->GetTransport(), factory.get());
});
@ -106,24 +104,22 @@ auto RunSingleThreadedMultipart(string transport, string address1, string addres
auto RunMultiThreadedMultipart(string transport, string address1) -> void
{
size_t session{fair::mq::tools::UuidHash()};
fair::mq::ProgOptions config;
config.SetProperty<string>("session", std::to_string(session));
ProgOptions config;
config.SetProperty<string>("session", tools::Uuid());
config.SetProperty<int>("io-threads", 1);
config.SetProperty<size_t>("shm-segment-size", 20000000);
config.SetProperty<size_t>("shm-segment-size", 20000000); // NOLINT
auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config);
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
FairMQChannel push1("Push1", "push", factory);
Channel push1("Push1", "push", factory);
ASSERT_TRUE(push1.Bind(address1));
FairMQChannel pull1("Pull1", "pull", factory);
Channel pull1("Pull1", "pull", factory);
ASSERT_TRUE(pull1.Connect(address1));
auto pusher = thread{[&push1](){
ASSERT_TRUE(push1.Validate());
FairMQParts sent;
Parts sent;
sent.AddPart(push1.NewSimpleMessage("1"));
sent.AddPart(push1.NewSimpleMessage("2"));
sent.AddPart(push1.NewSimpleMessage("3"));
@ -134,11 +130,11 @@ auto RunMultiThreadedMultipart(string transport, string address1) -> void
auto puller = thread{[&pull1](){
ASSERT_TRUE(pull1.Validate());
FairMQParts received;
Parts received;
ASSERT_GE(pull1.Receive(received), 0);
stringstream out;
for_each(received.cbegin(), received.cend(), [&out](const FairMQMessagePtr& part) {
for_each(received.cbegin(), received.cend(), [&out](const MessagePtr& part) {
out << string{static_cast<char*>(part->GetData()), part->GetSize()};
});
ASSERT_EQ(out.str(), "123");
@ -148,42 +144,42 @@ auto RunMultiThreadedMultipart(string transport, string address1) -> void
puller.join();
}
TEST(PushPull, Multipart_ST_inproc_zeromq)
TEST(PushPull, Multipart_ST_inproc_zeromq) // NOLINT
{
RunSingleThreadedMultipart("zeromq", "inproc://test1", "inproc://test2");
}
TEST(PushPull, Multipart_ST_inproc_shmem)
TEST(PushPull, Multipart_ST_inproc_shmem) // NOLINT
{
RunSingleThreadedMultipart("shmem", "inproc://test1", "inproc://test2");
}
TEST(PushPull, Multipart_ST_ipc_zeromq)
TEST(PushPull, Multipart_ST_ipc_zeromq) // NOLINT
{
RunSingleThreadedMultipart("zeromq", "ipc://test_Multipart_ST_ipc_zeromq_1", "ipc://test_Multipart_ST_ipc_zeromq_2");
}
TEST(PushPull, Multipart_ST_ipc_shmem)
TEST(PushPull, Multipart_ST_ipc_shmem) // NOLINT
{
RunSingleThreadedMultipart("shmem", "ipc://test_Multipart_ST_ipc_shmem_1", "ipc://test_Multipart_ST_ipc_shmem_2");
}
TEST(PushPull, Multipart_MT_inproc_zeromq)
TEST(PushPull, Multipart_MT_inproc_zeromq) // NOLINT
{
RunMultiThreadedMultipart("zeromq", "inproc://test_1");
}
TEST(PushPull, Multipart_MT_inproc_shmem)
TEST(PushPull, Multipart_MT_inproc_shmem) // NOLINT
{
RunMultiThreadedMultipart("shmem", "inproc://test_1");
}
TEST(PushPull, Multipart_MT_ipc_zeromq)
TEST(PushPull, Multipart_MT_ipc_zeromq) // NOLINT
{
RunMultiThreadedMultipart("zeromq", "ipc://test_Multipart_MT_ipc_zeromq_1");
}
TEST(PushPull, Multipart_MT_ipc_shmem)
TEST(PushPull, Multipart_MT_ipc_shmem) // NOLINT
{
RunMultiThreadedMultipart("shmem", "ipc://test_Multipart_MT_ipc_shmem_1");
}

View File

@ -1,20 +1,20 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <gtest/gtest.h>
#include <FairMQChannel.h>
#include <FairMQParts.h>
#include <FairMQLogger.h>
#include <FairMQTransportFactory.h>
#include <fairmq/tools/Unique.h>
#include <fairmq/ProgOptions.h>
#include <algorithm>
#include <array>
#include <fairlogger/Logger.h>
#include <fairmq/Channel.h>
#include <fairmq/Parts.h>
#include <fairmq/ProgOptions.h>
#include <fairmq/TransportFactory.h>
#include <fairmq/tools/Unique.h>
#include <gtest/gtest.h>
#include <memory>
#include <sstream>
#include <string>
@ -24,25 +24,25 @@ namespace
{
using namespace std;
using namespace fair::mq;
void CheckOldOptionInterface(FairMQChannel& channel, const string& option)
void CheckOldOptionInterface(Channel& channel, const string& option)
{
int value = 500;
int const expectedValue{500};
int value = expectedValue;
channel.GetSocket().SetOption(option, &value, sizeof(value));
value = 0;
size_t valueSize = sizeof(value);
channel.GetSocket().GetOption(option, &value, &valueSize);
ASSERT_EQ(value, 500);
ASSERT_EQ(value, expectedValue);
}
void RunOptionsTest(const string& transport)
{
size_t session{fair::mq::tools::UuidHash()};
fair::mq::ProgOptions config;
config.SetProperty<string>("session", to_string(session));
auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config);
FairMQChannel channel("Push", "push", factory);
config.SetProperty<string>("session", tools::Uuid());
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
Channel channel("Push", "push", factory);
CheckOldOptionInterface(channel, "linger");
CheckOldOptionInterface(channel, "snd-hwm");
@ -50,38 +50,37 @@ void RunOptionsTest(const string& transport)
CheckOldOptionInterface(channel, "snd-size");
CheckOldOptionInterface(channel, "rcv-size");
channel.GetSocket().SetLinger(300);
ASSERT_EQ(channel.GetSocket().GetLinger(), 300);
size_t const linger{300};
channel.GetSocket().SetLinger(linger);
ASSERT_EQ(channel.GetSocket().GetLinger(), linger);
channel.GetSocket().SetSndBufSize(500);
ASSERT_EQ(channel.GetSocket().GetSndBufSize(), 500);
size_t const bufSize{500};
channel.GetSocket().SetSndBufSize(bufSize);
ASSERT_EQ(channel.GetSocket().GetSndBufSize(), bufSize);
channel.GetSocket().SetRcvBufSize(bufSize);
ASSERT_EQ(channel.GetSocket().GetRcvBufSize(), bufSize);
channel.GetSocket().SetRcvBufSize(500);
ASSERT_EQ(channel.GetSocket().GetRcvBufSize(), 500);
channel.GetSocket().SetSndKernelSize(8000);
ASSERT_EQ(channel.GetSocket().GetSndKernelSize(), 8000);
channel.GetSocket().SetRcvKernelSize(8000);
ASSERT_EQ(channel.GetSocket().GetRcvKernelSize(), 8000);
size_t const kernelSize{8000};
channel.GetSocket().SetSndKernelSize(kernelSize);
ASSERT_EQ(channel.GetSocket().GetSndKernelSize(), kernelSize);
channel.GetSocket().SetRcvKernelSize(kernelSize);
ASSERT_EQ(channel.GetSocket().GetRcvKernelSize(), kernelSize);
}
void ZeroingAndMlock(const string& transport)
{
size_t session{fair::mq::tools::UuidHash()};
fair::mq::ProgOptions config;
config.SetProperty<string>("session", to_string(session));
config.SetProperty<size_t>("shm-segment-size", 16384);
ProgOptions config;
config.SetProperty<string>("session", tools::Uuid());
config.SetProperty<size_t>("shm-segment-size", 16384); // NOLINT
config.SetProperty<bool>("shm-zero-segment", true);
config.SetProperty<bool>("shm-mlock-segment", true);
auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config);
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
FairMQMessagePtr outMsg(factory->CreateMessage(10000));
char test[10000];
memset(test, 0, sizeof(test));
ASSERT_EQ(memcmp(test, outMsg->GetData(), outMsg->GetSize()), 0);
constexpr size_t size{10000};
auto outMsg(factory->CreateMessage(size));
array<char, size> test{0};
ASSERT_EQ(memcmp(test.data(), outMsg->GetData(), outMsg->GetSize()), 0);
}
void ZeroingAndMlockOnCreation(const string& transport)
@ -90,29 +89,29 @@ void ZeroingAndMlockOnCreation(const string& transport)
fair::mq::ProgOptions config;
config.SetProperty<string>("session", to_string(session));
config.SetProperty<size_t>("shm-segment-size", 16384);
config.SetProperty<size_t>("shm-segment-size", 16384); // NOLINT
config.SetProperty<bool>("shm-mlock-segment-on-creation", true);
config.SetProperty<bool>("shm-zero-segment-on-creation", true);
auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config);
FairMQMessagePtr outMsg(factory->CreateMessage(10000));
char test[10000];
memset(test, 0, sizeof(test));
ASSERT_EQ(memcmp(test, outMsg->GetData(), outMsg->GetSize()), 0);
constexpr size_t size{10000};
auto outMsg(factory->CreateMessage(size));
array<char, size> test{0};
ASSERT_EQ(memcmp(test.data(), outMsg->GetData(), outMsg->GetSize()), 0);
}
TEST(Options, zeromq)
TEST(Options, zeromq) // NOLINT
{
RunOptionsTest("zeromq");
}
TEST(Options, shmem)
TEST(Options, shmem) // NOLINT
{
RunOptionsTest("shmem");
}
TEST(ZeroingAndMlock, shmem)
TEST(ZeroingAndMlock, shmem) // NOLINT
{
ZeroingAndMlock("shmem");
}