Minor formatting

This commit is contained in:
Alexey Rybalchenko 2020-05-17 13:05:52 +02:00
parent 361fb0cba5
commit 4b516de81a
5 changed files with 65 additions and 88 deletions

View File

@ -46,7 +46,7 @@ struct ZMsg
class Socket final : public fair::mq::Socket class Socket final : public fair::mq::Socket
{ {
public: public:
Socket(Manager& manager, const std::string& type, const std::string& name, const std::string& id = "", void* context = nullptr, FairMQTransportFactory* fac = nullptr) Socket(Manager& manager, const std::string& type, const std::string& name, const std::string& id, void* context, FairMQTransportFactory* fac = nullptr)
: fair::mq::Socket(fac) : fair::mq::Socket(fac)
, fSocket(nullptr) , fSocket(nullptr)
, fManager(manager) , fManager(manager)

View File

@ -25,11 +25,9 @@
#include <zmq.h> #include <zmq.h>
#include <vector> #include <memory> // unique_ptr
#include <string> #include <string>
#include <atomic> #include <vector>
#include <chrono>
#include <thread>
namespace fair namespace fair
{ {
@ -45,7 +43,7 @@ class TransportFactory final : public fair::mq::TransportFactory
: fair::mq::TransportFactory(id) : fair::mq::TransportFactory(id)
, fDeviceId(id) , fDeviceId(id)
, fShmId() , fShmId()
, fZMQContext(nullptr) , fZMQContext(zmq_ctx_new())
, fManager(nullptr) , fManager(nullptr)
{ {
int major, minor, patch; int major, minor, patch;
@ -53,7 +51,6 @@ class TransportFactory final : public fair::mq::TransportFactory
LOG(debug) << "Transport: Using ZeroMQ (" << major << "." << minor << "." << patch << ") & " LOG(debug) << "Transport: Using ZeroMQ (" << major << "." << minor << "." << patch << ") & "
<< "boost::interprocess (" << (BOOST_VERSION / 100000) << "." << (BOOST_VERSION / 100 % 1000) << "." << (BOOST_VERSION % 100) << ")"; << "boost::interprocess (" << (BOOST_VERSION / 100000) << "." << (BOOST_VERSION / 100 % 1000) << "." << (BOOST_VERSION % 100) << ")";
fZMQContext = zmq_ctx_new();
if (!fZMQContext) { if (!fZMQContext) {
throw std::runtime_error(tools::ToString("failed creating context, reason: ", zmq_strerror(errno))); throw std::runtime_error(tools::ToString("failed creating context, reason: ", zmq_strerror(errno)));
} }
@ -120,7 +117,6 @@ class TransportFactory final : public fair::mq::TransportFactory
SocketPtr CreateSocket(const std::string& type, const std::string& name) override SocketPtr CreateSocket(const std::string& type, const std::string& name) override
{ {
assert(fZMQContext);
return tools::make_unique<Socket>(*fManager, type, name, GetId(), fZMQContext, this); return tools::make_unique<Socket>(*fManager, type, name, GetId(), fZMQContext, this);
} }
@ -141,43 +137,33 @@ class TransportFactory final : public fair::mq::TransportFactory
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override
{ {
return tools::make_unique<UnmanagedRegion>(*fManager, size, 0, callback, nullptr, path, flags, this); return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags);
} }
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0) override UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0) override
{ {
return tools::make_unique<UnmanagedRegion>(*fManager, size, 0, nullptr, bulkCallback, path, flags, this); return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags);
} }
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override
{ {
return tools::make_unique<UnmanagedRegion>(*fManager, size, userFlags, callback, nullptr, path, flags, this); return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags);
} }
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0) override UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0) override
{ {
return tools::make_unique<UnmanagedRegion>(*fManager, size, userFlags, nullptr, bulkCallback, path, flags, this); return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags);
} }
void SubscribeToRegionEvents(RegionEventCallback callback) override UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags)
{ {
fManager->SubscribeToRegionEvents(callback); return tools::make_unique<UnmanagedRegion>(*fManager, size, userFlags, callback, bulkCallback, path, flags, this);
} }
bool SubscribedToRegionEvents() override void SubscribeToRegionEvents(RegionEventCallback callback) override { fManager->SubscribeToRegionEvents(callback); }
{ bool SubscribedToRegionEvents() override { return fManager->SubscribedToRegionEvents(); }
return fManager->SubscribedToRegionEvents(); void UnsubscribeFromRegionEvents() override { fManager->UnsubscribeFromRegionEvents(); }
} std::vector<fair::mq::RegionInfo> GetRegionInfo() override { return fManager->GetRegionInfo(); }
void UnsubscribeFromRegionEvents() override
{
fManager->UnsubscribeFromRegionEvents();
}
std::vector<fair::mq::RegionInfo> GetRegionInfo() override
{
return fManager->GetRegionInfo();
}
Transport GetType() const override { return fair::mq::Transport::SHM; } Transport GetType() const override { return fair::mq::Transport::SHM; }

