Update multi-part features (nanomsg) and various fixes

- Implement nanomsg multipart with MessagePack.
 - Use the MessagePack from FairSoft and handle not found case.
 - Update splitter, merger and proxy devices to handle multi-part.
 - Let FairMQParts.At() return pointer reference (can be used for moving).
 - Add missing const specifier in the message interface.
 - Add transmit kernel size setting to channels (ZMQ_SNDBUF).
 - Remove FairMQBuffer device.
 - Remove old multi-part methods from Tutorial3 example (to be replaced with Parts API).
 - Make callback mandatory for newMsg(data, size, callback).
 - Add missing <vector> include in FairMQSocket.
This commit is contained in:
Alexey Rybalchenko 2016-03-21 09:59:00 +01:00
parent 4ca66e33da
commit 732373faa2
25 changed files with 436 additions and 381 deletions

View File

@ -37,6 +37,13 @@ If(NANOMSG_FOUND)
${SYSTEM_INCLUDE_DIRECTORIES}
${NANOMSG_INCLUDE_DIR}
)
If(MSGPACK_FOUND)
add_definitions(-DMSGPACK_FOUND)
Set(SYSTEM_INCLUDE_DIRECTORIES
${SYSTEM_INCLUDE_DIRECTORIES}
${MSGPACK_INCLUDE_DIR}
)
EndIf(MSGPACK_FOUND)
EndIf(NANOMSG_FOUND)
Include_Directories(${INCLUDE_DIRECTORIES})
@ -67,7 +74,6 @@ Set(SRCS
"devices/FairMQBenchmarkSampler.cxx"
"devices/FairMQSink.cxx"
"devices/FairMQBuffer.cxx"
"devices/FairMQProxy.cxx"
"devices/FairMQSplitter.cxx"
"devices/FairMQMerger.cxx"
@ -126,6 +132,7 @@ If(NANOMSG_FOUND)
Set(DEPENDENCIES
${DEPENDENCIES}
${NANOMSG_LIBRARY_SHARED}
# msgpackc # currently header only
)
EndIf(NANOMSG_FOUND)
@ -136,7 +143,6 @@ GENERATE_LIBRARY()
Set(Exe_Names
bsampler
sink
buffer
splitter
merger
proxy
@ -145,7 +151,6 @@ Set(Exe_Names
Set(Exe_Source
run/runBenchmarkSampler.cxx
run/runSink.cxx
run/runBuffer.cxx
run/runSplitter.cxx
run/runMerger.cxx
run/runProxy.cxx

View File

@ -30,6 +30,8 @@ FairMQChannel::FairMQChannel()
, fAddress("unspecified")
, fSndBufSize(1000)
, fRcvBufSize(1000)
, fSndKernelSize(0)
, fRcvKernelSize(0)
, fRateLogging(1)
, fChannelName("")
, fIsValid(false)
@ -50,6 +52,8 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str
, fAddress(address)
, fSndBufSize(1000)
, fRcvBufSize(1000)
, fSndKernelSize(0)
, fRcvKernelSize(0)
, fRateLogging(1)
, fChannelName("")
, fIsValid(false)
@ -70,6 +74,8 @@ FairMQChannel::FairMQChannel(const FairMQChannel& chan)
, fAddress(chan.fAddress)
, fSndBufSize(chan.fSndBufSize)
, fRcvBufSize(chan.fRcvBufSize)
, fSndKernelSize(chan.fSndKernelSize)
, fRcvKernelSize(chan.fRcvKernelSize)
, fRateLogging(chan.fRateLogging)
, fChannelName(chan.fChannelName)
, fIsValid(false)
@ -89,6 +95,8 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan)
fAddress = chan.fAddress;
fSndBufSize = chan.fSndBufSize;
fRcvBufSize = chan.fRcvBufSize;
fSndKernelSize = chan.fSndKernelSize;
fRcvKernelSize = chan.fRcvKernelSize;
fRateLogging = chan.fRateLogging;
fSocket = nullptr;
fChannelName = chan.fChannelName;
@ -174,6 +182,34 @@ int FairMQChannel::GetRcvBufSize() const
}
}
int FairMQChannel::GetSndKernelSize() const
{
try
{
boost::unique_lock<boost::mutex> scoped_lock(fChannelMutex);
return fSndKernelSize;
}
catch (boost::exception& e)
{
LOG(ERROR) << "Exception caught in FairMQChannel::GetSndKernelSize: " << boost::diagnostic_information(e);
exit(EXIT_FAILURE);
}
}
int FairMQChannel::GetRcvKernelSize() const
{
try
{
boost::unique_lock<boost::mutex> scoped_lock(fChannelMutex);
return fRcvKernelSize;
}
catch (boost::exception& e)
{
LOG(ERROR) << "Exception caught in FairMQChannel::GetRcvKernelSize: " << boost::diagnostic_information(e);
exit(EXIT_FAILURE);
}
}
int FairMQChannel::GetRateLogging() const
{
try
@ -263,6 +299,36 @@ void FairMQChannel::UpdateRcvBufSize(const int rcvBufSize)
}
}
void FairMQChannel::UpdateSndKernelSize(const int sndKernelSize)
{
try
{
boost::unique_lock<boost::mutex> scoped_lock(fChannelMutex);
fIsValid = false;
fSndKernelSize = sndKernelSize;
}
catch (boost::exception& e)
{
LOG(ERROR) << "Exception caught in FairMQChannel::UpdateSndKernelSize: " << boost::diagnostic_information(e);
exit(EXIT_FAILURE);
}
}
void FairMQChannel::UpdateRcvKernelSize(const int rcvKernelSize)
{
try
{
boost::unique_lock<boost::mutex> scoped_lock(fChannelMutex);
fIsValid = false;
fRcvKernelSize = rcvKernelSize;
}
catch (boost::exception& e)
{
LOG(ERROR) << "Exception caught in FairMQChannel::UpdateRcvKernelSize: " << boost::diagnostic_information(e);
exit(EXIT_FAILURE);
}
}
void FairMQChannel::UpdateRateLogging(const int rateLogging)
{
try
@ -393,6 +459,24 @@ bool FairMQChannel::ValidateChannel()
return false;
}
// validate socket kernel transmit size for sending
if (fSndKernelSize < 0)
{
ss << "INVALID";
LOG(DEBUG) << ss.str();
LOG(DEBUG) << "invalid channel send kernel transmit size: \"" << fSndKernelSize << "\"";
return false;
}
// validate socket kernel transmit size for receiving
if (fRcvKernelSize < 0)
{
ss << "INVALID";
LOG(DEBUG) << ss.str();
LOG(DEBUG) << "invalid channel receive kernel transmit size: \"" << fRcvKernelSize << "\"";
return false;
}
fIsValid = true;
ss << "VALID";
LOG(DEBUG) << ss.str();
@ -451,80 +535,6 @@ int FairMQChannel::Send(const unique_ptr<FairMQMessage>& msg) const
return -2;
}
int64_t FairMQChannel::Send(const std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const
{
// Sending vector typicaly handles more then one part
if (msgVec.size() > 1)
{
int64_t totalSize = 0;
for (unsigned int i = 0; i < msgVec.size() - 1; ++i)
{
int nbytes = SendPart(msgVec[i]);
if (nbytes >= 0)
{
totalSize += nbytes;
}
else
{
return nbytes;
}
}
int n = Send(msgVec.back());
if (n >= 0)
{
totalSize += n;
}
else
{
return n;
}
return totalSize;
} // If there's only one part, send it as a regular message
else if (msgVec.size() == 1)
{
return Send(msgVec.back());
}
else // if the vector is empty, something might be wrong
{
LOG(WARN) << "Will not send empty vector";
return -1;
}
}
int64_t FairMQChannel::Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const
{
// Warn if the vector is filled before Receive() and empty it.
if (msgVec.size() > 0)
{
LOG(WARN) << "Message vector contains elements before Receive(), they will be deleted!";
msgVec.clear();
}
int64_t totalSize = 0;
do
{
std::unique_ptr<FairMQMessage> part(fTransportFactory->CreateMessage());
int nbytes = Receive(part);
if (nbytes >= 0)
{
msgVec.push_back(std::move(part));
totalSize += nbytes;
}
else
{
return nbytes;
}
}
while (ExpectsAnotherPart());
return totalSize;
}
int FairMQChannel::Receive(const unique_ptr<FairMQMessage>& msg) const
{
fPoller->Poll(fRcvTimeoutInMs);
@ -543,6 +553,42 @@ int FairMQChannel::Receive(const unique_ptr<FairMQMessage>& msg) const
return -2;
}
int64_t FairMQChannel::Send(const std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const
{
fPoller->Poll(fSndTimeoutInMs);
if (fPoller->CheckInput(0))
{
HandleUnblock();
return -2;
}
if (fPoller->CheckOutput(1))
{
return fSocket->Send(msgVec);
}
return -2;
}
int64_t FairMQChannel::Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const
{
fPoller->Poll(fRcvTimeoutInMs);
if (fPoller->CheckInput(0))
{
HandleUnblock();
return -2;
}
if (fPoller->CheckInput(1))
{
return fSocket->Receive(msgVec);
}
return -2;
}
int FairMQChannel::Send(FairMQMessage* msg, const string& flag) const
{
if (flag == "")

View File

@ -65,6 +65,12 @@ class FairMQChannel
/// Get socket receive buffer size (in number of messages)
/// @return Returns socket receive buffer size (in number of messages)
int GetRcvBufSize() const;
/// Get socket kernel transmit send buffer size (in bytes)
/// @return Returns socket kernel transmit send buffer size (in bytes)
int GetSndKernelSize() const;
/// Get socket kernel transmit receive buffer size (in bytes)
/// @return Returns socket kernel transmit receive buffer size (in bytes)
int GetRcvKernelSize() const;
/// Get socket rate logging setting (1/0)
/// @return Returns socket rate logging setting (1/0)
int GetRateLogging() const;
@ -84,6 +90,12 @@ class FairMQChannel
/// Set socket receive buffer size
/// @param rcvBufSize Socket receive buffer size (in number of messages)
void UpdateRcvBufSize(const int rcvBufSize);
/// Set socket kernel transmit send buffer size (in bytes)
/// @param sndKernelSize Socket send buffer size (in bytes)
void UpdateSndKernelSize(const int sndKernelSize);
/// Set socket kernel transmit receive buffer size (in bytes)
/// @param rcvKernelSize Socket receive buffer size (in bytes)
void UpdateRcvKernelSize(const int rcvKernelSize);
/// Set socket rate logging setting
/// @param rateLogging Socket rate logging setting (1/0)
void UpdateRateLogging(const int rateLogging);
@ -223,6 +235,8 @@ class FairMQChannel
std::string fAddress;
int fSndBufSize;
int fRcvBufSize;
int fSndKernelSize;
int fRcvKernelSize;
int fRateLogging;
std::string fChannelName;

View File

@ -150,7 +150,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
/// @param ffn optional callback, called when the message is transfered (and can be deleted)
/// @param hint optional helper pointer that can be used in the callback
/// @return pointer to FairMQMessage
inline FairMQMessage* NewMessage(void* data, int size, fairmq_free_fn* ffn = NULL, void* hint = NULL) const
inline FairMQMessage* NewMessage(void* data, int size, fairmq_free_fn* ffn, void* hint = NULL) const
{
return fTransportFactory->CreateMessage(data, size, ffn, hint);
}

View File

@ -18,14 +18,14 @@
#include <cstddef> // for size_t
#include <memory> // unique_ptr
typedef void (fairmq_free_fn) (void *data, void *hint);
using fairmq_free_fn = void(void* data, void* hint);
class FairMQMessage
{
public:
virtual void Rebuild() = 0;
virtual void Rebuild(size_t size) = 0;
virtual void Rebuild(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL) = 0;
virtual void Rebuild(const size_t size) = 0;
virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL) = 0;
virtual void* GetMessage() = 0;
virtual void* GetData() = 0;

View File

@ -15,6 +15,8 @@
#include "FairMQTransportFactory.h"
#include "FairMQMessage.h"
/// FairMQParts is a lightweight convenience wrapper around a vector of unique pointers to FairMQMessage, used for sending multi-part messages
class FairMQParts
{
public:
@ -22,6 +24,8 @@ class FairMQParts
FairMQParts() : fParts() {};
/// Copy Constructor
FairMQParts(const FairMQParts&) = delete;
/// Move constructor
FairMQParts(FairMQParts&& p) = default;
/// Assignment operator
FairMQParts& operator=(const FairMQParts&) = delete;
/// Default destructor
@ -34,13 +38,20 @@ class FairMQParts
fParts.push_back(std::unique_ptr<FairMQMessage>(msg));
}
/// Adds part (std::unique_ptr<FairMQMessage>) to the container (move)
/// @param msg unique pointer to FairMQMessage
inline void AddPart(std::unique_ptr<FairMQMessage> msg)
{
fParts.push_back(std::move(msg));
}
/// Get reference to part in the container at index (without bounds check)
/// @param index container index
inline FairMQMessage& operator[](const int index) { return *(fParts[index]); }
/// Get reference to part in the container at index (with bounds check)
/// Get reference to unique pointer to part in the container at index (with bounds check)
/// @param index container index
inline FairMQMessage& At(const int index) { return *(fParts.at(index)); }
inline std::unique_ptr<FairMQMessage>& At(const int index) { return fParts.at(index); }
/// Get number of parts in the container
/// @return number of parts in the container

View File

@ -16,6 +16,7 @@
#define FAIRMQSOCKET_H_
#include <string>
#include <vector>
#include "FairMQMessage.h"
@ -39,8 +40,11 @@ class FairMQSocket
virtual int Send(FairMQMessage* msg, const std::string& flag = "") = 0;
virtual int Send(FairMQMessage* msg, const int flags = 0) = 0;
virtual int64_t Send(const std::vector<std::unique_ptr<FairMQMessage>>& msgVec) = 0;
virtual int Receive(FairMQMessage* msg, const std::string& flag = "") = 0;
virtual int Receive(FairMQMessage* msg, const int flags = 0) = 0;
virtual int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) = 0;
virtual void* GetSocket() const = 0;
virtual int GetSocket(int nothing) const = 0;

View File

@ -32,7 +32,7 @@ class FairMQTransportFactory
public:
virtual FairMQMessage* CreateMessage() = 0;
virtual FairMQMessage* CreateMessage(const size_t size) = 0;
virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn = NULL, void* hint = NULL) = 0;
virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL) = 0;
virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = "") = 0;

