mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-16 01:51:45 +00:00
use clang-format for FairMQ
This commit is contained in:
@@ -12,125 +12,139 @@
|
||||
#include "FairMQMessageNN.h"
|
||||
#include "FairMQLogger.h"
|
||||
|
||||
FairMQMessageNN::FairMQMessageNN() :
|
||||
fSize(0),
|
||||
fMessage(NULL),
|
||||
fReceiving(false)
|
||||
FairMQMessageNN::FairMQMessageNN()
|
||||
: fSize(0)
|
||||
, fMessage(NULL)
|
||||
, fReceiving(false)
|
||||
{
|
||||
}
|
||||
|
||||
FairMQMessageNN::FairMQMessageNN(size_t size)
|
||||
{
|
||||
fMessage = nn_allocmsg(size, 0);
|
||||
if (!fMessage){
|
||||
LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno);
|
||||
}
|
||||
fSize = size;
|
||||
fReceiving = false;
|
||||
fMessage = nn_allocmsg(size, 0);
|
||||
if (!fMessage)
|
||||
{
|
||||
LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno);
|
||||
}
|
||||
fSize = size;
|
||||
fReceiving = false;
|
||||
}
|
||||
|
||||
FairMQMessageNN::FairMQMessageNN(void* data, size_t size)
|
||||
{
|
||||
fMessage = nn_allocmsg(size, 0);
|
||||
if (!fMessage){
|
||||
LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno);
|
||||
}
|
||||
memcpy (fMessage, data, size);
|
||||
fSize = size;
|
||||
fReceiving = false;
|
||||
fMessage = nn_allocmsg(size, 0);
|
||||
if (!fMessage)
|
||||
{
|
||||
LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno);
|
||||
}
|
||||
memcpy(fMessage, data, size);
|
||||
fSize = size;
|
||||
fReceiving = false;
|
||||
}
|
||||
|
||||
void FairMQMessageNN::Rebuild()
|
||||
{
|
||||
Clear();
|
||||
fSize = 0;
|
||||
fMessage = NULL;
|
||||
fReceiving = false;
|
||||
Clear();
|
||||
fSize = 0;
|
||||
fMessage = NULL;
|
||||
fReceiving = false;
|
||||
}
|
||||
|
||||
void FairMQMessageNN::Rebuild(size_t size)
|
||||
{
|
||||
Clear();
|
||||
fMessage = nn_allocmsg(size, 0);
|
||||
if (!fMessage){
|
||||
LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno);
|
||||
}
|
||||
fSize = size;
|
||||
fReceiving = false;
|
||||
Clear();
|
||||
fMessage = nn_allocmsg(size, 0);
|
||||
if (!fMessage)
|
||||
{
|
||||
LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno);
|
||||
}
|
||||
fSize = size;
|
||||
fReceiving = false;
|
||||
}
|
||||
|
||||
void FairMQMessageNN::Rebuild(void* data, size_t size)
|
||||
{
|
||||
Clear();
|
||||
fMessage = nn_allocmsg(size, 0);
|
||||
if (!fMessage){
|
||||
LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno);
|
||||
}
|
||||
memcpy (fMessage, data, size);
|
||||
fSize = size;
|
||||
fReceiving = false;
|
||||
Clear();
|
||||
fMessage = nn_allocmsg(size, 0);
|
||||
if (!fMessage)
|
||||
{
|
||||
LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno);
|
||||
}
|
||||
memcpy(fMessage, data, size);
|
||||
fSize = size;
|
||||
fReceiving = false;
|
||||
}
|
||||
|
||||
void* FairMQMessageNN::GetMessage()
|
||||
{
|
||||
return fMessage;
|
||||
return fMessage;
|
||||
}
|
||||
|
||||
void* FairMQMessageNN::GetData()
|
||||
{
|
||||
return fMessage;
|
||||
return fMessage;
|
||||
}
|
||||
|
||||
size_t FairMQMessageNN::GetSize()
|
||||
{
|
||||
return fSize;
|
||||
return fSize;
|
||||
}
|
||||
|
||||
void FairMQMessageNN::SetMessage(void* data, size_t size)
|
||||
{
|
||||
fMessage = data;
|
||||
fSize = size;
|
||||
fMessage = data;
|
||||
fSize = size;
|
||||
}
|
||||
|
||||
void FairMQMessageNN::Copy(FairMQMessage* msg)
|
||||
{
|
||||
if (fMessage){
|
||||
int rc = nn_freemsg(fMessage);
|
||||
if ( rc < 0 ){
|
||||
LOG(ERROR) << "failed freeing message, reason: " << nn_strerror(errno);
|
||||
if (fMessage)
|
||||
{
|
||||
int rc = nn_freemsg(fMessage);
|
||||
if (rc < 0)
|
||||
{
|
||||
LOG(ERROR) << "failed freeing message, reason: " << nn_strerror(errno);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
size_t size = msg->GetSize();
|
||||
size_t size = msg->GetSize();
|
||||
|
||||
fMessage = nn_allocmsg(size, 0);
|
||||
if (!fMessage){
|
||||
LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno);
|
||||
}
|
||||
std::memcpy (fMessage, msg->GetMessage(), size);
|
||||
fSize = size;
|
||||
fMessage = nn_allocmsg(size, 0);
|
||||
if (!fMessage)
|
||||
{
|
||||
LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno);
|
||||
}
|
||||
std::memcpy(fMessage, msg->GetMessage(), size);
|
||||
fSize = size;
|
||||
}
|
||||
|
||||
inline void FairMQMessageNN::Clear()
|
||||
{
|
||||
int rc = nn_freemsg(fMessage);
|
||||
if (rc < 0) {
|
||||
LOG(ERROR) << "failed freeing message, reason: " << nn_strerror(errno);
|
||||
} else {
|
||||
fMessage = NULL;
|
||||
fSize = 0;
|
||||
}
|
||||
int rc = nn_freemsg(fMessage);
|
||||
if (rc < 0)
|
||||
{
|
||||
LOG(ERROR) << "failed freeing message, reason: " << nn_strerror(errno);
|
||||
}
|
||||
else
|
||||
{
|
||||
fMessage = NULL;
|
||||
fSize = 0;
|
||||
}
|
||||
}
|
||||
|
||||
FairMQMessageNN::~FairMQMessageNN()
|
||||
{
|
||||
if(fReceiving){
|
||||
int rc = nn_freemsg(fMessage);
|
||||
if (rc < 0) {
|
||||
LOG(ERROR) << "failed freeing message, reason: " << nn_strerror(errno);
|
||||
} else {
|
||||
fMessage = NULL;
|
||||
fSize = 0;
|
||||
if (fReceiving)
|
||||
{
|
||||
int rc = nn_freemsg(fMessage);
|
||||
if (rc < 0)
|
||||
{
|
||||
LOG(ERROR) << "failed freeing message, reason: " << nn_strerror(errno);
|
||||
}
|
||||
else
|
||||
{
|
||||
fMessage = NULL;
|
||||
fSize = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -12,14 +12,13 @@
|
||||
|
||||
#include "FairMQMessage.h"
|
||||
|
||||
|
||||
class FairMQMessageNN : public FairMQMessage
|
||||
{
|
||||
public:
|
||||
FairMQMessageNN();
|
||||
FairMQMessageNN(size_t size);
|
||||
FairMQMessageNN(void* data, size_t size);
|
||||
|
||||
|
||||
virtual void Rebuild();
|
||||
virtual void Rebuild(size_t size);
|
||||
virtual void Rebuild(void* data, size_t site);
|
||||
|
@@ -11,29 +11,31 @@
|
||||
|
||||
FairMQPollerNN::FairMQPollerNN(const vector<FairMQSocket*>& inputs)
|
||||
{
|
||||
fNumItems = inputs.size();
|
||||
items = new nn_pollfd[fNumItems];
|
||||
fNumItems = inputs.size();
|
||||
items = new nn_pollfd[fNumItems];
|
||||
|
||||
for (int i = 0; i < fNumItems; i++) {
|
||||
items[i].fd = inputs.at(i)->GetSocket(1);
|
||||
items[i].events = NN_POLLIN;
|
||||
}
|
||||
for (int i = 0; i < fNumItems; i++)
|
||||
{
|
||||
items[i].fd = inputs.at(i)->GetSocket(1);
|
||||
items[i].events = NN_POLLIN;
|
||||
}
|
||||
}
|
||||
|
||||
void FairMQPollerNN::Poll(int timeout)
|
||||
{
|
||||
nn_poll(items, fNumItems, timeout);
|
||||
nn_poll(items, fNumItems, timeout);
|
||||
}
|
||||
|
||||
bool FairMQPollerNN::CheckInput(int index)
|
||||
{
|
||||
if (items[index].revents & NN_POLLIN)
|
||||
return true;
|
||||
if (items[index].revents & NN_POLLIN)
|
||||
return true;
|
||||
|
||||
return false;
|
||||
return false;
|
||||
}
|
||||
|
||||
FairMQPollerNN::~FairMQPollerNN()
|
||||
{
|
||||
if (items != NULL) delete [] items;
|
||||
if (items != NULL)
|
||||
delete[] items;
|
||||
}
|
||||
|
@@ -17,9 +17,9 @@ FairMQSocketNN::FairMQSocketNN(const string& type, int num, int numIoThreads) :
|
||||
fMessagesTx(0),
|
||||
fMessagesRx(0)
|
||||
{
|
||||
stringstream id;
|
||||
id << type << "." << num;
|
||||
fId = id.str();
|
||||
stringstream id;
|
||||
id << type << "." << num;
|
||||
fId = id.str();
|
||||
|
||||
if ( numIoThreads > 1 ) {
|
||||
LOG(INFO) << "number of I/O threads is not used in nanomsg";
|
||||
@@ -30,123 +30,140 @@ FairMQSocketNN::FairMQSocketNN(const string& type, int num, int numIoThreads) :
|
||||
nn_setsockopt(fSocket, NN_SUB, NN_SUB_SUBSCRIBE, NULL, 0);
|
||||
}
|
||||
|
||||
LOG(INFO) << "created socket #" << fId;
|
||||
LOG(INFO) << "created socket #" << fId;
|
||||
}
|
||||
|
||||
string FairMQSocketNN::GetId()
|
||||
{
|
||||
return fId;
|
||||
return fId;
|
||||
}
|
||||
|
||||
void FairMQSocketNN::Bind(const string& address)
|
||||
{
|
||||
LOG(INFO) << "bind socket #" << fId << " on " << address;
|
||||
LOG(INFO) << "bind socket #" << fId << " on " << address;
|
||||
|
||||
int eid = nn_bind(fSocket, address.c_str());
|
||||
if (eid < 0) {
|
||||
LOG(ERROR) << "failed binding socket #" << fId << ", reason: " << nn_strerror(errno);
|
||||
}
|
||||
int eid = nn_bind(fSocket, address.c_str());
|
||||
if (eid < 0)
|
||||
{
|
||||
LOG(ERROR) << "failed binding socket #" << fId << ", reason: " << nn_strerror(errno);
|
||||
}
|
||||
}
|
||||
|
||||
void FairMQSocketNN::Connect(const string& address)
|
||||
{
|
||||
LOG(INFO) << "connect socket #" << fId << " to " << address;
|
||||
LOG(INFO) << "connect socket #" << fId << " to " << address;
|
||||
|
||||
int eid = nn_connect(fSocket, address.c_str());
|
||||
if (eid < 0) {
|
||||
LOG(ERROR) << "failed connecting socket #" << fId << ", reason: " << nn_strerror(errno);
|
||||
}
|
||||
int eid = nn_connect(fSocket, address.c_str());
|
||||
if (eid < 0)
|
||||
{
|
||||
LOG(ERROR) << "failed connecting socket #" << fId << ", reason: " << nn_strerror(errno);
|
||||
}
|
||||
}
|
||||
|
||||
size_t FairMQSocketNN::Send(FairMQMessage* msg)
|
||||
{
|
||||
void* ptr = msg->GetMessage();
|
||||
int rc = nn_send(fSocket, &ptr, NN_MSG, 0);
|
||||
if (rc < 0) {
|
||||
LOG(ERROR) << "failed sending on socket #" << fId << ", reason: " << nn_strerror(errno);
|
||||
} else {
|
||||
fBytesTx += rc;
|
||||
++fMessagesTx;
|
||||
static_cast<FairMQMessageNN*>(msg)->fReceiving = false;
|
||||
}
|
||||
void* ptr = msg->GetMessage();
|
||||
int rc = nn_send(fSocket, &ptr, NN_MSG, 0);
|
||||
if (rc < 0)
|
||||
{
|
||||
LOG(ERROR) << "failed sending on socket #" << fId << ", reason: " << nn_strerror(errno);
|
||||
}
|
||||
else
|
||||
{
|
||||
fBytesTx += rc;
|
||||
++fMessagesTx;
|
||||
static_cast<FairMQMessageNN*>(msg)->fReceiving = false;
|
||||
}
|
||||
|
||||
return rc;
|
||||
return rc;
|
||||
}
|
||||
|
||||
size_t FairMQSocketNN::Receive(FairMQMessage* msg)
|
||||
{
|
||||
void* ptr = NULL;
|
||||
int rc = nn_recv(fSocket, &ptr, NN_MSG, 0);
|
||||
if (rc < 0) {
|
||||
LOG(ERROR) << "failed receiving on socket #" << fId << ", reason: " << nn_strerror(errno);
|
||||
} else {
|
||||
fBytesRx += rc;
|
||||
++fMessagesRx;
|
||||
msg->SetMessage(ptr, rc);
|
||||
static_cast<FairMQMessageNN*>(msg)->fReceiving = true;
|
||||
}
|
||||
void* ptr = NULL;
|
||||
int rc = nn_recv(fSocket, &ptr, NN_MSG, 0);
|
||||
if (rc < 0)
|
||||
{
|
||||
LOG(ERROR) << "failed receiving on socket #" << fId << ", reason: " << nn_strerror(errno);
|
||||
}
|
||||
else
|
||||
{
|
||||
fBytesRx += rc;
|
||||
++fMessagesRx;
|
||||
msg->SetMessage(ptr, rc);
|
||||
static_cast<FairMQMessageNN*>(msg)->fReceiving = true;
|
||||
}
|
||||
|
||||
return rc;
|
||||
return rc;
|
||||
}
|
||||
|
||||
void* FairMQSocketNN::GetSocket()
|
||||
{
|
||||
return NULL; // dummy method to comply with the interface. functionality not possible in zeromq.
|
||||
return NULL; // dummy method to comply with the interface. functionality not possible in zeromq.
|
||||
}
|
||||
|
||||
int FairMQSocketNN::GetSocket(int nothing)
|
||||
{
|
||||
return fSocket;
|
||||
return fSocket;
|
||||
}
|
||||
|
||||
void FairMQSocketNN::Close()
|
||||
{
|
||||
nn_close(fSocket);
|
||||
nn_close(fSocket);
|
||||
}
|
||||
|
||||
void FairMQSocketNN::SetOption(const string& option, const void* value, size_t valueSize)
|
||||
{
|
||||
int rc = nn_setsockopt(fSocket, NN_SOL_SOCKET, GetConstant(option), value, valueSize);
|
||||
if (rc < 0) {
|
||||
LOG(ERROR) << "failed setting socket option, reason: " << nn_strerror(errno);
|
||||
}
|
||||
int rc = nn_setsockopt(fSocket, NN_SOL_SOCKET, GetConstant(option), value, valueSize);
|
||||
if (rc < 0)
|
||||
{
|
||||
LOG(ERROR) << "failed setting socket option, reason: " << nn_strerror(errno);
|
||||
}
|
||||
}
|
||||
|
||||
unsigned long FairMQSocketNN::GetBytesTx()
|
||||
{
|
||||
return fBytesTx;
|
||||
return fBytesTx;
|
||||
}
|
||||
|
||||
unsigned long FairMQSocketNN::GetBytesRx()
|
||||
{
|
||||
return fBytesRx;
|
||||
return fBytesRx;
|
||||
}
|
||||
|
||||
unsigned long FairMQSocketNN::GetMessagesTx()
|
||||
{
|
||||
return fMessagesTx;
|
||||
return fMessagesTx;
|
||||
}
|
||||
|
||||
unsigned long FairMQSocketNN::GetMessagesRx()
|
||||
{
|
||||
return fMessagesRx;
|
||||
return fMessagesRx;
|
||||
}
|
||||
|
||||
int FairMQSocketNN::GetConstant(const string& constant)
|
||||
{
|
||||
if (constant == "sub") return NN_SUB;
|
||||
if (constant == "pub") return NN_PUB;
|
||||
if (constant == "xsub") return NN_SUB; // TODO: is there XPUB, XSUB for nanomsg?
|
||||
if (constant == "xpub") return NN_PUB;
|
||||
if (constant == "push") return NN_PUSH;
|
||||
if (constant == "pull") return NN_PULL;
|
||||
if (constant == "snd-hwm") return NN_SNDBUF;
|
||||
if (constant == "rcv-hwm") return NN_RCVBUF;
|
||||
if (constant == "sub")
|
||||
return NN_SUB;
|
||||
if (constant == "pub")
|
||||
return NN_PUB;
|
||||
if (constant == "xsub")
|
||||
return NN_SUB; // TODO: is there XPUB, XSUB for nanomsg?
|
||||
if (constant == "xpub")
|
||||
return NN_PUB;
|
||||
if (constant == "push")
|
||||
return NN_PUSH;
|
||||
if (constant == "pull")
|
||||
return NN_PULL;
|
||||
if (constant == "snd-hwm")
|
||||
return NN_SNDBUF;
|
||||
if (constant == "rcv-hwm")
|
||||
return NN_RCVBUF;
|
||||
|
||||
return -1;
|
||||
return -1;
|
||||
}
|
||||
|
||||
FairMQSocketNN::~FairMQSocketNN()
|
||||
{
|
||||
Close();
|
||||
Close();
|
||||
}
|
||||
|
@@ -14,7 +14,6 @@
|
||||
|
||||
#include "FairMQSocket.h"
|
||||
|
||||
|
||||
class FairMQSocketNN : public FairMQSocket
|
||||
{
|
||||
public:
|
||||
|
@@ -9,22 +9,22 @@
|
||||
|
||||
FairMQTransportFactoryNN::FairMQTransportFactoryNN()
|
||||
{
|
||||
LOG(INFO) << "Using nanonsg library";
|
||||
LOG(INFO) << "Using nanonsg library";
|
||||
}
|
||||
|
||||
FairMQMessage* FairMQTransportFactoryNN::CreateMessage()
|
||||
{
|
||||
return new FairMQMessageNN();
|
||||
return new FairMQMessageNN();
|
||||
}
|
||||
|
||||
FairMQMessage* FairMQTransportFactoryNN::CreateMessage(size_t size)
|
||||
{
|
||||
return new FairMQMessageNN(size);
|
||||
return new FairMQMessageNN(size);
|
||||
}
|
||||
|
||||
FairMQMessage* FairMQTransportFactoryNN::CreateMessage(void* data, size_t size)
|
||||
{
|
||||
return new FairMQMessageNN(data, size);
|
||||
return new FairMQMessageNN(data, size);
|
||||
}
|
||||
|
||||
FairMQSocket* FairMQTransportFactoryNN::CreateSocket(const string& type, int num, int numIoThreads)
|
||||
@@ -34,5 +34,5 @@ FairMQSocket* FairMQTransportFactoryNN::CreateSocket(const string& type, int num
|
||||
|
||||
FairMQPoller* FairMQTransportFactoryNN::CreatePoller(const vector<FairMQSocket*>& inputs)
|
||||
{
|
||||
return new FairMQPollerNN(inputs);
|
||||
return new FairMQPollerNN(inputs);
|
||||
}
|
||||
|
@@ -26,7 +26,6 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory
|
||||
virtual FairMQSocket* CreateSocket(const string& type, int num, int numIoThreads);
|
||||
virtual FairMQPoller* CreatePoller(const vector<FairMQSocket*>& inputs);
|
||||
|
||||
|
||||
virtual ~FairMQTransportFactoryNN() {};
|
||||
};
|
||||
|
||||
|
Reference in New Issue
Block a user