View File

@ -9,29 +9,24 @@
#ifndef FAIR_MQ_ZMQ_SOCKET_H #ifndef FAIR_MQ_ZMQ_SOCKET_H
#define FAIR_MQ_ZMQ_SOCKET_H #define FAIR_MQ_ZMQ_SOCKET_H
#include <FairMQLogger.h>
#include <FairMQMessage.h>
#include <FairMQSocket.h>
#include <atomic>
#include <fairmq/Tools.h>
#include <fairmq/zeromq/Context.h> #include <fairmq/zeromq/Context.h>
#include <fairmq/zeromq/Message.h> #include <fairmq/zeromq/Message.h>
#include <fairmq/Tools.h> #include <memory> // unique_ptr
#include <FairMQLogger.h>
#include <FairMQSocket.h>
#include <FairMQMessage.h>
#include <zmq.h> #include <zmq.h>
#include <atomic> namespace fair {
#include <memory> // unique_ptr namespace mq {
namespace zmq {
namespace fair
{
namespace mq
{
namespace zmq
{
class Socket final : public fair::mq::Socket class Socket final : public fair::mq::Socket
{ {
public: public:
Socket(Context& ctx, const std::string& type, const std::string& name, const std::string& id = "", FairMQTransportFactory* factory = nullptr) Socket(Context& ctx, const std::string& type, const std::string& name, const std::string& id, FairMQTransportFactory* factory = nullptr)
: fair::mq::Socket(factory) : fair::mq::Socket(factory)
, fCtx(ctx) , fCtx(ctx)
, fSocket(zmq_socket(fCtx.GetZmqCtx(), GetConstant(type))) , fSocket(zmq_socket(fCtx.GetZmqCtx(), GetConstant(type)))
@ -43,39 +38,32 @@ class Socket final : public fair::mq::Socket
, fSndTimeout(100) , fSndTimeout(100)
, fRcvTimeout(100) , fRcvTimeout(100)
{ {
if (fSocket == nullptr) if (fSocket == nullptr) {
{
LOG(error) << "Failed creating socket " << fId << ", reason: " << zmq_strerror(errno); LOG(error) << "Failed creating socket " << fId << ", reason: " << zmq_strerror(errno);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
if (zmq_setsockopt(fSocket, ZMQ_IDENTITY, fId.c_str(), fId.length()) != 0) if (zmq_setsockopt(fSocket, ZMQ_IDENTITY, fId.c_str(), fId.length()) != 0) {
{
LOG(error) << "Failed setting ZMQ_IDENTITY socket option, reason: " << zmq_strerror(errno); LOG(error) << "Failed setting ZMQ_IDENTITY socket option, reason: " << zmq_strerror(errno);
} }
// Tell socket to try and send/receive outstanding messages for <linger> milliseconds before terminating. // Tell socket to try and send/receive outstanding messages for <linger> milliseconds before
// Default value for ZeroMQ is -1, which is to wait forever. // terminating. Default value for ZeroMQ is -1, which is to wait forever.
int linger = 1000; int linger = 1000;
if (zmq_setsockopt(fSocket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) if (zmq_setsockopt(fSocket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) {
{
LOG(error) << "Failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno); LOG(error) << "Failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno);
} }
if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) {
{
LOG(error) << "Failed setting ZMQ_SNDTIMEO socket option, reason: " << zmq_strerror(errno); LOG(error) << "Failed setting ZMQ_SNDTIMEO socket option, reason: " << zmq_strerror(errno);
} }
if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) {
{
LOG(error) << "Failed setting ZMQ_RCVTIMEO socket option, reason: " << zmq_strerror(errno); LOG(error) << "Failed setting ZMQ_RCVTIMEO socket option, reason: " << zmq_strerror(errno);
} }
if (type == "sub") if (type == "sub") {
{ if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nullptr, 0) != 0) {
if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nullptr, 0) != 0)
{
LOG(error) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno); LOG(error) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno);
} }
} }
@ -92,10 +80,11 @@ class Socket final : public fair::mq::Socket
{ {
// LOG(info) << "bind socket " << fId << " on " << address; // LOG(info) << "bind socket " << fId << " on " << address;
if (zmq_bind(fSocket, address.c_str()) != 0) if (zmq_bind(fSocket, address.c_str()) != 0) {
{
if (errno == EADDRINUSE) { if (errno == EADDRINUSE) {
// do not print error in this case, this is handled by FairMQDevice in case no connection could be established after trying a number of random ports from a range. // do not print error in this case, this is handled by FairMQDevice in case no
// connection could be established after trying a number of random ports from a
// range.
return false; return false;
} }
LOG(error) << "Failed binding socket " << fId << ", address: " << address << ", reason: " << zmq_strerror(errno); LOG(error) << "Failed binding socket " << fId << ", address: " << address << ", reason: " << zmq_strerror(errno);
@ -109,8 +98,7 @@ class Socket final : public fair::mq::Socket
{ {
// LOG(info) << "connect socket " << fId << " on " << address; // LOG(info) << "connect socket " << fId << " on " << address;
if (zmq_connect(fSocket, address.c_str()) != 0) if (zmq_connect(fSocket, address.c_str()) != 0) {
{
LOG(error) << "Failed connecting socket " << fId << ", address: " << address << ", reason: " << zmq_strerror(errno); LOG(error) << "Failed connecting socket " << fId << ", address: " << address << ", reason: " << zmq_strerror(errno);
return false; return false;
} }
@ -219,9 +207,7 @@ class Socket final : public fair::mq::Socket
for (unsigned int i = 0; i < vecSize; ++i) { for (unsigned int i = 0; i < vecSize; ++i) {
static_cast<Message*>(msgVec[i].get())->ApplyUsedSize(); static_cast<Message*>(msgVec[i].get())->ApplyUsedSize();
int nbytes = zmq_msg_send(static_cast<Message*>(msgVec[i].get())->GetMessage(), int nbytes = zmq_msg_send(static_cast<Message*>(msgVec[i].get())->GetMessage(), fSocket, (i < vecSize - 1) ? ZMQ_SNDMORE | flags : flags);
fSocket,
(i < vecSize - 1) ? ZMQ_SNDMORE|flags : flags);
if (nbytes >= 0) { if (nbytes >= 0) {
totalSize += nbytes; totalSize += nbytes;
} else { } else {
@ -257,15 +243,16 @@ class Socket final : public fair::mq::Socket
continue; continue;
} }
// store statistics on how many messages have been sent (handle all parts as a single message) // store statistics on how many messages have been sent (handle all parts as a
// single message)
++fMessagesTx; ++fMessagesTx;
fBytesTx += totalSize; fBytesTx += totalSize;
return totalSize; return totalSize;
} }
} // If there's only one part, send it as a regular message } // If there's only one part, send it as a regular message
else if (vecSize == 1) { else if (vecSize == 1) {
return Send(msgVec.back(), timeout); return Send(msgVec.back(), timeout);
} else { // if the vector is empty, something might be wrong } else { // if the vector is empty, something might be wrong
LOG(warn) << "Will not send empty vector"; LOG(warn) << "Will not send empty vector";
return -1; return -1;
} }
@ -319,7 +306,8 @@ class Socket final : public fair::mq::Socket
continue; continue;
} }
// store statistics on how many messages have been received (handle all parts as a single message) // store statistics on how many messages have been received (handle all parts as a
// single message)
++fMessagesRx; ++fMessagesRx;
fBytesRx += totalSize; fBytesRx += totalSize;
return totalSize; return totalSize;
@ -332,13 +320,11 @@ class Socket final : public fair::mq::Socket
{ {
// LOG(debug) << "Closing socket " << fId; // LOG(debug) << "Closing socket " << fId;
if (fSocket == nullptr) if (fSocket == nullptr) {
{
return; return;
} }
if (zmq_close(fSocket) != 0) if (zmq_close(fSocket) != 0) {
{
LOG(error) << "Failed closing socket " << fId << ", reason: " << zmq_strerror(errno); LOG(error) << "Failed closing socket " << fId << ", reason: " << zmq_strerror(errno);
} }
@ -347,16 +333,14 @@ class Socket final : public fair::mq::Socket
void SetOption(const std::string& option, const void* value, size_t valueSize) override void SetOption(const std::string& option, const void* value, size_t valueSize) override
{ {
if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0) if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0) {
{
LOG(error) << "Failed setting socket option, reason: " << zmq_strerror(errno); LOG(error) << "Failed setting socket option, reason: " << zmq_strerror(errno);
} }
} }
void GetOption(const std::string& option, void* value, size_t* valueSize) override void GetOption(const std::string& option, void* value, size_t* valueSize) override
{ {
if (zmq_getsockopt(fSocket, GetConstant(option), value, valueSize) < 0) if (zmq_getsockopt(fSocket, GetConstant(option), value, valueSize) < 0) {
{
LOG(error) << "Failed getting socket option, reason: " << zmq_strerror(errno); LOG(error) << "Failed getting socket option, reason: " << zmq_strerror(errno);
} }
} }

View File

@ -55,14 +55,17 @@ class TransportFactory final : public FairMQTransportFactory
{ {
return tools::make_unique<Message>(this); return tools::make_unique<Message>(this);
} }
MessagePtr CreateMessage(const size_t size) override MessagePtr CreateMessage(const size_t size) override
{ {
return tools::make_unique<Message>(size, this); return tools::make_unique<Message>(size, this);
} }
MessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override MessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override
{ {
return tools::make_unique<Message>(data, size, ffn, hint, this); return tools::make_unique<Message>(data, size, ffn, hint, this);
} }
MessagePtr CreateMessage(UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) override MessagePtr CreateMessage(UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) override
{ {
return tools::make_unique<Message>(region, data, size, hint, this); return tools::make_unique<Message>(region, data, size, hint, this);
@ -77,10 +80,12 @@ class TransportFactory final : public FairMQTransportFactory
{ {
return tools::make_unique<Poller>(channels); return tools::make_unique<Poller>(channels);
} }
PollerPtr CreatePoller(const std::vector<FairMQChannel*>& channels) const override PollerPtr CreatePoller(const std::vector<FairMQChannel*>& channels) const override
{ {
return tools::make_unique<Poller>(channels); return tools::make_unique<Poller>(channels);
} }
PollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override PollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override
{ {
return tools::make_unique<Poller>(channelsMap, channelList); return tools::make_unique<Poller>(channelsMap, channelList);
@ -90,21 +95,25 @@ class TransportFactory final : public FairMQTransportFactory
{ {
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags); return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags);
} }
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override
{ {
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags); return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags);
} }
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0) override UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0) override
{ {
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags); return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags);
} }
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override
{ {
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags); return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags);
} }
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0)
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string&, int)
{ {
UnmanagedRegionPtr ptr = tools::make_unique<UnmanagedRegion>(*fCtx, size, userFlags, callback, bulkCallback, path, flags, this); UnmanagedRegionPtr ptr = tools::make_unique<UnmanagedRegion>(*fCtx, size, userFlags, callback, bulkCallback, this);
auto zPtr = static_cast<UnmanagedRegion*>(ptr.get()); auto zPtr = static_cast<UnmanagedRegion*>(ptr.get());
fCtx->AddRegion(zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), RegionEvent::created); fCtx->AddRegion(zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), RegionEvent::created);
return ptr; return ptr;

View File

@ -30,13 +30,11 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
public: public:
UnmanagedRegion(Context& ctx, UnmanagedRegion(Context& ctx,
size_t size, size_t size,
int64_t userFlags, int64_t userFlags,
RegionCallback callback, RegionCallback callback,
RegionBulkCallback bulkCallback, RegionBulkCallback bulkCallback,
const std::string& /* path = "" */, FairMQTransportFactory* factory = nullptr)
int /* flags = 0 */,
FairMQTransportFactory* factory = nullptr)
: fair::mq::UnmanagedRegion(factory) : fair::mq::UnmanagedRegion(factory)
, fCtx(ctx) , fCtx(ctx)
, fId(fCtx.RegionCount()) , fId(fCtx.RegionCount())