First version of the shared memory transport.

Use via `--transport shmem` cmd option. No pub/sub.
This commit is contained in:
Alexey Rybalchenko 2016-06-03 11:24:12 +02:00
parent 6c3b01f09c
commit a332d9fc83
39 changed files with 2121 additions and 309 deletions

View File

@ -19,6 +19,7 @@ Set(INCLUDE_DIRECTORIES
${CMAKE_SOURCE_DIR}/fairmq/options
${CMAKE_SOURCE_DIR}/fairmq/logger
${CMAKE_SOURCE_DIR}/fairmq/zeromq
${CMAKE_SOURCE_DIR}/fairmq/shmem
${CMAKE_CURRENT_BINARY_DIR}
)
@ -67,6 +68,12 @@ Set(SRCS
"zeromq/FairMQPollerZMQ.cxx"
"zeromq/FairMQContextZMQ.cxx"
"shmem/FairMQTransportFactorySHM.cxx"
"shmem/FairMQMessageSHM.cxx"
"shmem/FairMQSocketSHM.cxx"
"shmem/FairMQPollerSHM.cxx"
"shmem/FairMQContextSHM.cxx"
"FairMQLogger.cxx"
"FairMQConfigurable.cxx"
"FairMQStateMachine.cxx"
@ -125,6 +132,7 @@ Set(DEPENDENCIES
${Boost_PROGRAM_OPTIONS_LIBRARY}
${Boost_REGEX_LIBRARY}
${Boost_DATE_TIME_LIBRARY}
${Boost_INTERPROCESS_LIBRARY}
)
If(NANOMSG_FOUND)

View File

@ -35,6 +35,7 @@
#include "FairMQProgOptions.h"
#include "FairMQTransportFactoryZMQ.h"
#include "FairMQTransportFactorySHM.h"
#ifdef NANOMSG_FOUND
#include "FairMQTransportFactoryNN.h"
#endif
@ -177,6 +178,9 @@ void FairMQDevice::InitWrapper()
{
fCmdSocket = fTransportFactory->CreateSocket("pub", "device-commands", fNumIoThreads, fId);
fCmdSocket->Bind("inproc://commands");
FairMQMessagePtr msg(fTransportFactory->CreateMessage());
msg->SetDeviceId(fId);
}
// Containers to store the uninitialized channels.
@ -301,66 +305,83 @@ bool FairMQDevice::ConnectChannel(FairMQChannel& ch)
bool FairMQDevice::AttachChannel(FairMQChannel& ch)
{
std::vector<std::string> endpoints;
FairMQChannel::Tokenize(endpoints, ch.fAddress);
for (auto& endpoint : endpoints)
{
//(re-)init socket
if (!ch.fSocket) {
ch.fSocket = fTransportFactory->CreateSocket(ch.fType, ch.fChannelName, fNumIoThreads, fId);
std::vector<std::string> endpoints;
FairMQChannel::Tokenize(endpoints, ch.fAddress);
for (auto& endpoint : endpoints)
{
//(re-)init socket
if (!ch.fSocket)
{
ch.fSocket = fTransportFactory->CreateSocket(ch.fType, ch.fChannelName, fNumIoThreads, fId);
}
// set high water marks
ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize));
ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize));
// set kernel transmit size
ch.fSocket->SetOption("snd-size", &(ch.fSndKernelSize), sizeof(ch.fSndKernelSize));
ch.fSocket->SetOption("rcv-size", &(ch.fRcvKernelSize), sizeof(ch.fRcvKernelSize));
// attach
bool bind = (ch.fMethod == "bind");
bool connectionModifier = false;
std::string address = endpoint;
// check if the default fMethod is overridden by a modifier
if (endpoint[0] == '+' || endpoint[0] == '>')
{
connectionModifier = true;
bind = false;
address = endpoint.substr(1);
}
else if (endpoint[0] == '@')
{
connectionModifier = true;
bind = true;
address = endpoint.substr(1);
}
bool rc = true;
// make the connection
if (bind)
{
rc = BindEndpoint(*ch.fSocket, address);
}
else
{
rc = ConnectEndpoint(*ch.fSocket, address);
}
// bind might bind to an address different than requested,
// put the actual address back in the config
endpoint.clear();
if (connectionModifier)
{
endpoint.push_back(bind?'@':'+');
}
endpoint += address;
LOG(DEBUG) << "Attached channel " << ch.fChannelName << " to " << endpoint << (bind ? " (bind) " : " (connect) ");
// after the book keeping is done, exit in case of errors
if (!rc)
{
return rc;
}
}
// set high water marks
ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize));
ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize));
// put the (possibly) modified address back in the config
ch.UpdateAddress(boost::algorithm::join(endpoints, ","));
// attach
bool bind = (ch.fMethod=="bind");
bool connectionModifier = false;
std::string address = endpoint;
// check if the default fMethod is overridden by a modifier
if (endpoint[0]=='+' || endpoint[0]=='>') {
connectionModifier = true;
bind = false;
address = endpoint.substr(1);
} else if (endpoint[0]=='@') {
connectionModifier = true;
bind = true;
address = endpoint.substr(1);
}
bool rc = true;
// make the connection
if (bind) {
rc = BindEndpoint(*ch.fSocket, address);
} else {
rc = ConnectEndpoint(*ch.fSocket, address);
}
// bind might bind to an address different than requested,
// put the actual address back in the config
endpoint.clear();
if (connectionModifier) endpoint.push_back(bind?'@':'+');
endpoint += address;
LOG(DEBUG) << "Attached channel " << ch.fChannelName << " to " << endpoint
<< (bind?" (bind) ":" (connect) ");
// after the book keeping is done, exit in case of errors
if (!rc) return rc;
}
// put the (possibly) modified address back in the config
ch.UpdateAddress(boost::algorithm::join(endpoints, ","));
return true;
return true;
}
bool FairMQDevice::ConnectEndpoint(FairMQSocket& socket, std::string& endpoint)
{
socket.Connect(endpoint);
return true;
socket.Connect(endpoint);
return true;
}
bool FairMQDevice::BindEndpoint(FairMQSocket& socket, std::string& endpoint)
@ -393,6 +414,7 @@ bool FairMQDevice::BindEndpoint(FairMQSocket& socket, std::string& endpoint)
// TODO: thread safety? (this comes in as a reference and DOES get changed in this case).
endpoint = endpoint.substr(0, pos + 1) + newPort.str();
}
return true;
}
@ -475,6 +497,7 @@ void FairMQDevice::RunWrapper()
std::thread rateLogger(&FairMQDevice::LogSocketRates, this);
FairMQChannel::fInterrupted = false;
fCmdSocket->Resume();
try
{
@ -732,19 +755,23 @@ int FairMQDevice::GetProperty(const int key, const int default_ /*= 0*/)
void FairMQDevice::SetTransport(FairMQTransportFactory* factory)
{
fTransportFactory = unique_ptr<FairMQTransportFactory>(factory);
fTransportFactory = shared_ptr<FairMQTransportFactory>(factory);
}
void FairMQDevice::SetTransport(const string& transport)
{
if (transport == "zeromq")
{
fTransportFactory = unique_ptr<FairMQTransportFactory>(new FairMQTransportFactoryZMQ());
fTransportFactory = make_shared<FairMQTransportFactoryZMQ>();
}
else if (transport == "shmem")
{
fTransportFactory = make_shared<FairMQTransportFactorySHM>();
}
#ifdef NANOMSG_FOUND
else if (transport == "nanomsg")
{
fTransportFactory = unique_ptr<FairMQTransportFactory>(new FairMQTransportFactoryNN());
fTransportFactory = make_shared<FairMQTransportFactoryNN>();
}
#endif
else
@ -875,7 +902,7 @@ void FairMQDevice::LogSocketRates()
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
// LOG(DEBUG) << "FairMQDevice::LogSocketRates() stopping";
LOG(DEBUG) << "FairMQDevice::LogSocketRates() stopping";
}
void FairMQDevice::InteractiveStateLoop()
@ -978,6 +1005,7 @@ void FairMQDevice::InteractiveStateLoop()
void FairMQDevice::Unblock()
{
FairMQChannel::fInterrupted = true;
fCmdSocket->Interrupt();
FairMQMessagePtr cmd(fTransportFactory->CreateMessage());
fCmdSocket->Send(cmd.get(), 0);
}

View File

@ -21,6 +21,8 @@
#include <iostream>
#include <unordered_map>
#include <functional>
#include <assert.h> // static_assert
#include <type_traits> // is_trivially_copyable
#include <mutex>
#include <condition_variable>
@ -41,12 +43,6 @@ typedef std::function<bool(FairMQParts&, int)> InputMultipartCallback;
class FairMQProgOptions;
template<typename T>
void FairMQSimpleMsgCleanup(void* /*data*/, void* hint)
{
delete static_cast<T*>(hint);
}
class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
{
friend class FairMQChannel;
@ -203,20 +199,42 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
return fTransportFactory->CreateMessage(size);
}
template<typename T>
static void FairMQSimpleMsgCleanup(void* /*data*/, void* hint)
{
delete static_cast<T*>(hint);
}
static void FairMQNoCleanup(void* /*data*/, void* /*hint*/)
{
}
/// @brief Create new FairMQMessage with user provided buffer and size
/// @param data pointer to user provided buffer
/// @param size size of the user provided buffer
/// @param ffn optional callback, called when the message is transfered (and can be deleted)
/// @param hint optional helper pointer that can be used in the callback
/// @return pointer to FairMQMessage
inline FairMQMessagePtr NewMessage(void* data, int size, fairmq_free_fn* ffn, void* hint = NULL) const
inline FairMQMessagePtr NewMessage(void* data, int size, fairmq_free_fn* ffn, void* hint = nullptr) const
{
return fTransportFactory->CreateMessage(data, size, ffn, hint);
}
template<typename T>
inline FairMQMessagePtr NewStaticMessage(const T& data) const
{
return fTransportFactory->CreateMessage(data, sizeof(T), FairMQNoCleanup, nullptr);
}
inline FairMQMessagePtr NewStaticMessage(const std::string& str) const
{
return fTransportFactory->CreateMessage(const_cast<char*>(str.c_str()), str.length(), FairMQNoCleanup, nullptr);
}
template<typename T>
inline FairMQMessagePtr NewSimpleMessage(const T& data) const
{
static_assert(std::is_trivially_copyable<T>::value, "The argument type for NewSimpleMessage has to be trivially copyable!");
T* dataCopy = new T(data);
return fTransportFactory->CreateMessage(dataCopy, sizeof(T), FairMQSimpleMsgCleanup<T>, dataCopy);
}

View File

@ -25,15 +25,15 @@ class FairMQMessage
public:
virtual void Rebuild() = 0;
virtual void Rebuild(const size_t size) = 0;
virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL) = 0;
virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) = 0;
virtual void* GetMessage() = 0;
virtual void* GetData() = 0;
virtual size_t GetSize() = 0;
virtual void SetMessage(void* data, size_t size) = 0;
virtual void CloseMessage() = 0;
virtual void Copy(FairMQMessage* msg) = 0;
virtual void SetDeviceId(const std::string& deviceId) = 0;
virtual void Copy(const std::unique_ptr<FairMQMessage>& msg) = 0;
virtual ~FairMQMessage() {};

View File

@ -17,48 +17,64 @@
bool FairMQSocket::Attach(const std::string& config, bool serverish)
{
if (config.empty())
return false;
if (config.size()<2)
return false;
const char* endpoints = config.c_str();
// We hold each individual endpoint here
char endpoint [256];
while (*endpoints) {
const char *delimiter = strchr (endpoints, ',');
if (!delimiter)
delimiter = endpoints + strlen (endpoints);
if (delimiter - endpoints > 255)
return false;
memcpy (endpoint, endpoints, delimiter - endpoints);
endpoint [delimiter - endpoints] = 0;
bool rc;
if (endpoint [0] == '@') {
rc = Bind(endpoint + 1);
if (config.empty())
{
return false;
}
else if (endpoint [0] == '>' || endpoint [0] == '-' || endpoint [0] == '+' ) {
Connect(endpoint + 1);
}
else if (serverish) {
rc = Bind(endpoint);
}
else {
Connect(endpoint);
if (config.size() < 2)
{
return false;
}
if (!rc) {
return false;
const char* endpoints = config.c_str();
// We hold each individual endpoint here
char endpoint [256];
while (*endpoints)
{
const char *delimiter = strchr(endpoints, ',');
if (!delimiter)
{
delimiter = endpoints + strlen(endpoints);
}
if (delimiter - endpoints > 255)
{
return false;
}
memcpy(endpoint, endpoints, delimiter - endpoints);
endpoint[delimiter - endpoints] = 0;
bool rc = false;
if (endpoint [0] == '@')
{
rc = Bind(endpoint + 1);
}
else if (endpoint [0] == '>' || endpoint [0] == '-' || endpoint [0] == '+' )
{
Connect(endpoint + 1);
}
else if (serverish)
{
rc = Bind(endpoint);
}
else
{
Connect(endpoint);
}
if (!rc)
{
return false;
}
if (*delimiter == 0)
{
break;
}
endpoints = delimiter + 1;
}
if (*delimiter == 0) {
break;
}
endpoints = delimiter + 1;
}
return true;
return true;
}

View File

@ -53,6 +53,9 @@ class FairMQSocket
virtual void Close() = 0;
virtual void Terminate() = 0;
virtual void Interrupt() = 0;
virtual void Resume() = 0;
virtual void SetOption(const std::string& option, const void* value, size_t valueSize) = 0;
virtual void GetOption(const std::string& option, void* value, size_t* valueSize) = 0;

View File

@ -213,7 +213,6 @@ struct FairMQFSM_ : public msmf::state_machine_def<FairMQFSM_>
fsm.fState = PAUSED;
fsm.Unblock();
std::unique_lock<std::mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
@ -539,8 +538,8 @@ struct FairMQFSM_ : public msmf::state_machine_def<FairMQFSM_>
bool fWorkAvailable;
protected:
std::mutex fChangeStateMutex;
std::atomic<State> fState;
std::mutex fChangeStateMutex;
};
// reactivate the warning for non-virtual destructor

View File

@ -33,7 +33,7 @@ class FairMQTransportFactory
public:
virtual FairMQMessagePtr CreateMessage() const = 0;
virtual FairMQMessagePtr CreateMessage(const size_t size) const = 0;
virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL) const = 0;
virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const = 0;
virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = "") const = 0;

View File

@ -14,27 +14,40 @@ Example of a simple FairMQ topology:
![example of FairMQ topology](../docs/images/fairmq-example-topology.png?raw=true "Example of possible FairMQ topology")
Within a topology each device needs a unique id (given to it via required command line option `--id`).
Topology configuration is currently happening via setup scripts. This is very rudimentary and a much more flexible system is now in development. For now, example setup scripts can be found in directory `FairRoot/example/Tutorial3/` along with some additional documentation.
## Communication Patterns
FairMQ devices communicate via the communication patterns offered by ZeroMQ (or nanomsg): PUSH-PULL, PUB-SUB, REQ-REP, PAIR, [more info here](http://api.zeromq.org/4-0:zmq-socket).
FairMQ devices communicate via the communication patterns offered by ZeroMQ (or nanomsg): PUSH-PULL, PUB-SUB, REQ-REP, PAIR, [more info here](http://api.zeromq.org/4-0:zmq-socket). Each transport may provide further patterns.
## Messages
Devices transport data between each other in form of `FairMQMessage`s. These can be filled with arbitrary content and transport either raw data or serialized data as described above. Message can be initialized in three different ways:
- **with no parameters**: This is usefull for receiving a message, since neither size nor contents are yet known.
- **given message size**: Initialize message body with a size and fill the contents later, either with `memcpy` or by writing directly into message memory.
- **given message size and buffer**: initialize the message given an existing buffer. This is a zero-copy operation.
Devices transport data between each other in form of `FairMQMessage`s. These can be filled with arbitrary content. Message can be initialized in three different ways:
- **with no parameters**: Initializes an empty message (typically used for receiving).
- **given message size**: Initializes message body with a given size. Fill the created contents via buffer pointer.
- **given existing buffer and a size**: Initialize the message from an existing buffer. In case of ZeroMQ this is a zero-copy operation.
After sending the message, the queueing system takes over control over the message body and will free it with `free()` after it is no longer used. A callback can be given to the message object, to be called instead of the destruction with `free()`.
After sending the message, the transport takes over control over the message body and will free it with `free()` after it is no longer used. A callback can be given to the message object, to be called instead of the destruction with `free()` (for initialization via buffer+size).
## Transport Interface
The communication layer is available through an interface. Two interface implementations are currently available. Main implementation uses the [ZeroMQ](http://zeromq.org) library. Alternative implementation relies on the [nanomsg](http://nanomsg.org) library. Here is an overview to give an idea how interface is implemented:
The communication layer is available through an interface. Three interface implementations are currently available. Main implementation uses the [ZeroMQ](http://zeromq.org) library. Alternative implementation relies on the [nanomsg](http://nanomsg.org) library. Third transport implementation is using shared memory via boost::interprocess & ZeroMQ combination.
Here is an overview to give an idea how interface is implemented:
![FairMQ transport interface](../docs/images/fairmq-transport-interface.png?raw=true "FairMQ transport interface")
Currently, the transports have been tested to work with these communication patterns:
| | ZeroMQ | nanomsg | Shared Memory |
| ------------- |--------| ------- | ------------- |
| PAIR | yes | yes | yes |
| PUSH/PULL | yes | yes | yes |
| PUB/SUB | yes | yes | no |
| REQ/REP | yes | yes | yes |
## State Machine
Each FairMQ device has an internal state machine:

View File

@ -23,10 +23,11 @@
using namespace std;
FairMQBenchmarkSampler::FairMQBenchmarkSampler()
: fMsgSize(10000)
, fNumMsgs(0)
: fSameMessage(true)
, fMsgSize(10000)
, fMsgCounter(0)
, fMsgRate(1)
, fNumMsgs(0)
, fOutChannelName()
{
}
@ -37,9 +38,10 @@ FairMQBenchmarkSampler::~FairMQBenchmarkSampler()
void FairMQBenchmarkSampler::InitTask()
{
fSameMessage = fConfig->GetValue<bool>("same-msg");
fMsgSize = fConfig->GetValue<int>("msg-size");
fNumMsgs = fConfig->GetValue<int>("num-msgs");
fMsgRate = fConfig->GetValue<int>("msg-rate");
fNumMsgs = fConfig->GetValue<uint64_t>("num-msgs");
fOutChannelName = fConfig->GetValue<string>("out-channel");
}
@ -47,7 +49,7 @@ void FairMQBenchmarkSampler::Run()
{
// std::thread resetMsgCounter(&FairMQBenchmarkSampler::ResetMsgCounter, this);
int numSentMsgs = 0;
uint64_t numSentMsgs = 0;
FairMQMessagePtr baseMsg(fTransportFactory->CreateMessage(fMsgSize));
@ -59,17 +61,37 @@ void FairMQBenchmarkSampler::Run()
while (CheckCurrentState(RUNNING))
{
FairMQMessagePtr msg(fTransportFactory->CreateMessage());
msg->Copy(baseMsg);
if (dataOutChannel.Send(msg) >= 0)
if (fSameMessage)
{
if (fNumMsgs > 0)
FairMQMessagePtr msg(fTransportFactory->CreateMessage());
msg->Copy(baseMsg);
if (dataOutChannel.Send(msg) >= 0)
{
if (++numSentMsgs >= fNumMsgs)
if (fNumMsgs > 0)
{
break;
if (numSentMsgs >= fNumMsgs)
{
break;
}
}
++numSentMsgs;
}
}
else
{
FairMQMessagePtr msg(fTransportFactory->CreateMessage(fMsgSize));
if (dataOutChannel.Send(msg) >= 0)
{
if (fNumMsgs > 0)
{
if (numSentMsgs >= fNumMsgs)
{
break;
}
}
++numSentMsgs;
}
}
@ -82,8 +104,7 @@ void FairMQBenchmarkSampler::Run()
auto tEnd = chrono::high_resolution_clock::now();
LOG(INFO) << "Sent " << numSentMsgs << " messages, leaving RUNNING state.";
LOG(INFO) << "Sending time: " << chrono::duration<double, milli>(tEnd - tStart).count() << " ms";
LOG(INFO) << "Leaving RUNNING state. Sent " << numSentMsgs << " messages in " << chrono::duration<double, milli>(tEnd - tStart).count() << "ms.";
// resetMsgCounter.join();
}

View File

@ -32,10 +32,11 @@ class FairMQBenchmarkSampler : public FairMQDevice
void ResetMsgCounter();
protected:
bool fSameMessage;
int fMsgSize;
int fNumMsgs;
int fMsgCounter;
int fMsgRate;
uint64_t fNumMsgs;
std::string fOutChannelName;
virtual void InitTask();

View File

@ -34,7 +34,7 @@ void FairMQSink::InitTask()
void FairMQSink::Run()
{
int numReceivedMsgs = 0;
uint64_t numReceivedMsgs = 0;
// store the channel reference to avoid traversing the map on every loop iteration
const FairMQChannel& dataInChannel = fChannels.at(fInChannelName).at(0);
@ -43,25 +43,24 @@ void FairMQSink::Run()
while (CheckCurrentState(RUNNING))
{
std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
FairMQMessagePtr msg(fTransportFactory->CreateMessage());
if (dataInChannel.Receive(msg) >= 0)
{
if (fNumMsgs > 0)
{
numReceivedMsgs++;
if (numReceivedMsgs >= fNumMsgs)
{
break;
}
}
numReceivedMsgs++;
}
}
auto tEnd = chrono::high_resolution_clock::now();
LOG(INFO) << "Received " << numReceivedMsgs << " messages, leaving RUNNING state.";
LOG(INFO) << "Receiving time: " << chrono::duration<double, milli>(tEnd - tStart).count() << " ms";
LOG(INFO) << "Leaving RUNNING state. Received " << numReceivedMsgs << " messages in " << chrono::duration<double, milli>(tEnd - tStart).count() << "ms.";
}
FairMQSink::~FairMQSink()

View File

@ -22,8 +22,10 @@
using namespace std;
string FairMQMessageNN::fDeviceID = string();
FairMQMessageNN::FairMQMessageNN()
: fMessage(NULL)
: fMessage(nullptr)
, fSize(0)
, fReceiving(false)
{
@ -35,7 +37,7 @@ FairMQMessageNN::FairMQMessageNN()
}
FairMQMessageNN::FairMQMessageNN(const size_t size)
: fMessage(NULL)
: fMessage(nullptr)
, fSize(0)
, fReceiving(false)
{
@ -54,7 +56,7 @@ FairMQMessageNN::FairMQMessageNN(const size_t size)
* possible TODO: make this zero copy (will should then be as efficient as ZeroMQ).
*/
FairMQMessageNN::FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* ffn, void* hint)
: fMessage(NULL)
: fMessage(nullptr)
, fSize(0)
, fReceiving(false)
{
@ -71,6 +73,10 @@ FairMQMessageNN::FairMQMessageNN(void* data, const size_t size, fairmq_free_fn*
{
ffn(data, hint);
}
else
{
free(data);
}
}
}
@ -134,30 +140,9 @@ void FairMQMessageNN::SetMessage(void* data, const size_t size)
fSize = size;
}
void FairMQMessageNN::Copy(FairMQMessage* msg)
void FairMQMessageNN::SetDeviceId(const string& deviceId)
{
// DEPRECATED: Use Copy(const unique_ptr<FairMQMessage>&)
if (fMessage)
{
if (nn_freemsg(fMessage) < 0)
{
LOG(ERROR) << "failed freeing message, reason: " << nn_strerror(errno);
}
}
size_t size = msg->GetSize();
fMessage = nn_allocmsg(size, 0);
if (!fMessage)
{
LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno);
}
else
{
memcpy(fMessage, msg->GetMessage(), size);
fSize = size;
}
fDeviceID = deviceId;
}
void FairMQMessageNN::Copy(const unique_ptr<FairMQMessage>& msg)
@ -192,7 +177,7 @@ inline void FairMQMessageNN::Clear()
}
else
{
fMessage = NULL;
fMessage = nullptr;
fSize = 0;
}
}
@ -208,7 +193,7 @@ FairMQMessageNN::~FairMQMessageNN()
}
else
{
fMessage = NULL;
fMessage = nullptr;
fSize = 0;
}
}

View File

@ -16,6 +16,7 @@
#define FAIRMQMESSAGENN_H_
#include <cstddef>
#include <string>
#include "FairMQMessage.h"
@ -24,13 +25,13 @@ class FairMQMessageNN : public FairMQMessage
public:
FairMQMessageNN();
FairMQMessageNN(const size_t size);
FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL);
FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr);
FairMQMessageNN(const FairMQMessageNN&) = delete;
FairMQMessageNN operator=(const FairMQMessageNN&) = delete;
virtual void Rebuild();
virtual void Rebuild(const size_t size);
virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL);
virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr);
virtual void* GetMessage();
virtual void* GetData();
@ -38,8 +39,8 @@ class FairMQMessageNN : public FairMQMessage
virtual void SetMessage(void* data, const size_t size);
virtual void CloseMessage() {};
virtual void Copy(FairMQMessage* msg);
virtual void SetDeviceId(const std::string& deviceId);
virtual void Copy(const std::unique_ptr<FairMQMessage>& msg);
virtual ~FairMQMessageNN();
@ -50,6 +51,7 @@ class FairMQMessageNN : public FairMQMessage
void* fMessage;
size_t fSize;
bool fReceiving;
static std::string fDeviceID;
void Clear();
};

View File

@ -110,26 +110,7 @@ void FairMQSocketNN::Connect(const string& address)
int FairMQSocketNN::Send(FairMQMessage* msg, const string& flag)
{
void* ptr = msg->GetMessage();
int nbytes = nn_send(fSocket, &ptr, NN_MSG, GetConstant(flag));
if (nbytes >= 0)
{
fBytesTx += nbytes;
++fMessagesTx;
static_cast<FairMQMessageNN*>(msg)->fReceiving = false;
return nbytes;
}
if (nn_errno() == EAGAIN)
{
return -2;
}
if (nn_errno() == ETERM)
{
LOG(INFO) << "terminating socket " << fId;
return -1;
}
LOG(ERROR) << "Failed sending on socket " << fId << ", reason: " << nn_strerror(errno);
return nbytes;
return Send(msg, GetConstant(flag));
}
int FairMQSocketNN::Send(FairMQMessage* msg, const int flags)
@ -198,28 +179,7 @@ int64_t FairMQSocketNN::Send(const vector<unique_ptr<FairMQMessage>>& msgVec, co
int FairMQSocketNN::Receive(FairMQMessage* msg, const string& flag)
{
void* ptr = NULL;
int nbytes = nn_recv(fSocket, &ptr, NN_MSG, GetConstant(flag));
if (nbytes >= 0)
{
fBytesRx += nbytes;
++fMessagesRx;
msg->Rebuild();
msg->SetMessage(ptr, nbytes);
static_cast<FairMQMessageNN*>(msg)->fReceiving = true;
return nbytes;
}
if (nn_errno() == EAGAIN)
{
return -2;
}
if (nn_errno() == ETERM)
{
LOG(INFO) << "terminating socket " << fId;
return -1;
}
LOG(ERROR) << "Failed receiving on socket " << fId << ", reason: " << nn_strerror(errno);
return nbytes;
return Receive(msg, GetConstant(flag));
}
int FairMQSocketNN::Receive(FairMQMessage* msg, const int flags)
@ -318,6 +278,14 @@ void FairMQSocketNN::Terminate()
nn_term();
}
void FairMQSocketNN::Interrupt()
{
}
void FairMQSocketNN::Resume()
{
}
void* FairMQSocketNN::GetSocket() const
{
return NULL; // dummy method to comply with the interface. functionality not possible in zeromq.
@ -330,6 +298,21 @@ int FairMQSocketNN::GetSocket(int /*nothing*/) const
void FairMQSocketNN::SetOption(const string& option, const void* value, size_t valueSize)
{
if (option == "snd-size" || option == "rcv-size")
{
int val = *(static_cast<int*>(const_cast<void*>(value)));
if (val <= 0)
{
LOG(WARN) << "nanomsg: value for sndKernelSize/rcvKernelSize should be greater than 0, using defaults (128kB).";
return;
}
}
if (option == "snd-hwm" || option == "rcv-hwm")
{
return;
}
int rc = nn_setsockopt(fSocket, NN_SOL_SOCKET, GetConstant(option), value, valueSize);
if (rc < 0)
{

View File

@ -45,6 +45,9 @@ class FairMQSocketNN : public FairMQSocket
virtual void Close();
virtual void Terminate();
virtual void Interrupt();
virtual void Resume();
virtual void SetOption(const std::string& option, const void* value, size_t valueSize);
virtual void GetOption(const std::string& option, void* value, size_t* valueSize);

View File

@ -230,6 +230,8 @@ void ChannelParser(const boost::property_tree::ptree& tree, FairMQMap& channelMa
commonChannel.UpdateAddress(q.second.get<string>("address", commonChannel.GetAddress()));
commonChannel.UpdateSndBufSize(q.second.get<int>("sndBufSize", commonChannel.GetSndBufSize()));
commonChannel.UpdateRcvBufSize(q.second.get<int>("rcvBufSize", commonChannel.GetRcvBufSize()));
commonChannel.UpdateSndKernelSize(q.second.get<int>("sndKernelSize", commonChannel.GetSndKernelSize()));
commonChannel.UpdateRcvKernelSize(q.second.get<int>("rcvKernelSize", commonChannel.GetRcvKernelSize()));
commonChannel.UpdateRateLogging(q.second.get<int>("rateLogging", commonChannel.GetRateLogging()));
// temporary FairMQChannel container
@ -241,12 +243,14 @@ void ChannelParser(const boost::property_tree::ptree& tree, FairMQMap& channelMa
LOG(DEBUG) << "\tnumSockets of " << numSockets << " specified,";
LOG(DEBUG) << "\tapplying common settings to each:";
LOG(DEBUG) << "\ttype = " << commonChannel.GetType();
LOG(DEBUG) << "\tmethod = " << commonChannel.GetMethod();
LOG(DEBUG) << "\taddress = " << commonChannel.GetAddress();
LOG(DEBUG) << "\tsndBufSize = " << commonChannel.GetSndBufSize();
LOG(DEBUG) << "\trcvBufSize = " << commonChannel.GetRcvBufSize();
LOG(DEBUG) << "\trateLogging = " << commonChannel.GetRateLogging();
LOG(DEBUG) << "\ttype = " << commonChannel.GetType();
LOG(DEBUG) << "\tmethod = " << commonChannel.GetMethod();
LOG(DEBUG) << "\taddress = " << commonChannel.GetAddress();
LOG(DEBUG) << "\tsndBufSize = " << commonChannel.GetSndBufSize();
LOG(DEBUG) << "\trcvBufSize = " << commonChannel.GetRcvBufSize();
LOG(DEBUG) << "\tsndKernelSize = " << commonChannel.GetSndKernelSize();
LOG(DEBUG) << "\trcvKernelSize = " << commonChannel.GetRcvKernelSize();
LOG(DEBUG) << "\trateLogging = " << commonChannel.GetRateLogging();
for (int i = 0; i < numSockets; ++i)
{
@ -287,6 +291,8 @@ void ChannelParser(const boost::property_tree::ptree& tree, FairMQMap& channelMa
commonChannel.UpdateAddress(p.second.get<string>("address", commonChannel.GetAddress()));
commonChannel.UpdateSndBufSize(p.second.get<int>("sndBufSize", commonChannel.GetSndBufSize()));
commonChannel.UpdateRcvBufSize(p.second.get<int>("rcvBufSize", commonChannel.GetRcvBufSize()));
commonChannel.UpdateSndKernelSize(p.second.get<int>("sndKernelSize", commonChannel.GetSndKernelSize()));
commonChannel.UpdateRcvKernelSize(p.second.get<int>("rcvKernelSize", commonChannel.GetRcvKernelSize()));
commonChannel.UpdateRateLogging(p.second.get<int>("rateLogging", commonChannel.GetRateLogging()));
}
@ -299,12 +305,14 @@ void ChannelParser(const boost::property_tree::ptree& tree, FairMQMap& channelMa
LOG(DEBUG) << "\tnumSockets of " << numSockets << " specified,";
LOG(DEBUG) << "\tapplying common settings to each:";
LOG(DEBUG) << "\ttype = " << commonChannel.GetType();
LOG(DEBUG) << "\tmethod = " << commonChannel.GetMethod();
LOG(DEBUG) << "\taddress = " << commonChannel.GetAddress();
LOG(DEBUG) << "\tsndBufSize = " << commonChannel.GetSndBufSize();
LOG(DEBUG) << "\trcvBufSize = " << commonChannel.GetRcvBufSize();
LOG(DEBUG) << "\trateLogging = " << commonChannel.GetRateLogging();
LOG(DEBUG) << "\ttype = " << commonChannel.GetType();
LOG(DEBUG) << "\tmethod = " << commonChannel.GetMethod();
LOG(DEBUG) << "\taddress = " << commonChannel.GetAddress();
LOG(DEBUG) << "\tsndBufSize = " << commonChannel.GetSndBufSize();
LOG(DEBUG) << "\trcvBufSize = " << commonChannel.GetRcvBufSize();
LOG(DEBUG) << "\tsndKernelSize = " << commonChannel.GetSndKernelSize();
LOG(DEBUG) << "\trcvKernelSize = " << commonChannel.GetRcvKernelSize();
LOG(DEBUG) << "\trateLogging = " << commonChannel.GetRateLogging();
for (int i = 0; i < numSockets; ++i)
{
@ -342,15 +350,19 @@ void SocketParser(const boost::property_tree::ptree& tree, vector<FairMQChannel>
channel.UpdateAddress(q.second.get<string>("address", channel.GetAddress()));
channel.UpdateSndBufSize(q.second.get<int>("sndBufSize", channel.GetSndBufSize()));
channel.UpdateRcvBufSize(q.second.get<int>("rcvBufSize", channel.GetRcvBufSize()));
channel.UpdateSndKernelSize(q.second.get<int>("sndKernelSize", channel.GetSndKernelSize()));
channel.UpdateRcvKernelSize(q.second.get<int>("rcvKernelSize", channel.GetRcvKernelSize()));
channel.UpdateRateLogging(q.second.get<int>("rateLogging", channel.GetRateLogging()));
LOG(DEBUG) << "" << channelName << "[" << socketCounter << "]:";
LOG(DEBUG) << "\ttype = " << channel.GetType();
LOG(DEBUG) << "\tmethod = " << channel.GetMethod();
LOG(DEBUG) << "\taddress = " << channel.GetAddress();
LOG(DEBUG) << "\tsndBufSize = " << channel.GetSndBufSize();
LOG(DEBUG) << "\trcvBufSize = " << channel.GetRcvBufSize();
LOG(DEBUG) << "\trateLogging = " << channel.GetRateLogging();
LOG(DEBUG) << "\ttype = " << channel.GetType();
LOG(DEBUG) << "\tmethod = " << channel.GetMethod();
LOG(DEBUG) << "\taddress = " << channel.GetAddress();
LOG(DEBUG) << "\tsndBufSize = " << channel.GetSndBufSize();
LOG(DEBUG) << "\trcvBufSize = " << channel.GetRcvBufSize();
LOG(DEBUG) << "\tsndKernelSize = " << channel.GetSndKernelSize();
LOG(DEBUG) << "\trcvKernelSize = " << channel.GetRcvKernelSize();
LOG(DEBUG) << "\trateLogging = " << channel.GetRateLogging();
channelList.push_back(channel);
++socketCounter;
@ -368,15 +380,19 @@ void SocketParser(const boost::property_tree::ptree& tree, vector<FairMQChannel>
channel.UpdateAddress(p.second.get<string>("address", channel.GetAddress()));
channel.UpdateSndBufSize(p.second.get<int>("sndBufSize", channel.GetSndBufSize()));
channel.UpdateRcvBufSize(p.second.get<int>("rcvBufSize", channel.GetRcvBufSize()));
channel.UpdateSndKernelSize(p.second.get<int>("sndKernelSize", channel.GetSndKernelSize()));
channel.UpdateRcvKernelSize(p.second.get<int>("rcvKernelSize", channel.GetRcvKernelSize()));
channel.UpdateRateLogging(p.second.get<int>("rateLogging", channel.GetRateLogging()));
LOG(DEBUG) << "" << channelName << "[" << socketCounter << "]:";
LOG(DEBUG) << "\ttype = " << channel.GetType();
LOG(DEBUG) << "\tmethod = " << channel.GetMethod();
LOG(DEBUG) << "\taddress = " << channel.GetAddress();
LOG(DEBUG) << "\tsndBufSize = " << channel.GetSndBufSize();
LOG(DEBUG) << "\trcvBufSize = " << channel.GetRcvBufSize();
LOG(DEBUG) << "\trateLogging = " << channel.GetRateLogging();
LOG(DEBUG) << "\ttype = " << channel.GetType();
LOG(DEBUG) << "\tmethod = " << channel.GetMethod();
LOG(DEBUG) << "\taddress = " << channel.GetAddress();
LOG(DEBUG) << "\tsndBufSize = " << channel.GetSndBufSize();
LOG(DEBUG) << "\trcvBufSize = " << channel.GetRcvBufSize();
LOG(DEBUG) << "\tsndKernelSize = " << channel.GetSndKernelSize();
LOG(DEBUG) << "\trcvKernelSize = " << channel.GetRcvKernelSize();
LOG(DEBUG) << "\trateLogging = " << channel.GetRateLogging();
channelList.push_back(channel);
++socketCounter;
@ -395,12 +411,14 @@ void SocketParser(const boost::property_tree::ptree& tree, vector<FairMQChannel>
FairMQChannel channel(commonChannel);
LOG(DEBUG) << "\ttype = " << channel.GetType();
LOG(DEBUG) << "\tmethod = " << channel.GetMethod();
LOG(DEBUG) << "\taddress = " << channel.GetAddress();
LOG(DEBUG) << "\tsndBufSize = " << channel.GetSndBufSize();
LOG(DEBUG) << "\trcvBufSize = " << channel.GetRcvBufSize();
LOG(DEBUG) << "\trateLogging = " << channel.GetRateLogging();
LOG(DEBUG) << "\ttype = " << channel.GetType();
LOG(DEBUG) << "\tmethod = " << channel.GetMethod();
LOG(DEBUG) << "\taddress = " << channel.GetAddress();
LOG(DEBUG) << "\tsndBufSize = " << channel.GetSndBufSize();
LOG(DEBUG) << "\trcvBufSize = " << channel.GetRcvBufSize();
LOG(DEBUG) << "\tsndKernelSize = " << channel.GetSndKernelSize();
LOG(DEBUG) << "\trcvKernelSize = " << channel.GetRcvKernelSize();
LOG(DEBUG) << "\trateLogging = " << channel.GetRateLogging();
channelList.push_back(channel);
}

View File

@ -235,6 +235,8 @@ void FairMQProgOptions::UpdateMQValues()
string addressKey = p.first + "." + to_string(index) + ".address";
string sndBufSizeKey = p.first + "." + to_string(index) + ".sndBufSize";
string rcvBufSizeKey = p.first + "." + to_string(index) + ".rcvBufSize";
string sndKernelSizeKey = p.first + "." + to_string(index) + ".sndKernelSize";
string rcvKernelSizeKey = p.first + "." + to_string(index) + ".rcvKernelSize";
string rateLoggingKey = p.first + "." + to_string(index) + ".rateLogging";
fMQKeyMap[typeKey] = make_tuple(p.first, index, "type");
@ -242,6 +244,8 @@ void FairMQProgOptions::UpdateMQValues()
fMQKeyMap[addressKey] = make_tuple(p.first, index, "address");
fMQKeyMap[sndBufSizeKey] = make_tuple(p.first, index, "sndBufSize");
fMQKeyMap[rcvBufSizeKey] = make_tuple(p.first, index, "rcvBufSize");
fMQKeyMap[sndKernelSizeKey] = make_tuple(p.first, index, "sndKernelSize");
fMQKeyMap[rcvKernelSizeKey] = make_tuple(p.first, index, "rcvkernelSize");
fMQKeyMap[rateLoggingKey] = make_tuple(p.first, index, "rateLogging");
UpdateVarMap<string>(typeKey, channel.GetType());
@ -255,6 +259,12 @@ void FairMQProgOptions::UpdateMQValues()
//UpdateVarMap<string>(rcvBufSizeKey, to_string(channel.GetRcvBufSize()));// string API
UpdateVarMap<int>(rcvBufSizeKey, channel.GetRcvBufSize());
//UpdateVarMap<string>(sndKernelSizeKey, to_string(channel.GetSndKernelSize()));// string API
UpdateVarMap<int>(sndKernelSizeKey, channel.GetSndKernelSize());
//UpdateVarMap<string>(rcvKernelSizeKey, to_string(channel.GetRcvKernelSize()));// string API
UpdateVarMap<int>(rcvKernelSizeKey, channel.GetRcvKernelSize());
//UpdateVarMap<string>(rateLoggingKey,to_string(channel.GetRateLogging()));// string API
UpdateVarMap<int>(rateLoggingKey, channel.GetRateLogging());
@ -265,6 +275,8 @@ void FairMQProgOptions::UpdateMQValues()
LOG(DEBUG) << "key = " << addressKey <<"\t value = " << GetValue<string>(addressKey);
LOG(DEBUG) << "key = " << sndBufSizeKey << "\t value = " << GetValue<int>(sndBufSizeKey);
LOG(DEBUG) << "key = " << rcvBufSizeKey <<"\t value = " << GetValue<int>(rcvBufSizeKey);
LOG(DEBUG) << "key = " << sndKernelSizeKey << "\t value = " << GetValue<int>(sndKernelSizeKey);
LOG(DEBUG) << "key = " << rcvKernelSizeKey <<"\t value = " << GetValue<int>(rcvKernelSizeKey);
LOG(DEBUG) << "key = " << rateLoggingKey <<"\t value = " << GetValue<int>(rateLoggingKey);
*/
index++;
@ -376,7 +388,7 @@ int FairMQProgOptions::UpdateChannelMap(const string& channelName, int index, co
{
//if we get there it means something is wrong
LOG(ERROR) << "update of FairMQChannel map failed for the following key: "
<< channelName<<"."<<index<<"."<<member;
<< channelName << "." << index << "." << member;
return 1;
}
}
@ -443,7 +455,7 @@ int FairMQProgOptions::UpdateChannelMap(const string& channelName, int index, co
{
//if we get there it means something is wrong
LOG(ERROR) << "update of FairMQChannel map failed for the following key: "
<< channelName<<"."<<index<<"."<<member;
<< channelName << "." << index << "." << member;
return 1;
}
}

View File

@ -15,8 +15,9 @@ void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("out-channel", bpo::value<std::string>()->default_value("data"), "Name of the output channel")
("same-msg", bpo::value<bool>()->default_value(true), "Re-send the same message (default), or recreate for each iteration")
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
("num-msgs", bpo::value<int>()->default_value(0), "Number of messages to send")
("num-msgs", bpo::value<uint64_t>()->default_value(0), "Number of messages to send")
("msg-rate", bpo::value<int>()->default_value(0), "Msg rate limit in maximum number of messages per second");
}

View File

@ -26,7 +26,8 @@ int main(int argc, char* argv[])
CIntercomService service;
CCustomCmd ddsCustomCmd(service);
service.subscribeOnError([](const EErrorCode errorCode, const string& errorMsg) {
service.subscribeOnError([](const EErrorCode errorCode, const string& errorMsg)
{
cout << "DDS error received: error code: " << errorCode << ", error message: " << errorMsg << endl;
});
@ -46,10 +47,14 @@ int main(int argc, char* argv[])
t.c_lflag &= ~ICANON; // disable canonical input
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
if ( argc != 2 )
if (argc != 2)
{
PrintControlsHelp();
}
else
{
cin.putback(argv[1][0]);
}
while (cin >> c)
{

View File

@ -2,17 +2,28 @@
numMsgs="0"
msgSize="1000000"
transport="zeromq"
sameMsg="true"
if [[ $1 =~ ^[0-9]+$ ]]; then
msgSize=$1
fi
echo "Starting benchmark with message size of $msgSize bytes."
if [[ $2 =~ ^[0-9]+$ ]]; then
numMsgs=$2
fi
if [[ $3 =~ ^[a-z]+$ ]]; then
transport=$3
fi
if [[ $4 =~ ^[a-z]+$ ]]; then
sameMsg=$4
fi
echo "Starting benchmark with message size of $msgSize bytes ($numMsgs messages) and $transport transport."
echo "Using $transport transport."
if [ $numMsgs = 0 ]; then
echo "Unlimited number of messages."
else
@ -20,14 +31,15 @@ else
fi
echo ""
echo "Usage: startBenchmark [message size=1000000] [number of messages=0]"
echo "Usage: startBenchmark [message size=1000000] [number of messages=0] [transport=zeromq/nanomsg/shmem] [resend same message=true]"
SAMPLER="bsampler"
SAMPLER+=" --id bsampler1"
#SAMPLER+=" --io-threads 2"
#SAMPLER+=" --control static"
#SAMPLER+=" --transport nanomsg"
SAMPLER+=" --transport $transport"
SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --same-msg $sameMsg"
# SAMPLER+=" --msg-rate 1000"
SAMPLER+=" --num-msgs $numMsgs"
SAMPLER+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/benchmark.json"
@ -37,7 +49,7 @@ SINK="sink"
SINK+=" --id sink1"
#SINK+=" --io-threads 2"
#SINK+=" --control static"
#SINK+=" --transport nanomsg"
SINK+=" --transport $transport"
SINK+=" --num-msgs $numMsgs"
SINK+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/benchmark.json"
xterm -geometry 80x23+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SINK &

View File

@ -0,0 +1,81 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <zmq.h>
#include <boost/interprocess/managed_shared_memory.hpp>
#include "FairMQLogger.h"
#include "FairMQContextSHM.h"
#include "FairMQShmManager.h"
using namespace FairMQ::shmem;
FairMQContextSHM::FairMQContextSHM(int numIoThreads)
: fContext()
{
fContext = zmq_ctx_new();
if (fContext == NULL)
{
LOG(ERROR) << "failed creating context, reason: " << zmq_strerror(errno);
exit(EXIT_FAILURE);
}
if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0)
{
LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno);
}
// Set the maximum number of allowed sockets on the context.
if (zmq_ctx_set(fContext, ZMQ_MAX_SOCKETS, 10000) != 0)
{
LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno);
}
Manager::Instance().InitializeSegment("open_or_create", "FairMQSharedMemory", 2000000000);
LOG(INFO) << "Created/Opened shared memory segment of 2,000,000,000 bytes. Available are " << Manager::Instance().Segment()->get_free_memory() << " bytes.";
}
FairMQContextSHM::~FairMQContextSHM()
{
Close();
if (boost::interprocess::shared_memory_object::remove("FairMQSharedMemory"))
{
LOG(INFO) << "Successfully removed shared memory after the device has stopped.";
}
else
{
LOG(INFO) << "Did not remove shared memory after the device stopped. Still in use?";
}
}
void* FairMQContextSHM::GetContext()
{
return fContext;
}
void FairMQContextSHM::Close()
{
if (fContext == NULL)
{
return;
}
if (zmq_ctx_destroy(fContext) != 0)
{
if (errno == EINTR)
{
LOG(ERROR) << " failed closing context, reason: " << zmq_strerror(errno);
}
else
{
fContext = NULL;
return;
}
}
}

View File

@ -0,0 +1,27 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQCONTEXTSHM_H_
#define FAIRMQCONTEXTSHM_H_
class FairMQContextSHM
{
public:
/// Constructor
FairMQContextSHM(int numIoThreads);
FairMQContextSHM(const FairMQContextSHM&) = delete;
FairMQContextSHM operator=(const FairMQContextSHM&) = delete;
virtual ~FairMQContextSHM();
void* GetContext();
void Close();
private:
void* fContext;
};
#endif /* FAIRMQCONTEXTSHM_H_ */

View File

@ -0,0 +1,292 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <string>
#include <cstdlib>
#include "FairMQMessageSHM.h"
#include "FairMQLogger.h"
using namespace std;
using namespace FairMQ::shmem;
uint64_t FairMQMessageSHM::fMessageID = 0;
string FairMQMessageSHM::fDeviceID = string();
atomic<bool> FairMQMessageSHM::fInterrupted(false);
FairMQMessageSHM::FairMQMessageSHM()
: fMessage()
, fOwner(nullptr)
, fReceiving(false)
, fQueued(false)
{
if (zmq_msg_init(&fMessage) != 0)
{
LOG(ERROR) << "failed initializing message, reason: " << zmq_strerror(errno);
}
}
void FairMQMessageSHM::StringDeleter(void* /*data*/, void* str)
{
delete static_cast<string*>(str);
}
FairMQMessageSHM::FairMQMessageSHM(const size_t size)
: fMessage()
, fOwner(nullptr)
, fReceiving(false)
, fQueued(false)
{
InitializeChunk(size);
}
FairMQMessageSHM::FairMQMessageSHM(void* data, const size_t size, fairmq_free_fn* ffn, void* hint)
: fMessage()
, fOwner(nullptr)
, fReceiving(false)
, fQueued(false)
{
InitializeChunk(size);
memcpy(fOwner->fPtr->GetData(), data, size);
if (ffn)
{
ffn(data, hint);
}
else
{
free(data);
}
}
void FairMQMessageSHM::InitializeChunk(const size_t size)
{
string chunkID = fDeviceID + "c" + to_string(fMessageID);
string* ownerID = new string(fDeviceID + "o" + to_string(fMessageID));
bool success = false;
while (!success)
{
try
{
fOwner = Manager::Instance().Segment()->construct<ShPtrOwner>(ownerID->c_str())(
make_managed_shared_ptr(Manager::Instance().Segment()->construct<Chunk>(chunkID.c_str())(size),
*(Manager::Instance().Segment())));
success = true;
}
catch (bipc::bad_alloc& ba)
{
LOG(WARN) << "Shared memory full...";
this_thread::sleep_for(chrono::milliseconds(50));
if (fInterrupted)
{
break;
}
else
{
continue;
}
}
}
if (zmq_msg_init_data(&fMessage, const_cast<char*>(ownerID->c_str()), ownerID->length(), StringDeleter, ownerID) != 0)
{
LOG(ERROR) << "failed initializing meta message, reason: " << zmq_strerror(errno);
}
++fMessageID;
}
void FairMQMessageSHM::Rebuild()
{
CloseMessage();
fReceiving = false;
fQueued = false;
if (zmq_msg_init(&fMessage) != 0)
{
LOG(ERROR) << "failed initializing message, reason: " << zmq_strerror(errno);
}
}
void FairMQMessageSHM::Rebuild(const size_t size)
{
CloseMessage();
fReceiving = false;
fQueued = false;
InitializeChunk(size);
}
void FairMQMessageSHM::Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint)
{
CloseMessage();
fReceiving = false;
fQueued = false;
InitializeChunk(size);
memcpy(fOwner->fPtr->GetData(), data, size);
if (ffn)
{
ffn(data, hint);
}
else
{
free(data);
}
}
void* FairMQMessageSHM::GetMessage()
{
return &fMessage;
}
void* FairMQMessageSHM::GetData()
{
if (fOwner)
{
return fOwner->fPtr->GetData();
}
else
{
LOG(ERROR) << "Trying to get data of an empty shared memory message";
exit(EXIT_FAILURE);
}
}
size_t FairMQMessageSHM::GetSize()
{
if (fOwner)
{
return fOwner->fPtr->GetSize();
}
else
{
return 0;
}
}
void FairMQMessageSHM::SetMessage(void*, const size_t)
{
// dummy method to comply with the interface. functionality not allowed in zeromq.
}
void FairMQMessageSHM::SetDeviceId(const string& deviceId)
{
fDeviceID = deviceId;
}
void FairMQMessageSHM::Copy(const unique_ptr<FairMQMessage>& msg)
{
if (!fOwner)
{
FairMQ::shmem::ShPtrOwner* otherOwner = static_cast<FairMQMessageSHM*>(msg.get())->fOwner;
if (otherOwner)
{
InitializeChunk(otherOwner->fPtr->GetSize());
memcpy(fOwner->fPtr->GetData(), otherOwner->fPtr->GetData(), otherOwner->fPtr->GetSize());
}
else
{
LOG(ERROR) << "FairMQMessageSHM::Copy() fail: source message not initialized!";
}
}
else
{
LOG(ERROR) << "FairMQMessageSHM::Copy() fail: target message already initialized!";
}
// version with sharing the sent data
// if (!fOwner)
// {
// if (static_cast<FairMQMessageSHM*>(msg.get())->fOwner)
// {
// string* ownerID = new string(fDeviceID + "o" + to_string(fMessageID));
// bool success = false;
// do
// {
// try
// {
// fOwner = Manager::Instance().Segment()->construct<ShPtrOwner>(ownerID->c_str())(*(static_cast<FairMQMessageSHM*>(msg.get())->fOwner));
// success = true;
// }
// catch (bipc::bad_alloc& ba)
// {
// LOG(WARN) << "Shared memory full...";
// this_thread::sleep_for(chrono::milliseconds(10));
// if (fInterrupted)
// {
// break;
// }
// else
// {
// continue;
// }
// }
// }
// while (!success);
// if (zmq_msg_init_data(&fMessage, const_cast<char*>(ownerID->c_str()), ownerID->length(), StringDeleter, ownerID) != 0)
// {
// LOG(ERROR) << "failed initializing meta message, reason: " << zmq_strerror(errno);
// }
// ++fMessageID;
// }
// else
// {
// LOG(ERROR) << "FairMQMessageSHM::Copy() fail: source message not initialized!";
// }
// }
// else
// {
// LOG(ERROR) << "FairMQMessageSHM::Copy() fail: target message already initialized!";
// }
}
void FairMQMessageSHM::CloseMessage()
{
if (fReceiving)
{
if (fOwner)
{
Manager::Instance().Segment()->destroy_ptr(fOwner);
fOwner = nullptr;
}
else
{
LOG(ERROR) << "No shared pointer owner when closing a received message";
}
}
else
{
if (fOwner && !fQueued)
{
LOG(WARN) << "Destroying unsent message";
Manager::Instance().Segment()->destroy_ptr(fOwner);
fOwner = nullptr;
}
}
if (zmq_msg_close(&fMessage) != 0)
{
LOG(ERROR) << "failed closing message, reason: " << zmq_strerror(errno);
}
}
FairMQMessageSHM::~FairMQMessageSHM()
{
CloseMessage();
}

View File

@ -0,0 +1,63 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQMESSAGESHM_H_
#define FAIRMQMESSAGESHM_H_
#include <cstddef>
#include <string>
#include <atomic>
#include <zmq.h>
#include "FairMQMessage.h"
#include "FairMQShmManager.h"
class FairMQMessageSHM : public FairMQMessage
{
friend class FairMQSocketSHM;
public:
FairMQMessageSHM();
FairMQMessageSHM(const size_t size);
FairMQMessageSHM(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr);
FairMQMessageSHM(const FairMQMessageSHM&) = delete;
FairMQMessageSHM operator=(const FairMQMessageSHM&) = delete;
void InitializeChunk(const size_t size);
virtual void Rebuild();
virtual void Rebuild(const size_t size);
virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr);
virtual void* GetMessage();
virtual void* GetData();
virtual size_t GetSize();
virtual void SetMessage(void* data, const size_t size);
virtual void SetDeviceId(const std::string& deviceId);
virtual void Copy(const std::unique_ptr<FairMQMessage>& msg);
void CloseMessage();
virtual ~FairMQMessageSHM();
static void StringDeleter(void* data, void* str);
private:
zmq_msg_t fMessage;
FairMQ::shmem::ShPtrOwner* fOwner;
static uint64_t fMessageID;
static std::string fDeviceID;
bool fReceiving;
bool fQueued;
static std::atomic<bool> fInterrupted;
};
#endif /* FAIRMQMESSAGESHM_H_ */

View File

@ -0,0 +1,240 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQPollerSHM.cxx
*
* @since 2014-01-23
* @author A. Rybalchenko
*/
#include <zmq.h>
#include "FairMQPollerSHM.h"
#include "FairMQLogger.h"
using namespace std;
FairMQPollerSHM::FairMQPollerSHM(const vector<FairMQChannel>& channels)
: items()
, fNumItems(0)
, fOffsetMap()
{
fNumItems = channels.size();
items = new zmq_pollitem_t[fNumItems];
for (int i = 0; i < fNumItems; ++i)
{
items[i].socket = channels.at(i).fSocket->GetSocket();
items[i].fd = 0;
items[i].revents = 0;
int type = 0;
size_t size = sizeof(type);
zmq_getsockopt (channels.at(i).fSocket->GetSocket(), ZMQ_TYPE, &type, &size);
if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER)
{
items[i].events = ZMQ_POLLIN|ZMQ_POLLOUT;
}
else if (type == ZMQ_PUSH || type == ZMQ_PUB || type == ZMQ_XPUB)
{
items[i].events = ZMQ_POLLOUT;
}
else if (type == ZMQ_PULL || type == ZMQ_SUB || type == ZMQ_XSUB)
{
items[i].events = ZMQ_POLLIN;
}
else
{
LOG(ERROR) << "invalid poller configuration, exiting.";
exit(EXIT_FAILURE);
}
}
}
FairMQPollerSHM::FairMQPollerSHM(const unordered_map<string, vector<FairMQChannel>>& channelsMap, const vector<string>& channelList)
: items()
, fNumItems(0)
, fOffsetMap()
{
int offset = 0;
try
{
// calculate offsets and the total size of the poll item set
for (string channel : channelList)
{
fOffsetMap[channel] = offset;
offset += channelsMap.at(channel).size();
fNumItems += channelsMap.at(channel).size();
}
items = new zmq_pollitem_t[fNumItems];
int index = 0;
for (string channel : channelList)
{
for (unsigned int i = 0; i < channelsMap.at(channel).size(); ++i)
{
index = fOffsetMap[channel] + i;
items[index].socket = channelsMap.at(channel).at(i).fSocket->GetSocket();
items[index].fd = 0;
items[index].revents = 0;
int type = 0;
size_t size = sizeof(type);
zmq_getsockopt (channelsMap.at(channel).at(i).fSocket->GetSocket(), ZMQ_TYPE, &type, &size);
if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER)
{
items[index].events = ZMQ_POLLIN|ZMQ_POLLOUT;
}
else if (type == ZMQ_PUSH || type == ZMQ_PUB || type == ZMQ_XPUB)
{
items[index].events = ZMQ_POLLOUT;
}
else if (type == ZMQ_PULL || type == ZMQ_SUB || type == ZMQ_XSUB)
{
items[index].events = ZMQ_POLLIN;
}
else
{
LOG(ERROR) << "invalid poller configuration, exiting.";
exit(EXIT_FAILURE);
}
}
}
}
catch (const std::out_of_range& oor)
{
LOG(ERROR) << "At least one of the provided channel keys for poller initialization is invalid";
LOG(ERROR) << "Out of Range error: " << oor.what() << '\n';
exit(EXIT_FAILURE);
}
}
FairMQPollerSHM::FairMQPollerSHM(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket)
: items()
, fNumItems(2)
, fOffsetMap()
{
items = new zmq_pollitem_t[fNumItems];
items[0].socket = cmdSocket.GetSocket();
items[0].fd = 0;
items[0].events = ZMQ_POLLIN;
items[0].revents = 0;
items[1].socket = dataSocket.GetSocket();
items[1].fd = 0;
items[1].revents = 0;
int type = 0;
size_t size = sizeof(type);
zmq_getsockopt(dataSocket.GetSocket(), ZMQ_TYPE, &type, &size);
if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER)
{
items[1].events = ZMQ_POLLIN|ZMQ_POLLOUT;
}
else if (type == ZMQ_PUSH || type == ZMQ_PUB || type == ZMQ_XPUB)
{
items[1].events = ZMQ_POLLOUT;
}
else if (type == ZMQ_PULL || type == ZMQ_SUB || type == ZMQ_XSUB)
{
items[1].events = ZMQ_POLLIN;
}
else
{
LOG(ERROR) << "invalid poller configuration, exiting.";
exit(EXIT_FAILURE);
}
}
void FairMQPollerSHM::Poll(const int timeout)
{
if (zmq_poll(items, fNumItems, timeout) < 0)
{
if (errno == ETERM)
{
LOG(DEBUG) << "polling exited, reason: " << zmq_strerror(errno);
}
else
{
LOG(ERROR) << "polling failed, reason: " << zmq_strerror(errno);
}
}
}
bool FairMQPollerSHM::CheckInput(const int index)
{
if (items[index].revents & ZMQ_POLLIN)
{
return true;
}
return false;
}
bool FairMQPollerSHM::CheckOutput(const int index)
{
if (items[index].revents & ZMQ_POLLOUT)
{
return true;
}
return false;
}
bool FairMQPollerSHM::CheckInput(const string channelKey, const int index)
{
try
{
if (items[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLIN)
{
return true;
}
return false;
}
catch (const std::out_of_range& oor)
{
LOG(ERROR) << "Invalid channel key: \"" << channelKey << "\"";
LOG(ERROR) << "Out of Range error: " << oor.what() << '\n';
exit(EXIT_FAILURE);
}
}
bool FairMQPollerSHM::CheckOutput(const string channelKey, const int index)
{
try
{
if (items[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLOUT)
{
return true;
}
return false;
}
catch (const std::out_of_range& oor)
{
LOG(ERROR) << "Invalid channel key: \"" << channelKey << "\"";
LOG(ERROR) << "Out of Range error: " << oor.what() << '\n';
exit(EXIT_FAILURE);
}
}
FairMQPollerSHM::~FairMQPollerSHM()
{
if (items != NULL)
{
delete[] items;
}
}

View File

@ -0,0 +1,51 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQPOLLERSHM_H_
#define FAIRMQPOLLERSHM_H_
#include <vector>
#include <unordered_map>
#include <initializer_list>
#include <zmq.h>
#include "FairMQPoller.h"
#include "FairMQChannel.h"
#include "FairMQTransportFactorySHM.h"
class FairMQChannel;
class FairMQPollerSHM : public FairMQPoller
{
friend class FairMQChannel;
friend class FairMQTransportFactorySHM;
public:
FairMQPollerSHM(const std::vector<FairMQChannel>& channels);
FairMQPollerSHM(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList);
FairMQPollerSHM(const FairMQPollerSHM&) = delete;
FairMQPollerSHM operator=(const FairMQPollerSHM&) = delete;
virtual void Poll(const int timeout);
virtual bool CheckInput(const int index);
virtual bool CheckOutput(const int index);
virtual bool CheckInput(const std::string channelKey, const int index);
virtual bool CheckOutput(const std::string channelKey, const int index);
virtual ~FairMQPollerSHM();
private:
FairMQPollerSHM(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket);
zmq_pollitem_t* items;
int fNumItems;
std::unordered_map<std::string, int> fOffsetMap;
};
#endif /* FAIRMQPOLLERSHM_H_ */

View File

@ -0,0 +1,185 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQShmManager.h
*
* @since 2016-04-08
* @author A. Rybalchenko
*/
#ifndef FAIRMQSHMMANAGER_H_
#define FAIRMQSHMMANAGER_H_
#include <thread>
#include <chrono>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/smart_ptr/shared_ptr.hpp>
#include "FairMQLogger.h"
namespace bipc = boost::interprocess;
namespace FairMQ
{
namespace shmem
{
class Manager
{
public:
static Manager& Instance()
{
static Manager man;
return man;
}
void InitializeSegment(const std::string& op, const std::string& name, const size_t size = 0)
{
if (!fSegment)
{
try
{
if (op == "open_or_create")
{
fSegment = new bipc::managed_shared_memory(bipc::open_or_create, name.c_str(), size);
}
else if (op == "create_only")
{
fSegment = new bipc::managed_shared_memory(bipc::create_only, name.c_str(), size);
}
else if (op == "open_only")
{
int numTries = 0;
bool success = false;
do
{
try
{
fSegment = new bipc::managed_shared_memory(bipc::open_only, name.c_str());
success = true;
}
catch (bipc::interprocess_exception& ie)
{
if (++numTries == 5)
{
LOG(ERROR) << "Could not open shared memory after " << numTries << " attempts, exiting!";
exit(EXIT_FAILURE);
}
else
{
LOG(DEBUG) << "Could not open shared memory segment on try " << numTries << ". Retrying in 1 second...";
LOG(DEBUG) << ie.what();
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
}
while (!success);
}
else
{
LOG(ERROR) << "Unknown operation when initializing shared memory segment: " << op;
}
}
catch (std::exception& e)
{
LOG(ERROR) << "Exception during shared memory segment initialization: " << e.what() << ", application will now exit";
exit(EXIT_FAILURE);
}
}
else
{
LOG(INFO) << "Segment already initialized";
}
}
bipc::managed_shared_memory* Segment() const
{
if (fSegment)
{
return fSegment;
}
else
{
LOG(ERROR) << "Segment not initialized";
exit(EXIT_FAILURE);
}
}
private:
Manager()
: fSegment(nullptr)
{}
bipc::managed_shared_memory* fSegment;
};
class Chunk
{
public:
Chunk()
: fHandle()
, fSize(0)
{
}
Chunk(const size_t size)
: fHandle()
, fSize(size)
{
void* ptr = Manager::Instance().Segment()->allocate(size);
fHandle = Manager::Instance().Segment()->get_handle_from_address(ptr);
}
~Chunk()
{
Manager::Instance().Segment()->deallocate(Manager::Instance().Segment()->get_address_from_handle(fHandle));
}
// bipc::managed_shared_memory::handle_t GetHandle() const
// {
// return fHandle;
// }
void* GetData() const
{
return Manager::Instance().Segment()->get_address_from_handle(fHandle);
}
size_t GetSize() const
{
return fSize;
}
private:
bipc::managed_shared_memory::handle_t fHandle;
size_t fSize;
};
typedef bipc::managed_shared_ptr<Chunk, bipc::managed_shared_memory>::type ShPtrType;
struct ShPtrOwner
{
ShPtrOwner(const ShPtrType& other)
: fPtr(other)
{}
ShPtrOwner(const ShPtrOwner& other)
: fPtr(other.fPtr)
{}
ShPtrType fPtr;
};
} // namespace shmem
} // namespace FairMQ
#endif /* FAIRMQSHMMANAGER_H_ */

View File

@ -0,0 +1,591 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <sstream>
#include <zmq.h>
#include "FairMQSocketSHM.h"
#include "FairMQMessageSHM.h"
#include "FairMQLogger.h"
using namespace std;
using namespace FairMQ::shmem;
// Context to hold the ZeroMQ sockets
unique_ptr<FairMQContextSHM> FairMQSocketSHM::fContext = unique_ptr<FairMQContextSHM>(new FairMQContextSHM(1));
// bool FairMQSocketSHM::fContextInitialized = false;
FairMQSocketSHM::FairMQSocketSHM(const string& type, const string& name, const int numIoThreads, const string& id /*= ""*/)
: FairMQSocket(ZMQ_SNDMORE, ZMQ_RCVMORE, ZMQ_DONTWAIT)
, fSocket(NULL)
, fId()
, fBytesTx(0)
, fBytesRx(0)
, fMessagesTx(0)
, fMessagesRx(0)
{
fId = id + "." + name + "." + type;
// if (!fContextInitialized)
// {
// fContext = unique_ptr<FairMQContextSHM>(new FairMQContextSHM(1));
// fContextInitialized = true;
// }
if (zmq_ctx_set(fContext->GetContext(), ZMQ_IO_THREADS, numIoThreads) != 0)
{
LOG(ERROR) << "Failed configuring context, reason: " << zmq_strerror(errno);
}
fSocket = zmq_socket(fContext->GetContext(), GetConstant(type));
if (fSocket == NULL)
{
LOG(ERROR) << "Failed creating socket " << fId << ", reason: " << zmq_strerror(errno);
exit(EXIT_FAILURE);
}
if (zmq_setsockopt(fSocket, ZMQ_IDENTITY, fId.c_str(), fId.length()) != 0)
{
LOG(ERROR) << "Failed setting ZMQ_IDENTITY socket option, reason: " << zmq_strerror(errno);
}
// Tell socket to try and send/receive outstanding messages for <linger> milliseconds before terminating.
// Default value for ZeroMQ is -1, which is to wait forever.
int linger = 500;
if (zmq_setsockopt(fSocket, ZMQ_LINGER, &linger, sizeof(linger)) != 0)
{
LOG(ERROR) << "Failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno);
}
int kernelSndSize = 10000;
if (zmq_setsockopt(fSocket, ZMQ_SNDBUF, &kernelSndSize, sizeof(kernelSndSize)) != 0)
{
LOG(ERROR) << "Failed setting ZMQ_SNDBUF socket option, reason: " << zmq_strerror(errno);
}
int kernelRcvSize = 10000;
if (zmq_setsockopt(fSocket, ZMQ_RCVBUF, &kernelRcvSize, sizeof(kernelRcvSize)) != 0)
{
LOG(ERROR) << "Failed setting ZMQ_RCVBUF socket option, reason: " << zmq_strerror(errno);
}
if (type == "sub")
{
if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, NULL, 0) != 0)
{
LOG(ERROR) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno);
}
}
// LOG(INFO) << "created socket " << fId;
}
string FairMQSocketSHM::GetId()
{
return fId;
}
bool FairMQSocketSHM::Bind(const string& address)
{
// LOG(INFO) << "bind socket " << fId << " on " << address;
if (zmq_bind(fSocket, address.c_str()) != 0)
{
if (errno == EADDRINUSE) {
// do not print error in this case, this is handled by FairMQDevice in case no connection could be established after trying a number of random ports from a range.
return false;
}
LOG(ERROR) << "Failed binding socket " << fId << ", reason: " << zmq_strerror(errno);
return false;
}
return true;
}
void FairMQSocketSHM::Connect(const string& address)
{
// LOG(INFO) << "connect socket " << fId << " on " << address;
if (zmq_connect(fSocket, address.c_str()) != 0)
{
LOG(ERROR) << "Failed connecting socket " << fId << ", reason: " << zmq_strerror(errno);
// error here means incorrect configuration. exit if it happens.
exit(EXIT_FAILURE);
}
}
int FairMQSocketSHM::Send(FairMQMessage* msg, const string& flag)
{
return Send(msg, GetConstant(flag));
}
int FairMQSocketSHM::Send(FairMQMessage* msg, const int flags)
{
int nbytes = zmq_msg_send(static_cast<zmq_msg_t*>(msg->GetMessage()), fSocket, flags);
if (nbytes >= 0)
{
static_cast<FairMQMessageSHM*>(msg)->fReceiving = false;
static_cast<FairMQMessageSHM*>(msg)->fQueued = true;
size_t size = msg->GetSize();
fBytesTx += size;
++fMessagesTx;
return size;
}
if (zmq_errno() == EAGAIN)
{
return -2;
}
if (zmq_errno() == ETERM)
{
LOG(INFO) << "terminating socket " << fId;
return -1;
}
LOG(ERROR) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
return nbytes;
}
int64_t FairMQSocketSHM::Send(const vector<FairMQMessagePtr>& msgVec, const int flags)
{
// Sending vector typicaly handles more then one part
if (msgVec.size() > 1)
{
int64_t totalSize = 0;
for (unsigned int i = 0; i < msgVec.size() - 1; ++i)
{
int nbytes = zmq_msg_send(static_cast<zmq_msg_t*>(msgVec[i]->GetMessage()), fSocket, ZMQ_SNDMORE|flags);
if (nbytes >= 0)
{
static_cast<FairMQMessageSHM*>(msgVec[i].get())->fReceiving = false;
static_cast<FairMQMessageSHM*>(msgVec[i].get())->fQueued = true;
size_t size = msgVec[i]->GetSize();
totalSize += size;
fBytesTx += size;
}
else
{
if (zmq_errno() == EAGAIN)
{
return -2;
}
if (zmq_errno() == ETERM)
{
LOG(INFO) << "terminating socket " << fId;
return -1;
}
LOG(ERROR) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
return nbytes;
}
}
int nbytes = zmq_msg_send(static_cast<zmq_msg_t*>(msgVec.back()->GetMessage()), fSocket, flags);
if (nbytes >= 0)
{
static_cast<FairMQMessageSHM*>(msgVec.back().get())->fReceiving = false;
static_cast<FairMQMessageSHM*>(msgVec.back().get())->fQueued = true;
size_t size = msgVec.back()->GetSize();
totalSize += size;
fBytesTx += size;
}
else
{
if (zmq_errno() == EAGAIN)
{
return -2;
}
if (zmq_errno() == ETERM)
{
LOG(INFO) << "terminating socket " << fId;
return -1;
}
LOG(ERROR) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
return nbytes;
}
// store statistics on how many messages have been sent (handle all parts as a single message)
++fMessagesTx;
return totalSize;
} // If there's only one part, send it as a regular message
else if (msgVec.size() == 1)
{
return Send(msgVec.back().get(), flags);
}
else // if the vector is empty, something might be wrong
{
LOG(WARN) << "Will not send empty vector";
return -1;
}
}
int FairMQSocketSHM::Receive(FairMQMessage* msg, const string& flag)
{
return Receive(msg, GetConstant(flag));
}
int FairMQSocketSHM::Receive(FairMQMessage* msg, const int flags)
{
zmq_msg_t* msgPtr = static_cast<zmq_msg_t*>(msg->GetMessage());
int nbytes = zmq_msg_recv(msgPtr, fSocket, flags);
if (nbytes == 0)
{
++fMessagesRx;
return nbytes;
}
else if (nbytes > 0)
{
string ownerID(static_cast<char*>(zmq_msg_data(msgPtr)), zmq_msg_size(msgPtr));
ShPtrOwner* owner = Manager::Instance().Segment()->find<ShPtrOwner>(ownerID.c_str()).first;
size_t size = 0;
if (owner)
{
static_cast<FairMQMessageSHM*>(msg)->fOwner = owner;
static_cast<FairMQMessageSHM*>(msg)->fReceiving = true;
size = msg->GetSize();
fBytesRx += size;
++fMessagesRx;
return size;
}
else
{
LOG(ERROR) << "Received meta data, but could not find corresponding chunk";
return -1;
}
}
if (zmq_errno() == EAGAIN)
{
return -2;
}
if (zmq_errno() == ETERM)
{
LOG(INFO) << "terminating socket " << fId;
return -1;
}
LOG(ERROR) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno);
return nbytes;
}
int64_t FairMQSocketSHM::Receive(vector<FairMQMessagePtr>& msgVec, const int flags)
{
// Warn if the vector is filled before Receive() and empty it.
if (msgVec.size() > 0)
{
LOG(WARN) << "Message vector contains elements before Receive(), they will be deleted!";
msgVec.clear();
}
int64_t totalSize = 0;
int64_t more = 0;
do
{
FairMQMessagePtr part(new FairMQMessageSHM());
zmq_msg_t* msgPtr = static_cast<zmq_msg_t*>(part->GetMessage());
int nbytes = zmq_msg_recv(msgPtr, fSocket, flags);
if (nbytes == 0)
{
msgVec.push_back(move(part));
}
else if (nbytes > 0)
{
string ownerID(static_cast<char*>(zmq_msg_data(msgPtr)), zmq_msg_size(msgPtr));
ShPtrOwner* owner = Manager::Instance().Segment()->find<ShPtrOwner>(ownerID.c_str()).first;
size_t size = 0;
if (owner)
{
static_cast<FairMQMessageSHM*>(part.get())->fOwner = owner;
static_cast<FairMQMessageSHM*>(part.get())->fReceiving = true;
size = part->GetSize();
msgVec.push_back(move(part));
fBytesRx += size;
totalSize += size;
}
else
{
LOG(ERROR) << "Received meta data, but could not find corresponding chunk";
return -1;
}
}
else
{
return nbytes;
}
size_t more_size = sizeof(more);
zmq_getsockopt(fSocket, ZMQ_RCVMORE, &more, &more_size);
}
while (more);
// store statistics on how many messages have been received (handle all parts as a single message)
++fMessagesRx;
return totalSize;
}
void FairMQSocketSHM::Close()
{
// LOG(DEBUG) << "Closing socket " << fId;
if (fSocket == NULL)
{
return;
}
if (zmq_close(fSocket) != 0)
{
LOG(ERROR) << "Failed closing socket " << fId << ", reason: " << zmq_strerror(errno);
}
fSocket = NULL;
}
void FairMQSocketSHM::Terminate()
{
if (zmq_ctx_destroy(fContext->GetContext()) != 0)
{
LOG(ERROR) << "Failed terminating context, reason: " << zmq_strerror(errno);
}
}
void FairMQSocketSHM::Interrupt()
{
FairMQMessageSHM::fInterrupted = true;
}
void FairMQSocketSHM::Resume()
{
FairMQMessageSHM::fInterrupted = false;
}
void* FairMQSocketSHM::GetSocket() const
{
return fSocket;
}
int FairMQSocketSHM::GetSocket(int) const
{
// dummy method to comply with the interface. functionality not possible in zeromq.
return -1;
}
void FairMQSocketSHM::SetOption(const string& option, const void* value, size_t valueSize)
{
if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0)
{
LOG(ERROR) << "Failed setting socket option, reason: " << zmq_strerror(errno);
}
}
void FairMQSocketSHM::GetOption(const string& option, void* value, size_t* valueSize)
{
if (zmq_getsockopt(fSocket, GetConstant(option), value, valueSize) < 0)
{
LOG(ERROR) << "Failed getting socket option, reason: " << zmq_strerror(errno);
}
}
unsigned long FairMQSocketSHM::GetBytesTx() const
{
return fBytesTx;
}
unsigned long FairMQSocketSHM::GetBytesRx() const
{
return fBytesRx;
}
unsigned long FairMQSocketSHM::GetMessagesTx() const
{
return fMessagesTx;
}
unsigned long FairMQSocketSHM::GetMessagesRx() const
{
return fMessagesRx;
}
bool FairMQSocketSHM::SetSendTimeout(const int timeout, const string& address, const string& method)
{
if (method == "bind")
{
if (zmq_unbind(fSocket, address.c_str()) != 0)
{
LOG(ERROR) << "Failed unbinding socket " << fId << ", reason: " << zmq_strerror(errno);
return false;
}
if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &timeout, sizeof(int)) != 0)
{
LOG(ERROR) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno);
return false;
}
if (zmq_bind(fSocket, address.c_str()) != 0)
{
LOG(ERROR) << "Failed binding socket " << fId << ", reason: " << zmq_strerror(errno);
return false;
}
}
else if (method == "connect")
{
if (zmq_disconnect(fSocket, address.c_str()) != 0)
{
LOG(ERROR) << "Failed disconnecting socket " << fId << ", reason: " << zmq_strerror(errno);
return false;
}
if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &timeout, sizeof(int)) != 0)
{
LOG(ERROR) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno);
return false;
}
if (zmq_connect(fSocket, address.c_str()) != 0)
{
LOG(ERROR) << "Failed connecting socket " << fId << ", reason: " << zmq_strerror(errno);
return false;
}
}
else
{
LOG(ERROR) << "SetSendTimeout() failed - unknown method provided!";
return false;
}
return true;
}
int FairMQSocketSHM::GetSendTimeout() const
{
int timeout = -1;
size_t size = sizeof(timeout);
if (zmq_getsockopt(fSocket, ZMQ_SNDTIMEO, &timeout, &size) != 0)
{
LOG(ERROR) << "Failed getting option 'receive timeout' on socket " << fId << ", reason: " << zmq_strerror(errno);
}
return timeout;
}
bool FairMQSocketSHM::SetReceiveTimeout(const int timeout, const string& address, const string& method)
{
if (method == "bind")
{
if (zmq_unbind(fSocket, address.c_str()) != 0)
{
LOG(ERROR) << "Failed unbinding socket " << fId << ", reason: " << zmq_strerror(errno);
return false;
}
if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &timeout, sizeof(int)) != 0)
{
LOG(ERROR) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno);
return false;
}
if (zmq_bind(fSocket, address.c_str()) != 0)
{
LOG(ERROR) << "Failed binding socket " << fId << ", reason: " << zmq_strerror(errno);
return false;
}
}
else if (method == "connect")
{
if (zmq_disconnect(fSocket, address.c_str()) != 0)
{
LOG(ERROR) << "Failed disconnecting socket " << fId << ", reason: " << zmq_strerror(errno);
return false;
}
if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &timeout, sizeof(int)) != 0)
{
LOG(ERROR) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno);
return false;
}
if (zmq_connect(fSocket, address.c_str()) != 0)
{
LOG(ERROR) << "Failed connecting socket " << fId << ", reason: " << zmq_strerror(errno);
return false;
}
}
else
{
LOG(ERROR) << "SetReceiveTimeout() failed - unknown method provided!";
return false;
}
return true;
}
int FairMQSocketSHM::GetReceiveTimeout() const
{
int timeout = -1;
size_t size = sizeof(timeout);
if (zmq_getsockopt(fSocket, ZMQ_RCVTIMEO, &timeout, &size) != 0)
{
LOG(ERROR) << "Failed getting option 'receive timeout' on socket " << fId << ", reason: " << zmq_strerror(errno);
}
return timeout;
}
int FairMQSocketSHM::GetConstant(const string& constant)
{
if (constant == "")
return 0;
if (constant == "sub")
return ZMQ_SUB;
if (constant == "pub")
return ZMQ_PUB;
if (constant == "xsub")
return ZMQ_XSUB;
if (constant == "xpub")
return ZMQ_XPUB;
if (constant == "push")
return ZMQ_PUSH;
if (constant == "pull")
return ZMQ_PULL;
if (constant == "req")
return ZMQ_REQ;
if (constant == "rep")
return ZMQ_REP;
if (constant == "dealer")
return ZMQ_DEALER;
if (constant == "router")
return ZMQ_ROUTER;
if (constant == "pair")
return ZMQ_PAIR;
if (constant == "snd-hwm")
return ZMQ_SNDHWM;
if (constant == "rcv-hwm")
return ZMQ_RCVHWM;
if (constant == "snd-size")
return ZMQ_SNDBUF;
if (constant == "rcv-size")
return ZMQ_RCVBUF;
if (constant == "snd-more")
return ZMQ_SNDMORE;
if (constant == "rcv-more")
return ZMQ_RCVMORE;
if (constant == "linger")
return ZMQ_LINGER;
if (constant == "no-block")
return ZMQ_DONTWAIT;
if (constant == "snd-more no-block")
return ZMQ_DONTWAIT|ZMQ_SNDMORE;
return -1;
}
FairMQSocketSHM::~FairMQSocketSHM()
{
}

