From 732373faa2fb5fd20bf4ed3a500d98cc975b474a Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Mon, 21 Mar 2016 09:59:00 +0100 Subject: [PATCH] Update multi-part features (nanomsg) and various fixes - Implement nanomsg multipart with MessagePack. - Use the MessagePack from FairSoft and handle not found case. - Update splitter, merger and proxy devices to handle multi-part. - Let FairMQParts.At() return pointer reference (can be used for moving). - Add missing const specifier in the message interface. - Add transmit kernel size setting to channels (ZMQ_SNDBUF). - Remove FairMQBuffer device. - Remove old multi-part methods from Tutorial3 example (to be replaced with Parts API). - Make callback mandatory for newMsg(data, size, callback). - Add missing include in FairMQSocket. --- fairmq/CMakeLists.txt | 11 +- fairmq/FairMQChannel.cxx | 194 +++++++++++++--------- fairmq/FairMQChannel.h | 14 ++ fairmq/FairMQDevice.h | 2 +- fairmq/FairMQMessage.h | 6 +- fairmq/FairMQParts.h | 15 +- fairmq/FairMQSocket.h | 4 + fairmq/FairMQTransportFactory.h | 2 +- fairmq/devices/FairMQBenchmarkSampler.cxx | 3 +- fairmq/devices/FairMQBuffer.cxx | 46 ----- fairmq/devices/FairMQBuffer.h | 30 ---- fairmq/devices/FairMQMerger.cxx | 26 +-- fairmq/devices/FairMQProxy.cxx | 21 ++- fairmq/devices/FairMQSplitter.cxx | 32 ++-- fairmq/nanomsg/FairMQMessageNN.cxx | 15 +- fairmq/nanomsg/FairMQMessageNN.h | 4 +- fairmq/nanomsg/FairMQSocketNN.cxx | 129 +++++++++++++- fairmq/nanomsg/FairMQSocketNN.h | 5 + fairmq/nanomsg/FairMQTransportFactoryNN.h | 2 +- fairmq/run/runBuffer.cxx | 139 ---------------- fairmq/zeromq/FairMQMessageZMQ.cxx | 13 +- fairmq/zeromq/FairMQMessageZMQ.h | 6 +- fairmq/zeromq/FairMQSocketZMQ.cxx | 93 ++++++++++- fairmq/zeromq/FairMQSocketZMQ.h | 3 + fairmq/zeromq/FairMQTransportFactoryZMQ.h | 2 +- 25 files changed, 436 insertions(+), 381 deletions(-) delete mode 100644 fairmq/devices/FairMQBuffer.cxx delete mode 100644 fairmq/devices/FairMQBuffer.h delete mode 100644 fairmq/run/runBuffer.cxx diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index c9d0535a..cc634416 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -37,6 +37,13 @@ If(NANOMSG_FOUND) ${SYSTEM_INCLUDE_DIRECTORIES} ${NANOMSG_INCLUDE_DIR} ) + If(MSGPACK_FOUND) + add_definitions(-DMSGPACK_FOUND) + Set(SYSTEM_INCLUDE_DIRECTORIES + ${SYSTEM_INCLUDE_DIRECTORIES} + ${MSGPACK_INCLUDE_DIR} + ) + EndIf(MSGPACK_FOUND) EndIf(NANOMSG_FOUND) Include_Directories(${INCLUDE_DIRECTORIES}) @@ -67,7 +74,6 @@ Set(SRCS "devices/FairMQBenchmarkSampler.cxx" "devices/FairMQSink.cxx" - "devices/FairMQBuffer.cxx" "devices/FairMQProxy.cxx" "devices/FairMQSplitter.cxx" "devices/FairMQMerger.cxx" @@ -126,6 +132,7 @@ If(NANOMSG_FOUND) Set(DEPENDENCIES ${DEPENDENCIES} ${NANOMSG_LIBRARY_SHARED} + # msgpackc # currently header only ) EndIf(NANOMSG_FOUND) @@ -136,7 +143,6 @@ GENERATE_LIBRARY() Set(Exe_Names bsampler sink - buffer splitter merger proxy @@ -145,7 +151,6 @@ Set(Exe_Names Set(Exe_Source run/runBenchmarkSampler.cxx run/runSink.cxx - run/runBuffer.cxx run/runSplitter.cxx run/runMerger.cxx run/runProxy.cxx diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index 7258061a..7c1a29ab 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -30,6 +30,8 @@ FairMQChannel::FairMQChannel() , fAddress("unspecified") , fSndBufSize(1000) , fRcvBufSize(1000) + , fSndKernelSize(0) + , fRcvKernelSize(0) , fRateLogging(1) , fChannelName("") , fIsValid(false) @@ -50,6 +52,8 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str , fAddress(address) , fSndBufSize(1000) , fRcvBufSize(1000) + , fSndKernelSize(0) + , fRcvKernelSize(0) , fRateLogging(1) , fChannelName("") , fIsValid(false) @@ -70,6 +74,8 @@ FairMQChannel::FairMQChannel(const FairMQChannel& chan) , fAddress(chan.fAddress) , fSndBufSize(chan.fSndBufSize) , fRcvBufSize(chan.fRcvBufSize) + , fSndKernelSize(chan.fSndKernelSize) + , fRcvKernelSize(chan.fRcvKernelSize) , fRateLogging(chan.fRateLogging) , fChannelName(chan.fChannelName) , fIsValid(false) @@ -89,6 +95,8 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan) fAddress = chan.fAddress; fSndBufSize = chan.fSndBufSize; fRcvBufSize = chan.fRcvBufSize; + fSndKernelSize = chan.fSndKernelSize; + fRcvKernelSize = chan.fRcvKernelSize; fRateLogging = chan.fRateLogging; fSocket = nullptr; fChannelName = chan.fChannelName; @@ -174,6 +182,34 @@ int FairMQChannel::GetRcvBufSize() const } } +int FairMQChannel::GetSndKernelSize() const +{ + try + { + boost::unique_lock scoped_lock(fChannelMutex); + return fSndKernelSize; + } + catch (boost::exception& e) + { + LOG(ERROR) << "Exception caught in FairMQChannel::GetSndKernelSize: " << boost::diagnostic_information(e); + exit(EXIT_FAILURE); + } +} + +int FairMQChannel::GetRcvKernelSize() const +{ + try + { + boost::unique_lock scoped_lock(fChannelMutex); + return fRcvKernelSize; + } + catch (boost::exception& e) + { + LOG(ERROR) << "Exception caught in FairMQChannel::GetRcvKernelSize: " << boost::diagnostic_information(e); + exit(EXIT_FAILURE); + } +} + int FairMQChannel::GetRateLogging() const { try @@ -263,6 +299,36 @@ void FairMQChannel::UpdateRcvBufSize(const int rcvBufSize) } } +void FairMQChannel::UpdateSndKernelSize(const int sndKernelSize) +{ + try + { + boost::unique_lock scoped_lock(fChannelMutex); + fIsValid = false; + fSndKernelSize = sndKernelSize; + } + catch (boost::exception& e) + { + LOG(ERROR) << "Exception caught in FairMQChannel::UpdateSndKernelSize: " << boost::diagnostic_information(e); + exit(EXIT_FAILURE); + } +} + +void FairMQChannel::UpdateRcvKernelSize(const int rcvKernelSize) +{ + try + { + boost::unique_lock scoped_lock(fChannelMutex); + fIsValid = false; + fRcvKernelSize = rcvKernelSize; + } + catch (boost::exception& e) + { + LOG(ERROR) << "Exception caught in FairMQChannel::UpdateRcvKernelSize: " << boost::diagnostic_information(e); + exit(EXIT_FAILURE); + } +} + void FairMQChannel::UpdateRateLogging(const int rateLogging) { try @@ -393,6 +459,24 @@ bool FairMQChannel::ValidateChannel() return false; } + // validate socket kernel transmit size for sending + if (fSndKernelSize < 0) + { + ss << "INVALID"; + LOG(DEBUG) << ss.str(); + LOG(DEBUG) << "invalid channel send kernel transmit size: \"" << fSndKernelSize << "\""; + return false; + } + + // validate socket kernel transmit size for receiving + if (fRcvKernelSize < 0) + { + ss << "INVALID"; + LOG(DEBUG) << ss.str(); + LOG(DEBUG) << "invalid channel receive kernel transmit size: \"" << fRcvKernelSize << "\""; + return false; + } + fIsValid = true; ss << "VALID"; LOG(DEBUG) << ss.str(); @@ -451,80 +535,6 @@ int FairMQChannel::Send(const unique_ptr& msg) const return -2; } -int64_t FairMQChannel::Send(const std::vector>& msgVec) const -{ - // Sending vector typicaly handles more then one part - if (msgVec.size() > 1) - { - int64_t totalSize = 0; - - for (unsigned int i = 0; i < msgVec.size() - 1; ++i) - { - int nbytes = SendPart(msgVec[i]); - if (nbytes >= 0) - { - totalSize += nbytes; - } - else - { - return nbytes; - } - } - - int n = Send(msgVec.back()); - if (n >= 0) - { - totalSize += n; - } - else - { - return n; - } - - return totalSize; - } // If there's only one part, send it as a regular message - else if (msgVec.size() == 1) - { - return Send(msgVec.back()); - } - else // if the vector is empty, something might be wrong - { - LOG(WARN) << "Will not send empty vector"; - return -1; - } -} - -int64_t FairMQChannel::Receive(std::vector>& msgVec) const -{ - // Warn if the vector is filled before Receive() and empty it. - if (msgVec.size() > 0) - { - LOG(WARN) << "Message vector contains elements before Receive(), they will be deleted!"; - msgVec.clear(); - } - - int64_t totalSize = 0; - - do - { - std::unique_ptr part(fTransportFactory->CreateMessage()); - - int nbytes = Receive(part); - if (nbytes >= 0) - { - msgVec.push_back(std::move(part)); - totalSize += nbytes; - } - else - { - return nbytes; - } - } - while (ExpectsAnotherPart()); - - return totalSize; -} - int FairMQChannel::Receive(const unique_ptr& msg) const { fPoller->Poll(fRcvTimeoutInMs); @@ -543,6 +553,42 @@ int FairMQChannel::Receive(const unique_ptr& msg) const return -2; } +int64_t FairMQChannel::Send(const std::vector>& msgVec) const +{ + fPoller->Poll(fSndTimeoutInMs); + + if (fPoller->CheckInput(0)) + { + HandleUnblock(); + return -2; + } + + if (fPoller->CheckOutput(1)) + { + return fSocket->Send(msgVec); + } + + return -2; +} + +int64_t FairMQChannel::Receive(std::vector>& msgVec) const +{ + fPoller->Poll(fRcvTimeoutInMs); + + if (fPoller->CheckInput(0)) + { + HandleUnblock(); + return -2; + } + + if (fPoller->CheckInput(1)) + { + return fSocket->Receive(msgVec); + } + + return -2; +} + int FairMQChannel::Send(FairMQMessage* msg, const string& flag) const { if (flag == "") diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index f32b7442..31955159 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -65,6 +65,12 @@ class FairMQChannel /// Get socket receive buffer size (in number of messages) /// @return Returns socket receive buffer size (in number of messages) int GetRcvBufSize() const; + /// Get socket kernel transmit send buffer size (in bytes) + /// @return Returns socket kernel transmit send buffer size (in bytes) + int GetSndKernelSize() const; + /// Get socket kernel transmit receive buffer size (in bytes) + /// @return Returns socket kernel transmit receive buffer size (in bytes) + int GetRcvKernelSize() const; /// Get socket rate logging setting (1/0) /// @return Returns socket rate logging setting (1/0) int GetRateLogging() const; @@ -84,6 +90,12 @@ class FairMQChannel /// Set socket receive buffer size /// @param rcvBufSize Socket receive buffer size (in number of messages) void UpdateRcvBufSize(const int rcvBufSize); + /// Set socket kernel transmit send buffer size (in bytes) + /// @param sndKernelSize Socket send buffer size (in bytes) + void UpdateSndKernelSize(const int sndKernelSize); + /// Set socket kernel transmit receive buffer size (in bytes) + /// @param rcvKernelSize Socket receive buffer size (in bytes) + void UpdateRcvKernelSize(const int rcvKernelSize); /// Set socket rate logging setting /// @param rateLogging Socket rate logging setting (1/0) void UpdateRateLogging(const int rateLogging); @@ -223,6 +235,8 @@ class FairMQChannel std::string fAddress; int fSndBufSize; int fRcvBufSize; + int fSndKernelSize; + int fRcvKernelSize; int fRateLogging; std::string fChannelName; diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index e026a36a..456886ae 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -150,7 +150,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// @param ffn optional callback, called when the message is transfered (and can be deleted) /// @param hint optional helper pointer that can be used in the callback /// @return pointer to FairMQMessage - inline FairMQMessage* NewMessage(void* data, int size, fairmq_free_fn* ffn = NULL, void* hint = NULL) const + inline FairMQMessage* NewMessage(void* data, int size, fairmq_free_fn* ffn, void* hint = NULL) const { return fTransportFactory->CreateMessage(data, size, ffn, hint); } diff --git a/fairmq/FairMQMessage.h b/fairmq/FairMQMessage.h index d21b13a7..1e66f9c1 100644 --- a/fairmq/FairMQMessage.h +++ b/fairmq/FairMQMessage.h @@ -18,14 +18,14 @@ #include // for size_t #include // unique_ptr -typedef void (fairmq_free_fn) (void *data, void *hint); +using fairmq_free_fn = void(void* data, void* hint); class FairMQMessage { public: virtual void Rebuild() = 0; - virtual void Rebuild(size_t size) = 0; - virtual void Rebuild(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL) = 0; + virtual void Rebuild(const size_t size) = 0; + virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL) = 0; virtual void* GetMessage() = 0; virtual void* GetData() = 0; diff --git a/fairmq/FairMQParts.h b/fairmq/FairMQParts.h index e499c836..4a4a555b 100644 --- a/fairmq/FairMQParts.h +++ b/fairmq/FairMQParts.h @@ -15,6 +15,8 @@ #include "FairMQTransportFactory.h" #include "FairMQMessage.h" +/// FairMQParts is a lightweight convenience wrapper around a vector of unique pointers to FairMQMessage, used for sending multi-part messages + class FairMQParts { public: @@ -22,6 +24,8 @@ class FairMQParts FairMQParts() : fParts() {}; /// Copy Constructor FairMQParts(const FairMQParts&) = delete; + /// Move constructor + FairMQParts(FairMQParts&& p) = default; /// Assignment operator FairMQParts& operator=(const FairMQParts&) = delete; /// Default destructor @@ -34,13 +38,20 @@ class FairMQParts fParts.push_back(std::unique_ptr(msg)); } + /// Adds part (std::unique_ptr) to the container (move) + /// @param msg unique pointer to FairMQMessage + inline void AddPart(std::unique_ptr msg) + { + fParts.push_back(std::move(msg)); + } + /// Get reference to part in the container at index (without bounds check) /// @param index container index inline FairMQMessage& operator[](const int index) { return *(fParts[index]); } - /// Get reference to part in the container at index (with bounds check) + /// Get reference to unique pointer to part in the container at index (with bounds check) /// @param index container index - inline FairMQMessage& At(const int index) { return *(fParts.at(index)); } + inline std::unique_ptr& At(const int index) { return fParts.at(index); } /// Get number of parts in the container /// @return number of parts in the container diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h index 0cb59371..e590cbfa 100644 --- a/fairmq/FairMQSocket.h +++ b/fairmq/FairMQSocket.h @@ -16,6 +16,7 @@ #define FAIRMQSOCKET_H_ #include +#include #include "FairMQMessage.h" @@ -39,8 +40,11 @@ class FairMQSocket virtual int Send(FairMQMessage* msg, const std::string& flag = "") = 0; virtual int Send(FairMQMessage* msg, const int flags = 0) = 0; + virtual int64_t Send(const std::vector>& msgVec) = 0; + virtual int Receive(FairMQMessage* msg, const std::string& flag = "") = 0; virtual int Receive(FairMQMessage* msg, const int flags = 0) = 0; + virtual int64_t Receive(std::vector>& msgVec) = 0; virtual void* GetSocket() const = 0; virtual int GetSocket(int nothing) const = 0; diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 5be87c6c..bd5fd5ff 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -32,7 +32,7 @@ class FairMQTransportFactory public: virtual FairMQMessage* CreateMessage() = 0; virtual FairMQMessage* CreateMessage(const size_t size) = 0; - virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn = NULL, void* hint = NULL) = 0; + virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL) = 0; virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = "") = 0; diff --git a/fairmq/devices/FairMQBenchmarkSampler.cxx b/fairmq/devices/FairMQBenchmarkSampler.cxx index d380154b..4e7bd991 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.cxx +++ b/fairmq/devices/FairMQBenchmarkSampler.cxx @@ -35,10 +35,9 @@ FairMQBenchmarkSampler::~FairMQBenchmarkSampler() void FairMQBenchmarkSampler::Run() { - void* buffer = malloc(fMsgSize); int numSentMsgs = 0; - unique_ptr baseMsg(fTransportFactory->CreateMessage(buffer, fMsgSize)); + unique_ptr baseMsg(fTransportFactory->CreateMessage(fMsgSize)); // store the channel reference to avoid traversing the map on every loop iteration const FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0); diff --git a/fairmq/devices/FairMQBuffer.cxx b/fairmq/devices/FairMQBuffer.cxx deleted file mode 100644 index ed64374b..00000000 --- a/fairmq/devices/FairMQBuffer.cxx +++ /dev/null @@ -1,46 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQBuffer.cxx - * - * @since 2012-10-25 - * @author D. Klein, A. Rybalchenko - */ - -#include - -#include -#include - -#include "FairMQBuffer.h" -#include "FairMQLogger.h" - -FairMQBuffer::FairMQBuffer() -{ -} - -void FairMQBuffer::Run() -{ - // store the channel references to avoid traversing the map on every loop iteration - const FairMQChannel& dataInChannel = fChannels.at("data-in").at(0); - const FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0); - - while (CheckCurrentState(RUNNING)) - { - std::unique_ptr msg(fTransportFactory->CreateMessage()); - - if (dataInChannel.Receive(msg) > 0) - { - dataOutChannel.Send(msg); - } - } -} - -FairMQBuffer::~FairMQBuffer() -{ -} diff --git a/fairmq/devices/FairMQBuffer.h b/fairmq/devices/FairMQBuffer.h deleted file mode 100644 index fd63bb4e..00000000 --- a/fairmq/devices/FairMQBuffer.h +++ /dev/null @@ -1,30 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQBuffer.h - * - * @since 2012-10-25 - * @author D. Klein, A. Rybalchenko - */ - -#ifndef FAIRMQBUFFER_H_ -#define FAIRMQBUFFER_H_ - -#include "FairMQDevice.h" - -class FairMQBuffer : public FairMQDevice -{ - public: - FairMQBuffer(); - virtual ~FairMQBuffer(); - - protected: - virtual void Run(); -}; - -#endif /* FAIRMQBUFFER_H_ */ diff --git a/fairmq/devices/FairMQMerger.cxx b/fairmq/devices/FairMQMerger.cxx index 251d884f..ac563dad 100644 --- a/fairmq/devices/FairMQMerger.cxx +++ b/fairmq/devices/FairMQMerger.cxx @@ -29,22 +29,12 @@ FairMQMerger::~FairMQMerger() void FairMQMerger::Run() { - // store the channel references to avoid traversing the map on every loop iteration - auto& dataInChannelRef = fChannels.at("data-in"); - const FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0); - int numInputs = dataInChannelRef.size(); - std::vector dataInChannels(numInputs); - for (int i = 0; i < numInputs; ++i) - { - dataInChannels.at(i) = &(dataInChannelRef.at(i)); - } + int numInputs = fChannels.at("data-in").size(); - std::unique_ptr poller(fTransportFactory->CreatePoller(dataInChannelRef)); + std::unique_ptr poller(fTransportFactory->CreatePoller(fChannels.at("data-in"))); while (CheckCurrentState(RUNNING)) { - std::unique_ptr msg(fTransportFactory->CreateMessage()); - poller->Poll(100); // Loop over the data input channels. @@ -53,19 +43,19 @@ void FairMQMerger::Run() // Check if the channel has data ready to be received. if (poller->CheckInput(i)) { - // Try receiving the data. - if (dataInChannels[i]->Receive(msg) >= 0) + FairMQParts parts; + + if (Receive(parts, "data-in", i) >= 0) { - // If data was received, send it to output. - if (dataOutChannel.Send(msg) < 0) + if (Send(parts, "data-out") < 0) { - LOG(DEBUG) << "Blocking send interrupted by a command"; + LOG(DEBUG) << "Transfer interrupted"; break; } } else { - LOG(DEBUG) << "Blocking receive interrupted by a command"; + LOG(DEBUG) << "Transfer interrupted"; break; } } diff --git a/fairmq/devices/FairMQProxy.cxx b/fairmq/devices/FairMQProxy.cxx index 6458121d..dce61526 100644 --- a/fairmq/devices/FairMQProxy.cxx +++ b/fairmq/devices/FairMQProxy.cxx @@ -28,17 +28,22 @@ FairMQProxy::~FairMQProxy() void FairMQProxy::Run() { - std::unique_ptr msg(fTransportFactory->CreateMessage()); - - // store the channel references to avoid traversing the map on every loop iteration - const FairMQChannel& dataInChannel = fChannels.at("data-in").at(0); - const FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0); - while (CheckCurrentState(RUNNING)) { - if (dataInChannel.Receive(msg) > 0) + FairMQParts parts; + + if (Receive(parts, "data-in") >= 0) { - dataOutChannel.Send(msg); + if (Send(parts, "data-out") < 0) + { + LOG(DEBUG) << "Transfer interrupted"; + break; + } + } + else + { + LOG(DEBUG) << "Transfer interrupted"; + break; } } } diff --git a/fairmq/devices/FairMQSplitter.cxx b/fairmq/devices/FairMQSplitter.cxx index fabd8405..f67e43a1 100644 --- a/fairmq/devices/FairMQSplitter.cxx +++ b/fairmq/devices/FairMQSplitter.cxx @@ -28,30 +28,32 @@ FairMQSplitter::~FairMQSplitter() void FairMQSplitter::Run() { - // store the channel references to avoid traversing the map on every loop iteration - auto& dataOutChannelRef = fChannels.at("data-out"); - const FairMQChannel& dataInChannel = fChannels.at("data-in").at(0); - int numOutputs = dataOutChannelRef.size(); - std::vector dataOutChannels(numOutputs); - for (int i = 0; i < numOutputs; ++i) - { - dataOutChannels[i] = &(dataOutChannelRef.at(i)); - } + int numOutputs = fChannels.at("data-out").size(); int direction = 0; while (CheckCurrentState(RUNNING)) { - std::unique_ptr msg(fTransportFactory->CreateMessage()); + FairMQParts parts; - if (dataInChannel.Receive(msg) >= 0) + if (Receive(parts, "data-in") >= 0) { - dataOutChannels[direction]->Send(msg); - ++direction; - if (direction >= numOutputs) + if (Send(parts, "data-out", direction) < 0) { - direction = 0; + LOG(DEBUG) << "Transfer interrupted"; + break; } } + else + { + LOG(DEBUG) << "Transfer interrupted"; + break; + } + + ++direction; + if (direction >= numOutputs) + { + direction = 0; + } } } diff --git a/fairmq/nanomsg/FairMQMessageNN.cxx b/fairmq/nanomsg/FairMQMessageNN.cxx index 4b3f12e3..ef0188d6 100644 --- a/fairmq/nanomsg/FairMQMessageNN.cxx +++ b/fairmq/nanomsg/FairMQMessageNN.cxx @@ -53,7 +53,7 @@ FairMQMessageNN::FairMQMessageNN(const size_t size) * create FairMQMessage object only with size parameter and fill it with data. * possible TODO: make this zero copy (will should then be as efficient as ZeroMQ). */ -FairMQMessageNN::FairMQMessageNN(void* data, const size_t size, fairmq_free_fn *ffn, void* hint) +FairMQMessageNN::FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) : fMessage(NULL) , fSize(0) , fReceiving(false) @@ -67,15 +67,10 @@ FairMQMessageNN::FairMQMessageNN(void* data, const size_t size, fairmq_free_fn * { memcpy(fMessage, data, size); fSize = size; - if (ffn) { ffn(data, hint); } - else - { - if(data) free(data); - } } } @@ -97,7 +92,7 @@ void FairMQMessageNN::Rebuild(const size_t size) fReceiving = false; } -void FairMQMessageNN::Rebuild(void* data, const size_t size, fairmq_free_fn *ffn, void* hint) +void FairMQMessageNN::Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) { Clear(); fMessage = nn_allocmsg(size, 0); @@ -111,14 +106,10 @@ void FairMQMessageNN::Rebuild(void* data, const size_t size, fairmq_free_fn *ffn fSize = size; fReceiving = false; - if(ffn) + if (ffn) { ffn(data, hint); } - else - { - if(data) free(data); - } } } diff --git a/fairmq/nanomsg/FairMQMessageNN.h b/fairmq/nanomsg/FairMQMessageNN.h index 594ee5f9..0cd826bb 100644 --- a/fairmq/nanomsg/FairMQMessageNN.h +++ b/fairmq/nanomsg/FairMQMessageNN.h @@ -24,13 +24,13 @@ class FairMQMessageNN : public FairMQMessage public: FairMQMessageNN(); FairMQMessageNN(const size_t size); - FairMQMessageNN(void* data, const size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL); + FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL); FairMQMessageNN(const FairMQMessageNN&) = delete; FairMQMessageNN operator=(const FairMQMessageNN&) = delete; virtual void Rebuild(); virtual void Rebuild(const size_t size); - virtual void Rebuild(void* data, const size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL); + virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL); virtual void* GetMessage(); virtual void* GetData(); diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index f5278884..3116ed6b 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -12,15 +12,18 @@ * @author A. Rybalchenko */ -#include - #include "FairMQSocketNN.h" #include "FairMQMessageNN.h" #include "FairMQLogger.h" +#include +#ifdef MSGPACK_FOUND +#include +#endif /*MSGPACK_FOUND*/ + using namespace std; -FairMQSocketNN::FairMQSocketNN(const string& type, const std::string& name, const int numIoThreads, const std::string& id /*= ""*/) +FairMQSocketNN::FairMQSocketNN(const string& type, const string& name, const int numIoThreads, const string& id /*= ""*/) : FairMQSocket(0, 0, NN_DONTWAIT) , fSocket(-1) , fId() @@ -62,7 +65,7 @@ FairMQSocketNN::FairMQSocketNN(const string& type, const std::string& name, cons } } - LOG(INFO) << "created socket " << fId; + // LOG(INFO) << "created socket " << fId; } string FairMQSocketNN::GetId() @@ -72,7 +75,7 @@ string FairMQSocketNN::GetId() bool FairMQSocketNN::Bind(const string& address) { - LOG(INFO) << "bind socket " << fId << " on " << address; + // LOG(INFO) << "bind socket " << fId << " on " << address; int eid = nn_bind(fSocket, address.c_str()); if (eid < 0) @@ -85,7 +88,7 @@ bool FairMQSocketNN::Bind(const string& address) void FairMQSocketNN::Connect(const string& address) { - LOG(INFO) << "connect socket " << fId << " to " << address; + // LOG(INFO) << "connect socket " << fId << " to " << address; int eid = nn_connect(fSocket, address.c_str()); if (eid < 0) @@ -142,6 +145,46 @@ int FairMQSocketNN::Send(FairMQMessage* msg, const int flags) return nbytes; } +int64_t FairMQSocketNN::Send(const vector>& msgVec) +{ +#ifdef MSGPACK_FOUND + // create msgpack simple buffer + msgpack::sbuffer sbuf; + // create msgpack packer + msgpack::packer packer(&sbuf); + + // pack all parts into a single msgpack simple buffer + for (int i = 0; i < msgVec.size(); ++i) + { + static_cast(msgVec[i].get())->fReceiving = false; + packer.pack_bin(msgVec[i]->GetSize()); + packer.pack_bin_body(static_cast(msgVec[i]->GetData()), msgVec[i]->GetSize()); + } + + int64_t nbytes = nn_send(fSocket, sbuf.data(), sbuf.size(), 0); + if (nbytes >= 0) + { + fBytesTx += nbytes; + ++fMessagesTx; + return nbytes; + } + if (nn_errno() == EAGAIN) + { + return -2; + } + if (nn_errno() == ETERM) + { + LOG(INFO) << "terminating socket " << fId; + return -1; + } + LOG(ERROR) << "Failed sending on socket " << fId << ", reason: " << nn_strerror(errno); + return nbytes; +#else /*MSGPACK_FOUND*/ + LOG(ERROR) << "Cannot use nanomsg multipart because MessagePack was not found."; + exit(EXIT_FAILURE); +#endif /*MSGPACK_FOUND*/ +} + int FairMQSocketNN::Receive(FairMQMessage* msg, const string& flag) { void* ptr = NULL; @@ -193,6 +236,67 @@ int FairMQSocketNN::Receive(FairMQMessage* msg, const int flags) return nbytes; } +int64_t FairMQSocketNN::Receive(vector>& msgVec) +{ +#ifdef MSGPACK_FOUND + // Warn if the vector is filled before Receive() and empty it. + if (msgVec.size() > 0) + { + LOG(WARN) << "Message vector contains elements before Receive(), they will be deleted!"; + msgVec.clear(); + } + + // pointer to point to received message buffer + char* ptr = NULL; + // receive the message into a buffer allocated by nanomsg and let ptr point to it + int nbytes = nn_recv(fSocket, &ptr, NN_MSG, 0); + if (nbytes >= 0) // if no errors or non-blocking timeouts + { + // store statistics on how many bytes received + fBytesRx += nbytes; + // store statistics on how many messages received (count messages instead of parts) + ++fMessagesRx; + + // offset to be used by msgpack to handle separate chunks + size_t offset = 0; + while (offset != nbytes) // continue until all parts have been read + { + // vector of chars to hold blob (unlike char*/void* this type can be converted to by msgpack) + std::vector buf; + + // unpack and convert chunk + msgpack::unpacked result; + unpack(result, ptr, nbytes, offset); + msgpack::object object(result.get()); + object.convert(buf); + // get the single message size + size_t size = buf.size() * sizeof(char); + unique_ptr part(new FairMQMessageNN(size)); + static_cast(part.get())->fReceiving = true; + memcpy(part->GetData(), buf.data(), size); + msgVec.push_back(move(part)); + } + + nn_freemsg(ptr); + return nbytes; + } + if (nn_errno() == EAGAIN) + { + return -2; + } + if (nn_errno() == ETERM) + { + LOG(INFO) << "terminating socket " << fId; + return -1; + } + LOG(ERROR) << "Failed receiving on socket " << fId << ", reason: " << nn_strerror(errno); + return nbytes; +#else /*MSGPACK_FOUND*/ + LOG(ERROR) << "Cannot use nanomsg multipart because MessagePack was not found."; + exit(EXIT_FAILURE); +#endif /*MSGPACK_FOUND*/ +} + void FairMQSocketNN::Close() { nn_close(fSocket); @@ -225,7 +329,8 @@ void FairMQSocketNN::SetOption(const string& option, const void* value, size_t v void FairMQSocketNN::GetOption(const string& option, void* value, size_t* valueSize) { int rc = nn_getsockopt(fSocket, NN_SOL_SOCKET, GetConstant(option), value, valueSize); - if (rc < 0) { + if (rc < 0) + { LOG(ERROR) << "failed getting socket option, reason: " << nn_strerror(errno); } } @@ -329,11 +434,17 @@ int FairMQSocketNN::GetConstant(const string& constant) return NN_SNDBUF; if (constant == "rcv-hwm") return NN_RCVBUF; - if (constant == "snd-more") { + if (constant == "snd-size") + return NN_SNDBUF; + if (constant == "rcv-size") + return NN_RCVBUF; + if (constant == "snd-more") + { LOG(ERROR) << "Multipart messages functionality currently not supported by nanomsg!"; return -1; } - if (constant == "rcv-more") { + if (constant == "rcv-more") + { LOG(ERROR) << "Multipart messages functionality currently not supported by nanomsg!"; return -1; } diff --git a/fairmq/nanomsg/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h index 71f07086..3e22dbb3 100644 --- a/fairmq/nanomsg/FairMQSocketNN.h +++ b/fairmq/nanomsg/FairMQSocketNN.h @@ -15,6 +15,8 @@ #ifndef FAIRMQSOCKETNN_H_ #define FAIRMQSOCKETNN_H_ +#include + #include #include #include @@ -37,8 +39,11 @@ class FairMQSocketNN : public FairMQSocket virtual int Send(FairMQMessage* msg, const std::string& flag = ""); virtual int Send(FairMQMessage* msg, const int flags = 0); + virtual int64_t Send(const std::vector>& msgVec); + virtual int Receive(FairMQMessage* msg, const std::string& flag = ""); virtual int Receive(FairMQMessage* msg, const int flags = 0); + virtual int64_t Receive(std::vector>& msgVec); virtual void* GetSocket() const; virtual int GetSocket(int nothing) const; diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.h b/fairmq/nanomsg/FairMQTransportFactoryNN.h index 89efb8d0..cdd89280 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.h +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.h @@ -29,7 +29,7 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory virtual FairMQMessage* CreateMessage(); virtual FairMQMessage* CreateMessage(const size_t size); - virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn = NULL, void* hint = NULL); + virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL); virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = ""); diff --git a/fairmq/run/runBuffer.cxx b/fairmq/run/runBuffer.cxx deleted file mode 100644 index 499a0f78..00000000 --- a/fairmq/run/runBuffer.cxx +++ /dev/null @@ -1,139 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * runBuffer.cxx - * - * @since 2012-10-26 - * @author D. Klein, A. Rybalchenko - */ - -#include - -#include "boost/program_options.hpp" - -#include "FairMQLogger.h" -#include "FairMQBuffer.h" - -using namespace std; - -typedef struct DeviceOptions -{ - DeviceOptions() : - id(), ioThreads(0), transport(), - inputSocketType(), inputBufSize(0), inputMethod(), inputAddress(), - outputSocketType(), outputBufSize(0), outputMethod(), outputAddress() {} - - string id; - int ioThreads; - string transport; - string inputSocketType; - int inputBufSize; - string inputMethod; - string inputAddress; - string outputSocketType; - int outputBufSize; - string outputMethod; - string outputAddress; -} DeviceOptions_t; - -inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) -{ - if (_options == NULL) - throw runtime_error("Internal error: options' container is empty."); - - namespace bpo = boost::program_options; - bpo::options_description desc("Options"); - desc.add_options() - ("id", bpo::value(), "Device ID") - ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") - ("transport", bpo::value()->default_value("zeromq"), "Transport (zeromq/nanomsg)") - ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") - ("input-buff-size", bpo::value()->default_value(1000), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") - ("input-method", bpo::value()->required(), "Input method: bind/connect") - ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://localhost:5555\"") - ("output-socket-type", bpo::value()->required(), "Output socket type: pub/push") - ("output-buff-size", bpo::value()->default_value(1000), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") - ("output-method", bpo::value()->required(), "Output method: bind/connect") - ("output-address", bpo::value()->required(), "Output address, e.g.: \"tcp://localhost:5555\"") - ("help", "Print help messages"); - - bpo::variables_map vm; - bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); - - if (vm.count("help")) - { - LOG(INFO) << "FairMQ Buffer" << endl << desc; - return false; - } - - bpo::notify(vm); - - if (vm.count("id")) { _options->id = vm["id"].as(); } - if (vm.count("io-threads")) { _options->ioThreads = vm["io-threads"].as(); } - if (vm.count("transport")) { _options->transport = vm["transport"].as(); } - if (vm.count("input-socket-type")) { _options->inputSocketType = vm["input-socket-type"].as(); } - if (vm.count("input-buff-size")) { _options->inputBufSize = vm["input-buff-size"].as(); } - if (vm.count("input-method")) { _options->inputMethod = vm["input-method"].as(); } - if (vm.count("input-address")) { _options->inputAddress = vm["input-address"].as(); } - if (vm.count("output-socket-type")) { _options->outputSocketType = vm["output-socket-type"].as(); } - if (vm.count("output-buff-size")) { _options->outputBufSize = vm["output-buff-size"].as(); } - if (vm.count("output-method")) { _options->outputMethod = vm["output-method"].as(); } - if (vm.count("output-address")) { _options->outputAddress = vm["output-address"].as(); } - - return true; -} - -int main(int argc, char** argv) -{ - FairMQBuffer buffer; - buffer.CatchSignals(); - - DeviceOptions_t options; - try - { - if (!parse_cmd_line(argc, argv, &options)) - return 0; - } - catch (exception& e) - { - LOG(ERROR) << e.what(); - return 1; - } - - LOG(INFO) << "PID: " << getpid(); - - buffer.SetTransport(options.transport); - - FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress); - inputChannel.UpdateSndBufSize(options.inputBufSize); - inputChannel.UpdateRcvBufSize(options.inputBufSize); - inputChannel.UpdateRateLogging(1); - - buffer.fChannels["data-in"].push_back(inputChannel); - - FairMQChannel outputChannel(options.outputSocketType, options.outputMethod, options.outputAddress); - outputChannel.UpdateSndBufSize(options.outputBufSize); - outputChannel.UpdateRcvBufSize(options.outputBufSize); - outputChannel.UpdateRateLogging(1); - - buffer.fChannels["data-out"].push_back(outputChannel); - - buffer.SetProperty(FairMQBuffer::Id, options.id); - buffer.SetProperty(FairMQBuffer::NumIoThreads, options.ioThreads); - - buffer.ChangeState("INIT_DEVICE"); - buffer.WaitForEndOfState("INIT_DEVICE"); - - buffer.ChangeState("INIT_TASK"); - buffer.WaitForEndOfState("INIT_TASK"); - - buffer.ChangeState("RUN"); - buffer.InteractiveStateLoop(); - - return 0; -} diff --git a/fairmq/zeromq/FairMQMessageZMQ.cxx b/fairmq/zeromq/FairMQMessageZMQ.cxx index 5a6bf85b..56bae5c8 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.cxx +++ b/fairmq/zeromq/FairMQMessageZMQ.cxx @@ -38,10 +38,10 @@ FairMQMessageZMQ::FairMQMessageZMQ(const size_t size) } } -FairMQMessageZMQ::FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn *ffn, void* hint) +FairMQMessageZMQ::FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) : fMessage() { - if (zmq_msg_init_data(&fMessage, data, size, ffn ? ffn : &CleanUp, hint) != 0) + if (zmq_msg_init_data(&fMessage, data, size, ffn, hint) != 0) { LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno); } @@ -65,10 +65,10 @@ void FairMQMessageZMQ::Rebuild(const size_t size) } } -void FairMQMessageZMQ::Rebuild(void* data, const size_t size, fairmq_free_fn *ffn, void* hint) +void FairMQMessageZMQ::Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) { CloseMessage(); - if (zmq_msg_init_data(&fMessage, data, size, ffn ? ffn : &CleanUp, hint) != 0) + if (zmq_msg_init_data(&fMessage, data, size, ffn, hint) != 0) { LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno); } @@ -136,11 +136,6 @@ inline void FairMQMessageZMQ::CloseMessage() } } -void FairMQMessageZMQ::CleanUp(void* data, void*) -{ - free(data); -} - FairMQMessageZMQ::~FairMQMessageZMQ() { if (zmq_msg_close(&fMessage) != 0) diff --git a/fairmq/zeromq/FairMQMessageZMQ.h b/fairmq/zeromq/FairMQMessageZMQ.h index 5b5a7734..1b87b4c1 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.h +++ b/fairmq/zeromq/FairMQMessageZMQ.h @@ -26,11 +26,11 @@ class FairMQMessageZMQ : public FairMQMessage public: FairMQMessageZMQ(); FairMQMessageZMQ(const size_t size); - FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn *ffn = &CleanUp, void* hint = NULL); + FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL); virtual void Rebuild(); virtual void Rebuild(const size_t size); - virtual void Rebuild(void* data, const size_t size, fairmq_free_fn *ffn = &CleanUp, void* hint = NULL); + virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL); virtual void* GetMessage(); virtual void* GetData(); @@ -42,8 +42,6 @@ class FairMQMessageZMQ : public FairMQMessage virtual void Copy(FairMQMessage* msg); virtual void Copy(const std::unique_ptr& msg); - static void CleanUp(void* data, void* hint); - virtual ~FairMQMessageZMQ(); private: diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index 7f134621..f7759e48 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -17,6 +17,7 @@ #include #include "FairMQSocketZMQ.h" +#include "FairMQMessageZMQ.h" #include "FairMQLogger.h" using namespace std; @@ -24,7 +25,7 @@ using namespace std; // Context to hold the ZeroMQ sockets boost::shared_ptr FairMQSocketZMQ::fContext = boost::shared_ptr(new FairMQContextZMQ(1)); -FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const int numIoThreads, const std::string& id /*= ""*/) +FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const int numIoThreads, const string& id /*= ""*/) : FairMQSocket(ZMQ_SNDMORE, ZMQ_RCVMORE, ZMQ_DONTWAIT) , fSocket(NULL) , fId() @@ -68,6 +69,8 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const i LOG(ERROR) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno); } } + + // LOG(INFO) << "created socket " << fId; } string FairMQSocketZMQ::GetId() @@ -147,6 +150,52 @@ int FairMQSocketZMQ::Send(FairMQMessage* msg, const int flags) return nbytes; } +int64_t FairMQSocketZMQ::Send(const vector>& msgVec) +{ + // Sending vector typicaly handles more then one part + if (msgVec.size() > 1) + { + int64_t totalSize = 0; + + for (unsigned int i = 0; i < msgVec.size() - 1; ++i) + { + int nbytes = zmq_msg_send(static_cast(msgVec[i]->GetMessage()), fSocket, ZMQ_SNDMORE); + if (nbytes >= 0) + { + totalSize += nbytes; + fBytesTx += nbytes; + } + else + { + return nbytes; + } + } + + int n = zmq_msg_send(static_cast(msgVec.back()->GetMessage()), fSocket, 0); + if (n >= 0) + { + totalSize += n; + } + else + { + return n; + } + + // store statistics on how many messages have been sent (handle all parts as a single message) + ++fMessagesTx; + return totalSize; + } // If there's only one part, send it as a regular message + else if (msgVec.size() == 1) + { + return zmq_msg_send(static_cast(msgVec.back()->GetMessage()), fSocket, 0); + } + else // if the vector is empty, something might be wrong + { + LOG(WARN) << "Will not send empty vector"; + return -1; + } +} + int FairMQSocketZMQ::Receive(FairMQMessage* msg, const string& flag) { int nbytes = zmq_msg_recv(static_cast(msg->GetMessage()), fSocket, GetConstant(flag)); @@ -191,6 +240,44 @@ int FairMQSocketZMQ::Receive(FairMQMessage* msg, const int flags) return nbytes; } +int64_t FairMQSocketZMQ::Receive(vector>& msgVec) +{ + // Warn if the vector is filled before Receive() and empty it. + if (msgVec.size() > 0) + { + LOG(WARN) << "Message vector contains elements before Receive(), they will be deleted!"; + msgVec.clear(); + } + + int64_t totalSize = 0; + int64_t more = 0; + + do + { + unique_ptr part(new FairMQMessageZMQ()); + + int nbytes = zmq_msg_recv(static_cast(part->GetMessage()), fSocket, 0); + if (nbytes >= 0) + { + msgVec.push_back(move(part)); + totalSize += nbytes; + fBytesRx += nbytes; + } + else + { + return nbytes; + } + + size_t more_size = sizeof(more); + zmq_getsockopt(fSocket, ZMQ_RCVMORE, &more, &more_size); + } + while (more); + + // store statistics on how many messages have been received (handle all parts as a single message) + ++fMessagesRx; + return totalSize; +} + void FairMQSocketZMQ::Close() { // LOG(DEBUG) << "Closing socket " << fId; @@ -414,6 +501,10 @@ int FairMQSocketZMQ::GetConstant(const string& constant) return ZMQ_SNDHWM; if (constant == "rcv-hwm") return ZMQ_RCVHWM; + if (constant == "snd-size") + return ZMQ_SNDBUF; + if (constant == "rcv-size") + return ZMQ_RCVBUF; if (constant == "snd-more") return ZMQ_SNDMORE; if (constant == "rcv-more") diff --git a/fairmq/zeromq/FairMQSocketZMQ.h b/fairmq/zeromq/FairMQSocketZMQ.h index 2bbdc399..042d08d4 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.h +++ b/fairmq/zeromq/FairMQSocketZMQ.h @@ -34,8 +34,11 @@ class FairMQSocketZMQ : public FairMQSocket virtual int Send(FairMQMessage* msg, const std::string& flag = ""); virtual int Send(FairMQMessage* msg, const int flags = 0); + virtual int64_t Send(const std::vector>& msgVec); + virtual int Receive(FairMQMessage* msg, const std::string& flag = ""); virtual int Receive(FairMQMessage* msg, const int flags = 0); + virtual int64_t Receive(std::vector>& msgVec); virtual void* GetSocket() const; virtual int GetSocket(int nothing) const; diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index 8fb6572f..f3b8d10f 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -30,7 +30,7 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory virtual FairMQMessage* CreateMessage(); virtual FairMQMessage* CreateMessage(const size_t size); - virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn = NULL, void* hint = NULL); + virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL); virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = "");