diff --git a/fairmq/shmem/Socket.h b/fairmq/shmem/Socket.h index 05a273de..ae8d7f53 100644 --- a/fairmq/shmem/Socket.h +++ b/fairmq/shmem/Socket.h @@ -46,7 +46,7 @@ struct ZMsg class Socket final : public fair::mq::Socket { public: - Socket(Manager& manager, const std::string& type, const std::string& name, const std::string& id = "", void* context = nullptr, FairMQTransportFactory* fac = nullptr) + Socket(Manager& manager, const std::string& type, const std::string& name, const std::string& id, void* context, FairMQTransportFactory* fac = nullptr) : fair::mq::Socket(fac) , fSocket(nullptr) , fManager(manager) diff --git a/fairmq/shmem/TransportFactory.h b/fairmq/shmem/TransportFactory.h index 77f38405..39efff11 100644 --- a/fairmq/shmem/TransportFactory.h +++ b/fairmq/shmem/TransportFactory.h @@ -25,11 +25,9 @@ #include -#include +#include // unique_ptr #include -#include -#include -#include +#include namespace fair { @@ -45,7 +43,7 @@ class TransportFactory final : public fair::mq::TransportFactory : fair::mq::TransportFactory(id) , fDeviceId(id) , fShmId() - , fZMQContext(nullptr) + , fZMQContext(zmq_ctx_new()) , fManager(nullptr) { int major, minor, patch; @@ -53,7 +51,6 @@ class TransportFactory final : public fair::mq::TransportFactory LOG(debug) << "Transport: Using ZeroMQ (" << major << "." << minor << "." << patch << ") & " << "boost::interprocess (" << (BOOST_VERSION / 100000) << "." << (BOOST_VERSION / 100 % 1000) << "." << (BOOST_VERSION % 100) << ")"; - fZMQContext = zmq_ctx_new(); if (!fZMQContext) { throw std::runtime_error(tools::ToString("failed creating context, reason: ", zmq_strerror(errno))); } @@ -120,7 +117,6 @@ class TransportFactory final : public fair::mq::TransportFactory SocketPtr CreateSocket(const std::string& type, const std::string& name) override { - assert(fZMQContext); return tools::make_unique(*fManager, type, name, GetId(), fZMQContext, this); } @@ -141,43 +137,33 @@ class TransportFactory final : public fair::mq::TransportFactory UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override { - return tools::make_unique(*fManager, size, 0, callback, nullptr, path, flags, this); + return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags); } UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0) override { - return tools::make_unique(*fManager, size, 0, nullptr, bulkCallback, path, flags, this); + return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags); } UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override { - return tools::make_unique(*fManager, size, userFlags, callback, nullptr, path, flags, this); + return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags); } UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0) override { - return tools::make_unique(*fManager, size, userFlags, nullptr, bulkCallback, path, flags, this); + return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags); } - void SubscribeToRegionEvents(RegionEventCallback callback) override + UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags) { - fManager->SubscribeToRegionEvents(callback); + return tools::make_unique(*fManager, size, userFlags, callback, bulkCallback, path, flags, this); } - bool SubscribedToRegionEvents() override - { - return fManager->SubscribedToRegionEvents(); - } - - void UnsubscribeFromRegionEvents() override - { - fManager->UnsubscribeFromRegionEvents(); - } - - std::vector GetRegionInfo() override - { - return fManager->GetRegionInfo(); - } + void SubscribeToRegionEvents(RegionEventCallback callback) override { fManager->SubscribeToRegionEvents(callback); } + bool SubscribedToRegionEvents() override { return fManager->SubscribedToRegionEvents(); } + void UnsubscribeFromRegionEvents() override { fManager->UnsubscribeFromRegionEvents(); } + std::vector GetRegionInfo() override { return fManager->GetRegionInfo(); } Transport GetType() const override { return fair::mq::Transport::SHM; } diff --git a/fairmq/zeromq/Socket.h b/fairmq/zeromq/Socket.h index 32b7f20c..41069bb5 100644 --- a/fairmq/zeromq/Socket.h +++ b/fairmq/zeromq/Socket.h @@ -9,29 +9,24 @@ #ifndef FAIR_MQ_ZMQ_SOCKET_H #define FAIR_MQ_ZMQ_SOCKET_H +#include +#include +#include +#include +#include #include #include -#include -#include -#include -#include - +#include // unique_ptr #include -#include -#include // unique_ptr - -namespace fair -{ -namespace mq -{ -namespace zmq -{ +namespace fair { +namespace mq { +namespace zmq { class Socket final : public fair::mq::Socket { public: - Socket(Context& ctx, const std::string& type, const std::string& name, const std::string& id = "", FairMQTransportFactory* factory = nullptr) + Socket(Context& ctx, const std::string& type, const std::string& name, const std::string& id, FairMQTransportFactory* factory = nullptr) : fair::mq::Socket(factory) , fCtx(ctx) , fSocket(zmq_socket(fCtx.GetZmqCtx(), GetConstant(type))) @@ -43,39 +38,32 @@ class Socket final : public fair::mq::Socket , fSndTimeout(100) , fRcvTimeout(100) { - if (fSocket == nullptr) - { + if (fSocket == nullptr) { LOG(error) << "Failed creating socket " << fId << ", reason: " << zmq_strerror(errno); exit(EXIT_FAILURE); } - if (zmq_setsockopt(fSocket, ZMQ_IDENTITY, fId.c_str(), fId.length()) != 0) - { + if (zmq_setsockopt(fSocket, ZMQ_IDENTITY, fId.c_str(), fId.length()) != 0) { LOG(error) << "Failed setting ZMQ_IDENTITY socket option, reason: " << zmq_strerror(errno); } - // Tell socket to try and send/receive outstanding messages for milliseconds before terminating. - // Default value for ZeroMQ is -1, which is to wait forever. + // Tell socket to try and send/receive outstanding messages for milliseconds before + // terminating. Default value for ZeroMQ is -1, which is to wait forever. int linger = 1000; - if (zmq_setsockopt(fSocket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) - { + if (zmq_setsockopt(fSocket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { LOG(error) << "Failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno); } - if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) - { + if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) { LOG(error) << "Failed setting ZMQ_SNDTIMEO socket option, reason: " << zmq_strerror(errno); } - if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) - { + if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) { LOG(error) << "Failed setting ZMQ_RCVTIMEO socket option, reason: " << zmq_strerror(errno); } - if (type == "sub") - { - if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nullptr, 0) != 0) - { + if (type == "sub") { + if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nullptr, 0) != 0) { LOG(error) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno); } } @@ -92,10 +80,11 @@ class Socket final : public fair::mq::Socket { // LOG(info) << "bind socket " << fId << " on " << address; - if (zmq_bind(fSocket, address.c_str()) != 0) - { + if (zmq_bind(fSocket, address.c_str()) != 0) { if (errno == EADDRINUSE) { - // do not print error in this case, this is handled by FairMQDevice in case no connection could be established after trying a number of random ports from a range. + // do not print error in this case, this is handled by FairMQDevice in case no + // connection could be established after trying a number of random ports from a + // range. return false; } LOG(error) << "Failed binding socket " << fId << ", address: " << address << ", reason: " << zmq_strerror(errno); @@ -109,8 +98,7 @@ class Socket final : public fair::mq::Socket { // LOG(info) << "connect socket " << fId << " on " << address; - if (zmq_connect(fSocket, address.c_str()) != 0) - { + if (zmq_connect(fSocket, address.c_str()) != 0) { LOG(error) << "Failed connecting socket " << fId << ", address: " << address << ", reason: " << zmq_strerror(errno); return false; } @@ -219,9 +207,7 @@ class Socket final : public fair::mq::Socket for (unsigned int i = 0; i < vecSize; ++i) { static_cast(msgVec[i].get())->ApplyUsedSize(); - int nbytes = zmq_msg_send(static_cast(msgVec[i].get())->GetMessage(), - fSocket, - (i < vecSize - 1) ? ZMQ_SNDMORE|flags : flags); + int nbytes = zmq_msg_send(static_cast(msgVec[i].get())->GetMessage(), fSocket, (i < vecSize - 1) ? ZMQ_SNDMORE | flags : flags); if (nbytes >= 0) { totalSize += nbytes; } else { @@ -257,15 +243,16 @@ class Socket final : public fair::mq::Socket continue; } - // store statistics on how many messages have been sent (handle all parts as a single message) + // store statistics on how many messages have been sent (handle all parts as a + // single message) ++fMessagesTx; fBytesTx += totalSize; return totalSize; } - } // If there's only one part, send it as a regular message + } // If there's only one part, send it as a regular message else if (vecSize == 1) { return Send(msgVec.back(), timeout); - } else { // if the vector is empty, something might be wrong + } else { // if the vector is empty, something might be wrong LOG(warn) << "Will not send empty vector"; return -1; } @@ -319,7 +306,8 @@ class Socket final : public fair::mq::Socket continue; } - // store statistics on how many messages have been received (handle all parts as a single message) + // store statistics on how many messages have been received (handle all parts as a + // single message) ++fMessagesRx; fBytesRx += totalSize; return totalSize; @@ -332,13 +320,11 @@ class Socket final : public fair::mq::Socket { // LOG(debug) << "Closing socket " << fId; - if (fSocket == nullptr) - { + if (fSocket == nullptr) { return; } - if (zmq_close(fSocket) != 0) - { + if (zmq_close(fSocket) != 0) { LOG(error) << "Failed closing socket " << fId << ", reason: " << zmq_strerror(errno); } @@ -347,16 +333,14 @@ class Socket final : public fair::mq::Socket void SetOption(const std::string& option, const void* value, size_t valueSize) override { - if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0) - { + if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0) { LOG(error) << "Failed setting socket option, reason: " << zmq_strerror(errno); } } void GetOption(const std::string& option, void* value, size_t* valueSize) override { - if (zmq_getsockopt(fSocket, GetConstant(option), value, valueSize) < 0) - { + if (zmq_getsockopt(fSocket, GetConstant(option), value, valueSize) < 0) { LOG(error) << "Failed getting socket option, reason: " << zmq_strerror(errno); } } diff --git a/fairmq/zeromq/TransportFactory.h b/fairmq/zeromq/TransportFactory.h index df11c932..f4fc1f34 100644 --- a/fairmq/zeromq/TransportFactory.h +++ b/fairmq/zeromq/TransportFactory.h @@ -55,14 +55,17 @@ class TransportFactory final : public FairMQTransportFactory { return tools::make_unique(this); } + MessagePtr CreateMessage(const size_t size) override { return tools::make_unique(size, this); } + MessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override { return tools::make_unique(data, size, ffn, hint, this); } + MessagePtr CreateMessage(UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) override { return tools::make_unique(region, data, size, hint, this); @@ -77,10 +80,12 @@ class TransportFactory final : public FairMQTransportFactory { return tools::make_unique(channels); } + PollerPtr CreatePoller(const std::vector& channels) const override { return tools::make_unique(channels); } + PollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override { return tools::make_unique(channelsMap, channelList); @@ -90,21 +95,25 @@ class TransportFactory final : public FairMQTransportFactory { return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags); } + UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override { return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags); } + UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0) override { return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags); } + UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override { return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags); } - UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) + + UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string&, int) { - UnmanagedRegionPtr ptr = tools::make_unique(*fCtx, size, userFlags, callback, bulkCallback, path, flags, this); + UnmanagedRegionPtr ptr = tools::make_unique(*fCtx, size, userFlags, callback, bulkCallback, this); auto zPtr = static_cast(ptr.get()); fCtx->AddRegion(zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), RegionEvent::created); return ptr; diff --git a/fairmq/zeromq/UnmanagedRegion.h b/fairmq/zeromq/UnmanagedRegion.h index 95cb2a38..e40bd971 100644 --- a/fairmq/zeromq/UnmanagedRegion.h +++ b/fairmq/zeromq/UnmanagedRegion.h @@ -30,13 +30,11 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion public: UnmanagedRegion(Context& ctx, - size_t size, - int64_t userFlags, - RegionCallback callback, - RegionBulkCallback bulkCallback, - const std::string& /* path = "" */, - int /* flags = 0 */, - FairMQTransportFactory* factory = nullptr) + size_t size, + int64_t userFlags, + RegionCallback callback, + RegionBulkCallback bulkCallback, + FairMQTransportFactory* factory = nullptr) : fair::mq::UnmanagedRegion(factory) , fCtx(ctx) , fId(fCtx.RegionCount())