View File

@ -0,0 +1,76 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQSOCKETSHM_H_
#define FAIRMQSOCKETSHM_H_
#include <atomic>
#include <memory> // unique_ptr
#include "FairMQSocket.h"
#include "FairMQContextSHM.h"
#include "FairMQShmManager.h"
class FairMQSocketSHM : public FairMQSocket
{
public:
FairMQSocketSHM(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = "");
FairMQSocketSHM(const FairMQSocketSHM&) = delete;
FairMQSocketSHM operator=(const FairMQSocketSHM&) = delete;
virtual std::string GetId();
virtual bool Bind(const std::string& address);
virtual void Connect(const std::string& address);
virtual int Send(FairMQMessage* msg, const std::string& flag = "");
virtual int Send(FairMQMessage* msg, const int flags = 0);
virtual int64_t Send(const std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags = 0);
virtual int Receive(FairMQMessage* msg, const std::string& flag = "");
virtual int Receive(FairMQMessage* msg, const int flags = 0);
virtual int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags = 0);
virtual void* GetSocket() const;
virtual int GetSocket(int nothing) const;
virtual void Close();
virtual void Terminate();
virtual void Interrupt();
virtual void Resume();
virtual void SetOption(const std::string& option, const void* value, size_t valueSize);
virtual void GetOption(const std::string& option, void* value, size_t* valueSize);
virtual unsigned long GetBytesTx() const;
virtual unsigned long GetBytesRx() const;
virtual unsigned long GetMessagesTx() const;
virtual unsigned long GetMessagesRx() const;
virtual bool SetSendTimeout(const int timeout, const std::string& address, const std::string& method);
virtual int GetSendTimeout() const;
virtual bool SetReceiveTimeout(const int timeout, const std::string& address, const std::string& method);
virtual int GetReceiveTimeout() const;
static int GetConstant(const std::string& constant);
virtual ~FairMQSocketSHM();
private:
void* fSocket;
std::string fId;
std::atomic<unsigned long> fBytesTx;
std::atomic<unsigned long> fBytesRx;
std::atomic<unsigned long> fMessagesTx;
std::atomic<unsigned long> fMessagesRx;
static std::unique_ptr<FairMQContextSHM> fContext;
// static bool fContextInitialized;
};
#endif /* FAIRMQSOCKETSHM_H_ */

View File

@ -0,0 +1,56 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "zmq.h"
#include <boost/version.hpp>
#include "FairMQTransportFactorySHM.h"
using namespace std;
FairMQTransportFactorySHM::FairMQTransportFactorySHM()
{
int major, minor, patch;
zmq_version(&major, &minor, &patch);
LOG(DEBUG) << "Using ZeroMQ (" << major << "." << minor << "." << patch << ") & "
<< "boost::interprocess (" << (BOOST_VERSION / 100000) << "." << (BOOST_VERSION / 100 % 1000) << "." << (BOOST_VERSION % 100) << ")";
}
FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage() const
{
return unique_ptr<FairMQMessage>(new FairMQMessageSHM());
}
FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(const size_t size) const
{
return unique_ptr<FairMQMessage>(new FairMQMessageSHM(size));
}
FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) const
{
return unique_ptr<FairMQMessage>(new FairMQMessageSHM(data, size, ffn, hint));
}
FairMQSocketPtr FairMQTransportFactorySHM::CreateSocket(const string& type, const string& name, const int numIoThreads, const string& id /*= ""*/) const
{
return unique_ptr<FairMQSocket>(new FairMQSocketSHM(type, name, numIoThreads, id));
}
FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const vector<FairMQChannel>& channels) const
{
return unique_ptr<FairMQPoller>(new FairMQPollerSHM(channels));
}
FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const unordered_map<string, vector<FairMQChannel>>& channelsMap, const vector<string>& channelList) const
{
return unique_ptr<FairMQPoller>(new FairMQPollerSHM(channelsMap, channelList));
}
FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const
{
return unique_ptr<FairMQPoller>(new FairMQPollerSHM(cmdSocket, dataSocket));
}

View File

@ -0,0 +1,37 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQTRANSPORTFACTORYSHM_H_
#define FAIRMQTRANSPORTFACTORYSHM_H_
#include <vector>
#include "FairMQTransportFactory.h"
#include "FairMQContextSHM.h"
#include "FairMQMessageSHM.h"
#include "FairMQSocketSHM.h"
#include "FairMQPollerSHM.h"
class FairMQTransportFactorySHM : public FairMQTransportFactory
{
public:
FairMQTransportFactorySHM();
virtual FairMQMessagePtr CreateMessage() const;
virtual FairMQMessagePtr CreateMessage(const size_t size) const;
virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL) const;
virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = "") const;
virtual FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const;
virtual FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const;
virtual FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const;
virtual ~FairMQTransportFactorySHM() {};
};
#endif /* FAIRMQTRANSPORTFACTORYSHM_H_ */

10
fairmq/shmem/README.md Normal file
View File

@ -0,0 +1,10 @@
# Shared Memory transport
First version of the shared memory transport. To try with existing devices, run the devices with `--transport shmem` option.
The transport manages shared memory via boost::interprocess library. The transfer of the meta data, required to locate the content in the share memory, is done via ZeroMQ. The transport supports all communication patterns where a single message is received by a single receiver. For multiple receivers for the same message, the message has to be copied.
Under development:
- Cleanup of the shared memory segment in case all devices crash. Currently at least one device has to stop properly for a cleanup.
- Implement more than one transport per device.
- Configuration of the shared memory size (currently hard-coded).

View File

@ -96,6 +96,11 @@ inline int runStateMachine(TMQDevice& device, FairMQProgOptions& cfg)
LOG(ERROR) << "Cannot load fairmqControlPlugin(): " << dlsymError;
fairmqControlPlugin = nullptr;
dlclose(ldControlHandle);
// also close the config plugin before quiting with error.
if (ldConfigHandle)
{
dlclose(ldConfigHandle);
}
return 1;
}

View File

@ -12,8 +12,6 @@
* @author D. Klein, A. Rybalchenko
*/
#include <sstream>
#include <zmq.h>
#include "FairMQLogger.h"
@ -60,9 +58,12 @@ void FairMQContextZMQ::Close()
if (zmq_ctx_destroy(fContext) != 0)
{
if (errno == EINTR) {
if (errno == EINTR)
{
LOG(ERROR) << " failed closing context, reason: " << zmq_strerror(errno);
} else {
}
else
{
fContext = NULL;
return;
}

View File

@ -20,6 +20,8 @@
using namespace std;
string FairMQMessageZMQ::fDeviceID = string();
FairMQMessageZMQ::FairMQMessageZMQ()
: fMessage()
{
@ -94,22 +96,9 @@ void FairMQMessageZMQ::SetMessage(void*, const size_t)
// dummy method to comply with the interface. functionality not allowed in zeromq.
}
void FairMQMessageZMQ::Copy(FairMQMessage* msg)
void FairMQMessageZMQ::SetDeviceId(const string& deviceId)
{
// DEPRECATED: Use Copy(const unique_ptr<FairMQMessage>&)
// Shares the message buffer between msg and this fMessage.
if (zmq_msg_copy(&fMessage, static_cast<zmq_msg_t*>(msg->GetMessage())) != 0)
{
LOG(ERROR) << "failed copying message, reason: " << zmq_strerror(errno);
}
// Alternatively, following code does a hard copy of the message, which allows to modify the original after making a copy, without affecting the new msg.
// CloseMessage();
// size_t size = msg->GetSize();
// zmq_msg_init_size(&fMessage, size);
// memcpy(zmq_msg_data(&fMessage), msg->GetData(), size);
fDeviceID = deviceId;
}
void FairMQMessageZMQ::Copy(const unique_ptr<FairMQMessage>& msg)
@ -128,7 +117,7 @@ void FairMQMessageZMQ::Copy(const unique_ptr<FairMQMessage>& msg)
// memcpy(zmq_msg_data(&fMessage), msg->GetData(), size);
}
inline void FairMQMessageZMQ::CloseMessage()
void FairMQMessageZMQ::CloseMessage()
{
if (zmq_msg_close(&fMessage) != 0)
{

View File

@ -16,6 +16,7 @@
#define FAIRMQMESSAGEZMQ_H_
#include <cstddef>
#include <string>
#include <zmq.h>
@ -26,11 +27,11 @@ class FairMQMessageZMQ : public FairMQMessage
public:
FairMQMessageZMQ();
FairMQMessageZMQ(const size_t size);
FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL);
FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr);
virtual void Rebuild();
virtual void Rebuild(const size_t size);
virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL);
virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr);
virtual void* GetMessage();
virtual void* GetData();
@ -38,14 +39,17 @@ class FairMQMessageZMQ : public FairMQMessage
virtual void SetMessage(void* data, const size_t size);
virtual void CloseMessage();
virtual void Copy(FairMQMessage* msg);
virtual void SetDeviceId(const std::string& deviceId);
virtual void Copy(const std::unique_ptr<FairMQMessage>& msg);
void CloseMessage();
virtual ~FairMQMessageZMQ();
private:
zmq_msg_t fMessage;
static std::string fDeviceID;
};
#endif /* FAIRMQMESSAGEZMQ_H_ */

View File

@ -108,24 +108,7 @@ void FairMQSocketZMQ::Connect(const string& address)
int FairMQSocketZMQ::Send(FairMQMessage* msg, const string& flag)
{
int nbytes = zmq_msg_send(static_cast<zmq_msg_t*>(msg->GetMessage()), fSocket, GetConstant(flag));
if (nbytes >= 0)
{
fBytesTx += nbytes;
++fMessagesTx;
return nbytes;
}
if (zmq_errno() == EAGAIN)
{
return -2;
}
if (zmq_errno() == ETERM)
{
LOG(INFO) << "terminating socket " << fId;
return -1;
}
LOG(ERROR) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
return nbytes;
return Send(msg, GetConstant(flag));
}
int FairMQSocketZMQ::Send(FairMQMessage* msg, const int flags)
@ -218,24 +201,7 @@ int64_t FairMQSocketZMQ::Send(const vector<unique_ptr<FairMQMessage>>& msgVec, c
int FairMQSocketZMQ::Receive(FairMQMessage* msg, const string& flag)
{
int nbytes = zmq_msg_recv(static_cast<zmq_msg_t*>(msg->GetMessage()), fSocket, GetConstant(flag));
if (nbytes >= 0)
{
fBytesRx += nbytes;
++fMessagesRx;
return nbytes;
}
if (zmq_errno() == EAGAIN)
{
return -2;
}
if (zmq_errno() == ETERM)
{
LOG(INFO) << "terminating socket " << fId;
return -1;
}
LOG(ERROR) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno);
return nbytes;
return Receive(msg, GetConstant(flag));
}
int FairMQSocketZMQ::Receive(FairMQMessage* msg, const int flags)
@ -323,6 +289,14 @@ void FairMQSocketZMQ::Terminate()
}
}
void FairMQSocketZMQ::Interrupt()
{
}
void FairMQSocketZMQ::Resume()
{
}
void* FairMQSocketZMQ::GetSocket() const
{
return fSocket;

View File

@ -47,6 +47,9 @@ class FairMQSocketZMQ : public FairMQSocket
virtual void Close();
virtual void Terminate();
virtual void Interrupt();
virtual void Resume();
virtual void SetOption(const std::string& option, const void* value, size_t valueSize);
virtual void GetOption(const std::string& option, void* value, size_t* valueSize);