View File

@ -35,10 +35,9 @@ FairMQBenchmarkSampler::~FairMQBenchmarkSampler()
void FairMQBenchmarkSampler::Run()
{
void* buffer = malloc(fMsgSize);
int numSentMsgs = 0;
unique_ptr<FairMQMessage> baseMsg(fTransportFactory->CreateMessage(buffer, fMsgSize));
unique_ptr<FairMQMessage> baseMsg(fTransportFactory->CreateMessage(fMsgSize));
// store the channel reference to avoid traversing the map on every loop iteration
const FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0);

View File

@ -1,46 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQBuffer.cxx
*
* @since 2012-10-25
* @author D. Klein, A. Rybalchenko
*/
#include <iostream>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include "FairMQBuffer.h"
#include "FairMQLogger.h"
FairMQBuffer::FairMQBuffer()
{
}
void FairMQBuffer::Run()
{
// store the channel references to avoid traversing the map on every loop iteration
const FairMQChannel& dataInChannel = fChannels.at("data-in").at(0);
const FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0);
while (CheckCurrentState(RUNNING))
{
std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
if (dataInChannel.Receive(msg) > 0)
{
dataOutChannel.Send(msg);
}
}
}
FairMQBuffer::~FairMQBuffer()
{
}

View File

@ -1,30 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQBuffer.h
*
* @since 2012-10-25
* @author D. Klein, A. Rybalchenko
*/
#ifndef FAIRMQBUFFER_H_
#define FAIRMQBUFFER_H_
#include "FairMQDevice.h"
class FairMQBuffer : public FairMQDevice
{
public:
FairMQBuffer();
virtual ~FairMQBuffer();
protected:
virtual void Run();
};
#endif /* FAIRMQBUFFER_H_ */

View File

@ -29,22 +29,12 @@ FairMQMerger::~FairMQMerger()
void FairMQMerger::Run()
{
// store the channel references to avoid traversing the map on every loop iteration
auto& dataInChannelRef = fChannels.at("data-in");
const FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0);
int numInputs = dataInChannelRef.size();
std::vector<FairMQChannel*> dataInChannels(numInputs);
for (int i = 0; i < numInputs; ++i)
{
dataInChannels.at(i) = &(dataInChannelRef.at(i));
}
int numInputs = fChannels.at("data-in").size();
std::unique_ptr<FairMQPoller> poller(fTransportFactory->CreatePoller(dataInChannelRef));
std::unique_ptr<FairMQPoller> poller(fTransportFactory->CreatePoller(fChannels.at("data-in")));
while (CheckCurrentState(RUNNING))
{
std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
poller->Poll(100);
// Loop over the data input channels.
@ -53,19 +43,19 @@ void FairMQMerger::Run()
// Check if the channel has data ready to be received.
if (poller->CheckInput(i))
{
// Try receiving the data.
if (dataInChannels[i]->Receive(msg) >= 0)
FairMQParts parts;
if (Receive(parts, "data-in", i) >= 0)
{
// If data was received, send it to output.
if (dataOutChannel.Send(msg) < 0)
if (Send(parts, "data-out") < 0)
{
LOG(DEBUG) << "Blocking send interrupted by a command";
LOG(DEBUG) << "Transfer interrupted";
break;
}
}
else
{
LOG(DEBUG) << "Blocking receive interrupted by a command";
LOG(DEBUG) << "Transfer interrupted";
break;
}
}

View File

@ -28,17 +28,22 @@ FairMQProxy::~FairMQProxy()
void FairMQProxy::Run()
{
std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
// store the channel references to avoid traversing the map on every loop iteration
const FairMQChannel& dataInChannel = fChannels.at("data-in").at(0);
const FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0);
while (CheckCurrentState(RUNNING))
{
if (dataInChannel.Receive(msg) > 0)
FairMQParts parts;
if (Receive(parts, "data-in") >= 0)
{
dataOutChannel.Send(msg);
if (Send(parts, "data-out") < 0)
{
LOG(DEBUG) << "Transfer interrupted";
break;
}
}
else
{
LOG(DEBUG) << "Transfer interrupted";
break;
}
}
}

View File

@ -28,30 +28,32 @@ FairMQSplitter::~FairMQSplitter()
void FairMQSplitter::Run()
{
// store the channel references to avoid traversing the map on every loop iteration
auto& dataOutChannelRef = fChannels.at("data-out");
const FairMQChannel& dataInChannel = fChannels.at("data-in").at(0);
int numOutputs = dataOutChannelRef.size();
std::vector<FairMQChannel*> dataOutChannels(numOutputs);
for (int i = 0; i < numOutputs; ++i)
{
dataOutChannels[i] = &(dataOutChannelRef.at(i));
}
int numOutputs = fChannels.at("data-out").size();
int direction = 0;
while (CheckCurrentState(RUNNING))
{
std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
FairMQParts parts;
if (dataInChannel.Receive(msg) >= 0)
if (Receive(parts, "data-in") >= 0)
{
dataOutChannels[direction]->Send(msg);
++direction;
if (direction >= numOutputs)
if (Send(parts, "data-out", direction) < 0)
{
direction = 0;
LOG(DEBUG) << "Transfer interrupted";
break;
}
}
else
{
LOG(DEBUG) << "Transfer interrupted";
break;
}
++direction;
if (direction >= numOutputs)
{
direction = 0;
}
}
}

View File

@ -53,7 +53,7 @@ FairMQMessageNN::FairMQMessageNN(const size_t size)
* create FairMQMessage object only with size parameter and fill it with data.
* possible TODO: make this zero copy (will should then be as efficient as ZeroMQ).
*/
FairMQMessageNN::FairMQMessageNN(void* data, const size_t size, fairmq_free_fn *ffn, void* hint)
FairMQMessageNN::FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* ffn, void* hint)
: fMessage(NULL)
, fSize(0)
, fReceiving(false)
@ -67,15 +67,10 @@ FairMQMessageNN::FairMQMessageNN(void* data, const size_t size, fairmq_free_fn *
{
memcpy(fMessage, data, size);
fSize = size;
if (ffn)
{
ffn(data, hint);
}
else
{
if(data) free(data);
}
}
}
@ -97,7 +92,7 @@ void FairMQMessageNN::Rebuild(const size_t size)
fReceiving = false;
}
void FairMQMessageNN::Rebuild(void* data, const size_t size, fairmq_free_fn *ffn, void* hint)
void FairMQMessageNN::Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint)
{
Clear();
fMessage = nn_allocmsg(size, 0);
@ -111,14 +106,10 @@ void FairMQMessageNN::Rebuild(void* data, const size_t size, fairmq_free_fn *ffn
fSize = size;
fReceiving = false;
if(ffn)
if (ffn)
{
ffn(data, hint);
}
else
{
if(data) free(data);
}
}
}

View File

@ -24,13 +24,13 @@ class FairMQMessageNN : public FairMQMessage
public:
FairMQMessageNN();
FairMQMessageNN(const size_t size);
FairMQMessageNN(void* data, const size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL);
FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL);
FairMQMessageNN(const FairMQMessageNN&) = delete;
FairMQMessageNN operator=(const FairMQMessageNN&) = delete;
virtual void Rebuild();
virtual void Rebuild(const size_t size);
virtual void Rebuild(void* data, const size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL);
virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL);
virtual void* GetMessage();
virtual void* GetData();

View File

@ -12,15 +12,18 @@
* @author A. Rybalchenko
*/
#include <sstream>
#include "FairMQSocketNN.h"
#include "FairMQMessageNN.h"
#include "FairMQLogger.h"
#include <sstream>
#ifdef MSGPACK_FOUND
#include <msgpack.hpp>
#endif /*MSGPACK_FOUND*/
using namespace std;
FairMQSocketNN::FairMQSocketNN(const string& type, const std::string& name, const int numIoThreads, const std::string& id /*= ""*/)
FairMQSocketNN::FairMQSocketNN(const string& type, const string& name, const int numIoThreads, const string& id /*= ""*/)
: FairMQSocket(0, 0, NN_DONTWAIT)
, fSocket(-1)
, fId()
@ -62,7 +65,7 @@ FairMQSocketNN::FairMQSocketNN(const string& type, const std::string& name, cons
}
}
LOG(INFO) << "created socket " << fId;
// LOG(INFO) << "created socket " << fId;
}
string FairMQSocketNN::GetId()
@ -72,7 +75,7 @@ string FairMQSocketNN::GetId()
bool FairMQSocketNN::Bind(const string& address)
{
LOG(INFO) << "bind socket " << fId << " on " << address;
// LOG(INFO) << "bind socket " << fId << " on " << address;
int eid = nn_bind(fSocket, address.c_str());
if (eid < 0)
@ -85,7 +88,7 @@ bool FairMQSocketNN::Bind(const string& address)
void FairMQSocketNN::Connect(const string& address)
{
LOG(INFO) << "connect socket " << fId << " to " << address;
// LOG(INFO) << "connect socket " << fId << " to " << address;
int eid = nn_connect(fSocket, address.c_str());
if (eid < 0)
@ -142,6 +145,46 @@ int FairMQSocketNN::Send(FairMQMessage* msg, const int flags)
return nbytes;
}
int64_t FairMQSocketNN::Send(const vector<unique_ptr<FairMQMessage>>& msgVec)
{
#ifdef MSGPACK_FOUND
// create msgpack simple buffer
msgpack::sbuffer sbuf;
// create msgpack packer
msgpack::packer<msgpack::sbuffer> packer(&sbuf);
// pack all parts into a single msgpack simple buffer
for (int i = 0; i < msgVec.size(); ++i)
{
static_cast<FairMQMessageNN*>(msgVec[i].get())->fReceiving = false;
packer.pack_bin(msgVec[i]->GetSize());
packer.pack_bin_body(static_cast<char*>(msgVec[i]->GetData()), msgVec[i]->GetSize());
}
int64_t nbytes = nn_send(fSocket, sbuf.data(), sbuf.size(), 0);
if (nbytes >= 0)
{
fBytesTx += nbytes;
++fMessagesTx;
return nbytes;
}
if (nn_errno() == EAGAIN)
{
return -2;
}
if (nn_errno() == ETERM)
{
LOG(INFO) << "terminating socket " << fId;
return -1;
}
LOG(ERROR) << "Failed sending on socket " << fId << ", reason: " << nn_strerror(errno);
return nbytes;
#else /*MSGPACK_FOUND*/
LOG(ERROR) << "Cannot use nanomsg multipart because MessagePack was not found.";
exit(EXIT_FAILURE);
#endif /*MSGPACK_FOUND*/
}
int FairMQSocketNN::Receive(FairMQMessage* msg, const string& flag)
{
void* ptr = NULL;
@ -193,6 +236,67 @@ int FairMQSocketNN::Receive(FairMQMessage* msg, const int flags)
return nbytes;
}
int64_t FairMQSocketNN::Receive(vector<unique_ptr<FairMQMessage>>& msgVec)
{
#ifdef MSGPACK_FOUND
// Warn if the vector is filled before Receive() and empty it.
if (msgVec.size() > 0)
{
LOG(WARN) << "Message vector contains elements before Receive(), they will be deleted!";
msgVec.clear();
}
// pointer to point to received message buffer
char* ptr = NULL;
// receive the message into a buffer allocated by nanomsg and let ptr point to it
int nbytes = nn_recv(fSocket, &ptr, NN_MSG, 0);
if (nbytes >= 0) // if no errors or non-blocking timeouts
{
// store statistics on how many bytes received
fBytesRx += nbytes;
// store statistics on how many messages received (count messages instead of parts)
++fMessagesRx;
// offset to be used by msgpack to handle separate chunks
size_t offset = 0;
while (offset != nbytes) // continue until all parts have been read
{
// vector of chars to hold blob (unlike char*/void* this type can be converted to by msgpack)
std::vector<char> buf;
// unpack and convert chunk
msgpack::unpacked result;
unpack(result, ptr, nbytes, offset);
msgpack::object object(result.get());
object.convert(buf);
// get the single message size
size_t size = buf.size() * sizeof(char);
unique_ptr<FairMQMessage> part(new FairMQMessageNN(size));
static_cast<FairMQMessageNN*>(part.get())->fReceiving = true;
memcpy(part->GetData(), buf.data(), size);
msgVec.push_back(move(part));
}
nn_freemsg(ptr);
return nbytes;
}
if (nn_errno() == EAGAIN)
{
return -2;
}
if (nn_errno() == ETERM)
{
LOG(INFO) << "terminating socket " << fId;
return -1;
}
LOG(ERROR) << "Failed receiving on socket " << fId << ", reason: " << nn_strerror(errno);
return nbytes;
#else /*MSGPACK_FOUND*/
LOG(ERROR) << "Cannot use nanomsg multipart because MessagePack was not found.";
exit(EXIT_FAILURE);
#endif /*MSGPACK_FOUND*/
}
void FairMQSocketNN::Close()
{
nn_close(fSocket);
@ -225,7 +329,8 @@ void FairMQSocketNN::SetOption(const string& option, const void* value, size_t v
void FairMQSocketNN::GetOption(const string& option, void* value, size_t* valueSize)
{
int rc = nn_getsockopt(fSocket, NN_SOL_SOCKET, GetConstant(option), value, valueSize);
if (rc < 0) {
if (rc < 0)
{
LOG(ERROR) << "failed getting socket option, reason: " << nn_strerror(errno);
}
}
@ -329,11 +434,17 @@ int FairMQSocketNN::GetConstant(const string& constant)
return NN_SNDBUF;
if (constant == "rcv-hwm")
return NN_RCVBUF;
if (constant == "snd-more") {
if (constant == "snd-size")
return NN_SNDBUF;
if (constant == "rcv-size")
return NN_RCVBUF;
if (constant == "snd-more")
{
LOG(ERROR) << "Multipart messages functionality currently not supported by nanomsg!";
return -1;
}
if (constant == "rcv-more") {
if (constant == "rcv-more")
{
LOG(ERROR) << "Multipart messages functionality currently not supported by nanomsg!";
return -1;
}

View File

@ -15,6 +15,8 @@
#ifndef FAIRMQSOCKETNN_H_
#define FAIRMQSOCKETNN_H_
#include <vector>
#include <nanomsg/nn.h>
#include <nanomsg/pipeline.h>
#include <nanomsg/pubsub.h>
@ -37,8 +39,11 @@ class FairMQSocketNN : public FairMQSocket
virtual int Send(FairMQMessage* msg, const std::string& flag = "");
virtual int Send(FairMQMessage* msg, const int flags = 0);
virtual int64_t Send(const std::vector<std::unique_ptr<FairMQMessage>>& msgVec);
virtual int Receive(FairMQMessage* msg, const std::string& flag = "");
virtual int Receive(FairMQMessage* msg, const int flags = 0);
virtual int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec);
virtual void* GetSocket() const;
virtual int GetSocket(int nothing) const;

View File

@ -29,7 +29,7 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory
virtual FairMQMessage* CreateMessage();
virtual FairMQMessage* CreateMessage(const size_t size);
virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn = NULL, void* hint = NULL);
virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL);
virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = "");

View File

@ -1,139 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runBuffer.cxx
*
* @since 2012-10-26
* @author D. Klein, A. Rybalchenko
*/
#include <iostream>
#include "boost/program_options.hpp"
#include "FairMQLogger.h"
#include "FairMQBuffer.h"
using namespace std;
typedef struct DeviceOptions
{
DeviceOptions() :
id(), ioThreads(0), transport(),
inputSocketType(), inputBufSize(0), inputMethod(), inputAddress(),
outputSocketType(), outputBufSize(0), outputMethod(), outputAddress() {}
string id;
int ioThreads;
string transport;
string inputSocketType;
int inputBufSize;
string inputMethod;
string inputAddress;
string outputSocketType;
int outputBufSize;
string outputMethod;
string outputAddress;
} DeviceOptions_t;
inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options)
{
if (_options == NULL)
throw runtime_error("Internal error: options' container is empty.");
namespace bpo = boost::program_options;
bpo::options_description desc("Options");
desc.add_options()
("id", bpo::value<string>(), "Device ID")
("io-threads", bpo::value<int>()->default_value(1), "Number of I/O threads")
("transport", bpo::value<string>()->default_value("zeromq"), "Transport (zeromq/nanomsg)")
("input-socket-type", bpo::value<string>()->required(), "Input socket type: sub/pull")
("input-buff-size", bpo::value<int>()->default_value(1000), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)")
("input-method", bpo::value<string>()->required(), "Input method: bind/connect")
("input-address", bpo::value<string>()->required(), "Input address, e.g.: \"tcp://localhost:5555\"")
("output-socket-type", bpo::value<string>()->required(), "Output socket type: pub/push")
("output-buff-size", bpo::value<int>()->default_value(1000), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)")
("output-method", bpo::value<string>()->required(), "Output method: bind/connect")
("output-address", bpo::value<string>()->required(), "Output address, e.g.: \"tcp://localhost:5555\"")
("help", "Print help messages");
bpo::variables_map vm;
bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm);
if (vm.count("help"))
{
LOG(INFO) << "FairMQ Buffer" << endl << desc;
return false;
}
bpo::notify(vm);
if (vm.count("id")) { _options->id = vm["id"].as<string>(); }
if (vm.count("io-threads")) { _options->ioThreads = vm["io-threads"].as<int>(); }
if (vm.count("transport")) { _options->transport = vm["transport"].as<string>(); }
if (vm.count("input-socket-type")) { _options->inputSocketType = vm["input-socket-type"].as<string>(); }
if (vm.count("input-buff-size")) { _options->inputBufSize = vm["input-buff-size"].as<int>(); }
if (vm.count("input-method")) { _options->inputMethod = vm["input-method"].as<string>(); }
if (vm.count("input-address")) { _options->inputAddress = vm["input-address"].as<string>(); }
if (vm.count("output-socket-type")) { _options->outputSocketType = vm["output-socket-type"].as<string>(); }
if (vm.count("output-buff-size")) { _options->outputBufSize = vm["output-buff-size"].as<int>(); }
if (vm.count("output-method")) { _options->outputMethod = vm["output-method"].as<string>(); }
if (vm.count("output-address")) { _options->outputAddress = vm["output-address"].as<string>(); }
return true;
}
int main(int argc, char** argv)
{
FairMQBuffer buffer;
buffer.CatchSignals();
DeviceOptions_t options;
try
{
if (!parse_cmd_line(argc, argv, &options))
return 0;
}
catch (exception& e)
{
LOG(ERROR) << e.what();
return 1;
}
LOG(INFO) << "PID: " << getpid();
buffer.SetTransport(options.transport);
FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress);
inputChannel.UpdateSndBufSize(options.inputBufSize);
inputChannel.UpdateRcvBufSize(options.inputBufSize);
inputChannel.UpdateRateLogging(1);
buffer.fChannels["data-in"].push_back(inputChannel);
FairMQChannel outputChannel(options.outputSocketType, options.outputMethod, options.outputAddress);
outputChannel.UpdateSndBufSize(options.outputBufSize);
outputChannel.UpdateRcvBufSize(options.outputBufSize);
outputChannel.UpdateRateLogging(1);
buffer.fChannels["data-out"].push_back(outputChannel);
buffer.SetProperty(FairMQBuffer::Id, options.id);
buffer.SetProperty(FairMQBuffer::NumIoThreads, options.ioThreads);
buffer.ChangeState("INIT_DEVICE");
buffer.WaitForEndOfState("INIT_DEVICE");
buffer.ChangeState("INIT_TASK");
buffer.WaitForEndOfState("INIT_TASK");
buffer.ChangeState("RUN");
buffer.InteractiveStateLoop();
return 0;
}

View File

@ -38,10 +38,10 @@ FairMQMessageZMQ::FairMQMessageZMQ(const size_t size)
}
}
FairMQMessageZMQ::FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn *ffn, void* hint)
FairMQMessageZMQ::FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint)
: fMessage()
{
if (zmq_msg_init_data(&fMessage, data, size, ffn ? ffn : &CleanUp, hint) != 0)
if (zmq_msg_init_data(&fMessage, data, size, ffn, hint) != 0)
{
LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno);
}
@ -65,10 +65,10 @@ void FairMQMessageZMQ::Rebuild(const size_t size)
}
}
void FairMQMessageZMQ::Rebuild(void* data, const size_t size, fairmq_free_fn *ffn, void* hint)
void FairMQMessageZMQ::Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint)
{
CloseMessage();
if (zmq_msg_init_data(&fMessage, data, size, ffn ? ffn : &CleanUp, hint) != 0)
if (zmq_msg_init_data(&fMessage, data, size, ffn, hint) != 0)
{
LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno);
}
@ -136,11 +136,6 @@ inline void FairMQMessageZMQ::CloseMessage()
}
}
void FairMQMessageZMQ::CleanUp(void* data, void*)
{
free(data);
}
FairMQMessageZMQ::~FairMQMessageZMQ()
{
if (zmq_msg_close(&fMessage) != 0)

View File

@ -26,11 +26,11 @@ class FairMQMessageZMQ : public FairMQMessage
public:
FairMQMessageZMQ();
FairMQMessageZMQ(const size_t size);
FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn *ffn = &CleanUp, void* hint = NULL);
FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL);
virtual void Rebuild();
virtual void Rebuild(const size_t size);
virtual void Rebuild(void* data, const size_t size, fairmq_free_fn *ffn = &CleanUp, void* hint = NULL);
virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL);
virtual void* GetMessage();
virtual void* GetData();
@ -42,8 +42,6 @@ class FairMQMessageZMQ : public FairMQMessage
virtual void Copy(FairMQMessage* msg);
virtual void Copy(const std::unique_ptr<FairMQMessage>& msg);
static void CleanUp(void* data, void* hint);
virtual ~FairMQMessageZMQ();
private:

View File

@ -17,6 +17,7 @@
#include <zmq.h>
#include "FairMQSocketZMQ.h"
#include "FairMQMessageZMQ.h"
#include "FairMQLogger.h"
using namespace std;
@ -24,7 +25,7 @@ using namespace std;
// Context to hold the ZeroMQ sockets
boost::shared_ptr<FairMQContextZMQ> FairMQSocketZMQ::fContext = boost::shared_ptr<FairMQContextZMQ>(new FairMQContextZMQ(1));
FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const int numIoThreads, const std::string& id /*= ""*/)
FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const int numIoThreads, const string& id /*= ""*/)
: FairMQSocket(ZMQ_SNDMORE, ZMQ_RCVMORE, ZMQ_DONTWAIT)
, fSocket(NULL)
, fId()
@ -68,6 +69,8 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const i
LOG(ERROR) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno);
}
}
// LOG(INFO) << "created socket " << fId;
}
string FairMQSocketZMQ::GetId()
@ -147,6 +150,52 @@ int FairMQSocketZMQ::Send(FairMQMessage* msg, const int flags)
return nbytes;
}
int64_t FairMQSocketZMQ::Send(const vector<unique_ptr<FairMQMessage>>& msgVec)
{
// Sending vector typicaly handles more then one part
if (msgVec.size() > 1)
{
int64_t totalSize = 0;
for (unsigned int i = 0; i < msgVec.size() - 1; ++i)
{
int nbytes = zmq_msg_send(static_cast<zmq_msg_t*>(msgVec[i]->GetMessage()), fSocket, ZMQ_SNDMORE);
if (nbytes >= 0)
{
totalSize += nbytes;
fBytesTx += nbytes;
}
else
{
return nbytes;
}
}
int n = zmq_msg_send(static_cast<zmq_msg_t*>(msgVec.back()->GetMessage()), fSocket, 0);
if (n >= 0)
{
totalSize += n;
}
else
{
return n;
}
// store statistics on how many messages have been sent (handle all parts as a single message)
++fMessagesTx;
return totalSize;
} // If there's only one part, send it as a regular message
else if (msgVec.size() == 1)
{
return zmq_msg_send(static_cast<zmq_msg_t*>(msgVec.back()->GetMessage()), fSocket, 0);
}
else // if the vector is empty, something might be wrong
{
LOG(WARN) << "Will not send empty vector";
return -1;
}
}
int FairMQSocketZMQ::Receive(FairMQMessage* msg, const string& flag)
{
int nbytes = zmq_msg_recv(static_cast<zmq_msg_t*>(msg->GetMessage()), fSocket, GetConstant(flag));
@ -191,6 +240,44 @@ int FairMQSocketZMQ::Receive(FairMQMessage* msg, const int flags)
return nbytes;
}
int64_t FairMQSocketZMQ::Receive(vector<unique_ptr<FairMQMessage>>& msgVec)
{
// Warn if the vector is filled before Receive() and empty it.
if (msgVec.size() > 0)
{
LOG(WARN) << "Message vector contains elements before Receive(), they will be deleted!";
msgVec.clear();
}
int64_t totalSize = 0;
int64_t more = 0;
do
{
unique_ptr<FairMQMessage> part(new FairMQMessageZMQ());
int nbytes = zmq_msg_recv(static_cast<zmq_msg_t*>(part->GetMessage()), fSocket, 0);
if (nbytes >= 0)
{
msgVec.push_back(move(part));
totalSize += nbytes;
fBytesRx += nbytes;
}
else
{
return nbytes;
}
size_t more_size = sizeof(more);
zmq_getsockopt(fSocket, ZMQ_RCVMORE, &more, &more_size);
}
while (more);
// store statistics on how many messages have been received (handle all parts as a single message)
++fMessagesRx;
return totalSize;
}
void FairMQSocketZMQ::Close()
{
// LOG(DEBUG) << "Closing socket " << fId;
@ -414,6 +501,10 @@ int FairMQSocketZMQ::GetConstant(const string& constant)
return ZMQ_SNDHWM;
if (constant == "rcv-hwm")
return ZMQ_RCVHWM;
if (constant == "snd-size")
return ZMQ_SNDBUF;
if (constant == "rcv-size")
return ZMQ_RCVBUF;
if (constant == "snd-more")
return ZMQ_SNDMORE;
if (constant == "rcv-more")

View File

@ -34,8 +34,11 @@ class FairMQSocketZMQ : public FairMQSocket
virtual int Send(FairMQMessage* msg, const std::string& flag = "");
virtual int Send(FairMQMessage* msg, const int flags = 0);
virtual int64_t Send(const std::vector<std::unique_ptr<FairMQMessage>>& msgVec);
virtual int Receive(FairMQMessage* msg, const std::string& flag = "");
virtual int Receive(FairMQMessage* msg, const int flags = 0);
virtual int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec);
virtual void* GetSocket() const;
virtual int GetSocket(int nothing) const;

View File

@ -30,7 +30,7 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory
virtual FairMQMessage* CreateMessage();
virtual FairMQMessage* CreateMessage(const size_t size);
virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn = NULL, void* hint = NULL);
virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL);
virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = "");