mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-12 16:21:13 +00:00
use clang-format for Tutorial3
This commit is contained in:
parent
08e603911d
commit
c03d4ad6d5
|
@ -23,80 +23,96 @@ FairMQDevice::FairMQDevice()
|
||||||
|
|
||||||
void FairMQDevice::Init()
|
void FairMQDevice::Init()
|
||||||
{
|
{
|
||||||
LOG(INFO) << ">>>>>>> Init <<<<<<<";
|
LOG(INFO) << ">>>>>>> Init <<<<<<<";
|
||||||
LOG(INFO) << "numIoThreads: " << fNumIoThreads;
|
LOG(INFO) << "numIoThreads: " << fNumIoThreads;
|
||||||
|
|
||||||
fInputAddress = new vector<string>(fNumInputs);
|
fInputAddress = new vector<string>(fNumInputs);
|
||||||
fInputMethod = new vector<string>();
|
fInputMethod = new vector<string>();
|
||||||
fInputSocketType = new vector<string>();
|
fInputSocketType = new vector<string>();
|
||||||
fInputSndBufSize = new vector<int>();
|
fInputSndBufSize = new vector<int>();
|
||||||
fInputRcvBufSize = new vector<int>();
|
fInputRcvBufSize = new vector<int>();
|
||||||
|
|
||||||
for (int i = 0; i < fNumInputs; ++i) {
|
for (int i = 0; i < fNumInputs; ++i)
|
||||||
fInputMethod->push_back("connect"); // default value, can be overwritten in configuration
|
{
|
||||||
fInputSocketType->push_back("sub"); // default value, can be overwritten in configuration
|
fInputMethod->push_back("connect"); // default value, can be overwritten in configuration
|
||||||
fInputSndBufSize->push_back(10000); // default value, can be overwritten in configuration
|
fInputSocketType->push_back("sub"); // default value, can be overwritten in configuration
|
||||||
fInputRcvBufSize->push_back(10000); // default value, can be overwritten in configuration
|
fInputSndBufSize->push_back(10000); // default value, can be overwritten in configuration
|
||||||
}
|
fInputRcvBufSize->push_back(10000); // default value, can be overwritten in configuration
|
||||||
|
}
|
||||||
|
|
||||||
fOutputAddress = new vector<string>(fNumOutputs);
|
fOutputAddress = new vector<string>(fNumOutputs);
|
||||||
fOutputMethod = new vector<string>();
|
fOutputMethod = new vector<string>();
|
||||||
fOutputSocketType = new vector<string>();
|
fOutputSocketType = new vector<string>();
|
||||||
fOutputSndBufSize = new vector<int>();
|
fOutputSndBufSize = new vector<int>();
|
||||||
fOutputRcvBufSize = new vector<int>();
|
fOutputRcvBufSize = new vector<int>();
|
||||||
|
|
||||||
for (int i = 0; i < fNumOutputs; ++i) {
|
for (int i = 0; i < fNumOutputs; ++i)
|
||||||
fOutputMethod->push_back("bind"); // default value, can be overwritten in configuration
|
{
|
||||||
fOutputSocketType->push_back("pub"); // default value, can be overwritten in configuration
|
fOutputMethod->push_back("bind"); // default value, can be overwritten in configuration
|
||||||
fOutputSndBufSize->push_back(10000); // default value, can be overwritten in configuration
|
fOutputSocketType->push_back("pub"); // default value, can be overwritten in configuration
|
||||||
fOutputRcvBufSize->push_back(10000); // default value, can be overwritten in configuration
|
fOutputSndBufSize->push_back(10000); // default value, can be overwritten in configuration
|
||||||
}
|
fOutputRcvBufSize->push_back(10000); // default value, can be overwritten in configuration
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void FairMQDevice::InitInput()
|
void FairMQDevice::InitInput()
|
||||||
{
|
{
|
||||||
LOG(INFO) << ">>>>>>> InitInput <<<<<<<";
|
LOG(INFO) << ">>>>>>> InitInput <<<<<<<";
|
||||||
|
|
||||||
for (int i = 0; i < fNumInputs; ++i) {
|
for (int i = 0; i < fNumInputs; ++i)
|
||||||
FairMQSocket* socket = fTransportFactory->CreateSocket(fInputSocketType->at(i), i, fNumIoThreads);
|
{
|
||||||
|
FairMQSocket* socket = fTransportFactory->CreateSocket(fInputSocketType->at(i), i, fNumIoThreads);
|
||||||
|
|
||||||
socket->SetOption("snd-hwm", &fInputSndBufSize->at(i), sizeof(fInputSndBufSize->at(i)));
|
socket->SetOption("snd-hwm", &fInputSndBufSize->at(i), sizeof(fInputSndBufSize->at(i)));
|
||||||
socket->SetOption("rcv-hwm", &fInputRcvBufSize->at(i), sizeof(fInputRcvBufSize->at(i)));
|
socket->SetOption("rcv-hwm", &fInputRcvBufSize->at(i), sizeof(fInputRcvBufSize->at(i)));
|
||||||
|
|
||||||
fPayloadInputs->push_back(socket);
|
fPayloadInputs->push_back(socket);
|
||||||
|
|
||||||
try {
|
try
|
||||||
if (fInputMethod->at(i) == "bind") {
|
{
|
||||||
fPayloadInputs->at(i)->Bind(fInputAddress->at(i));
|
if (fInputMethod->at(i) == "bind")
|
||||||
} else {
|
{
|
||||||
fPayloadInputs->at(i)->Connect(fInputAddress->at(i));
|
fPayloadInputs->at(i)->Bind(fInputAddress->at(i));
|
||||||
}
|
}
|
||||||
} catch (std::out_of_range& e) {
|
else
|
||||||
|
{
|
||||||
|
fPayloadInputs->at(i)->Connect(fInputAddress->at(i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (std::out_of_range& e)
|
||||||
|
{
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void FairMQDevice::InitOutput()
|
void FairMQDevice::InitOutput()
|
||||||
{
|
{
|
||||||
LOG(INFO) << ">>>>>>> InitOutput <<<<<<<";
|
LOG(INFO) << ">>>>>>> InitOutput <<<<<<<";
|
||||||
|
|
||||||
for (int i = 0; i < fNumOutputs; ++i) {
|
for (int i = 0; i < fNumOutputs; ++i)
|
||||||
FairMQSocket* socket = fTransportFactory->CreateSocket(fOutputSocketType->at(i), i, fNumIoThreads);
|
{
|
||||||
|
FairMQSocket* socket = fTransportFactory->CreateSocket(fOutputSocketType->at(i), i, fNumIoThreads);
|
||||||
|
|
||||||
socket->SetOption("snd-hwm", &fOutputSndBufSize->at(i), sizeof(fOutputSndBufSize->at(i)));
|
socket->SetOption("snd-hwm", &fOutputSndBufSize->at(i), sizeof(fOutputSndBufSize->at(i)));
|
||||||
socket->SetOption("rcv-hwm", &fOutputRcvBufSize->at(i), sizeof(fOutputRcvBufSize->at(i)));
|
socket->SetOption("rcv-hwm", &fOutputRcvBufSize->at(i), sizeof(fOutputRcvBufSize->at(i)));
|
||||||
|
|
||||||
fPayloadOutputs->push_back(socket);
|
fPayloadOutputs->push_back(socket);
|
||||||
|
|
||||||
try {
|
try
|
||||||
if (fOutputMethod->at(i) == "bind") {
|
{
|
||||||
fPayloadOutputs->at(i)->Bind(fOutputAddress->at(i));
|
if (fOutputMethod->at(i) == "bind")
|
||||||
} else {
|
{
|
||||||
fPayloadOutputs->at(i)->Connect(fOutputAddress->at(i));
|
fPayloadOutputs->at(i)->Bind(fOutputAddress->at(i));
|
||||||
}
|
}
|
||||||
} catch (std::out_of_range& e) {
|
else
|
||||||
|
{
|
||||||
|
fPayloadOutputs->at(i)->Connect(fOutputAddress->at(i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (std::out_of_range& e)
|
||||||
|
{
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void FairMQDevice::Run()
|
void FairMQDevice::Run()
|
||||||
|
|
|
@ -169,18 +169,18 @@ namespace FairMQFSM
|
||||||
}
|
}
|
||||||
// Transition table for FairMQFMS
|
// Transition table for FairMQFMS
|
||||||
struct transition_table : mpl::vector<
|
struct transition_table : mpl::vector<
|
||||||
// Start Event Next Action Guard
|
// Start Event Next Action Guard
|
||||||
// +---------+---------+-------+---------+--------+
|
// +---------+---------+-------+---------+--------+
|
||||||
msmf::Row<IDLE_FSM, INIT, INITIALIZING_FSM, InitFct, msmf::none>,
|
msmf::Row<IDLE_FSM, INIT, INITIALIZING_FSM, InitFct, msmf::none>,
|
||||||
msmf::Row<IDLE_FSM, END, msmf::none, TestFct, msmf::none>, // this is an invalid transition...
|
msmf::Row<IDLE_FSM, END, msmf::none, TestFct, msmf::none>, // this is an invalid transition...
|
||||||
msmf::Row<INITIALIZING_FSM, SETOUTPUT, SETTINGOUTPUT_FSM, SetOutputFct, msmf::none>,
|
msmf::Row<INITIALIZING_FSM, SETOUTPUT, SETTINGOUTPUT_FSM, SetOutputFct, msmf::none>,
|
||||||
msmf::Row<SETTINGOUTPUT_FSM, SETINPUT, SETTINGINPUT_FSM, SetInputFct, msmf::none>,
|
msmf::Row<SETTINGOUTPUT_FSM, SETINPUT, SETTINGINPUT_FSM, SetInputFct, msmf::none>,
|
||||||
msmf::Row<SETTINGINPUT_FSM, PAUSE, WAITING_FSM, PauseFct, msmf::none>,
|
msmf::Row<SETTINGINPUT_FSM, PAUSE, WAITING_FSM, PauseFct, msmf::none>,
|
||||||
msmf::Row<SETTINGINPUT_FSM, RUN, RUNNING_FSM, RunFct, msmf::none>,
|
msmf::Row<SETTINGINPUT_FSM, RUN, RUNNING_FSM, RunFct, msmf::none>,
|
||||||
msmf::Row<WAITING_FSM, RUN, RUNNING_FSM, RunFct, msmf::none>,
|
msmf::Row<WAITING_FSM, RUN, RUNNING_FSM, RunFct, msmf::none>,
|
||||||
msmf::Row<WAITING_FSM, STOP, IDLE_FSM, StopFct, msmf::none>,
|
msmf::Row<WAITING_FSM, STOP, IDLE_FSM, StopFct, msmf::none>,
|
||||||
msmf::Row<RUNNING_FSM, PAUSE, WAITING_FSM, PauseFct, msmf::none>,
|
msmf::Row<RUNNING_FSM, PAUSE, WAITING_FSM, PauseFct, msmf::none>,
|
||||||
msmf::Row<RUNNING_FSM, STOP, IDLE_FSM, StopFct, msmf::none> >
|
msmf::Row<RUNNING_FSM, STOP, IDLE_FSM, StopFct, msmf::none> >
|
||||||
{
|
{
|
||||||
};
|
};
|
||||||
// Replaces the default no-transition response.
|
// Replaces the default no-transition response.
|
||||||
|
|
|
@ -1,3 +1,3 @@
|
||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
|
|
||||||
find . -type f \( -iname "*.h" ! -iname "*.pb.h" -o -iname "*.cxx" \) -execdir clang-format -i {} \;
|
find . -type f \( -iname "*.h" ! -iname "*.pb.h" ! -iname "*LinkDef.h" -o -iname "*.cxx" -o -iname "*.tpl" \) -execdir clang-format -i {} \;
|
||||||
|
|
|
@ -11,24 +11,26 @@
|
||||||
#include "FairMQMessageNN.h"
|
#include "FairMQMessageNN.h"
|
||||||
#include "FairMQLogger.h"
|
#include "FairMQLogger.h"
|
||||||
|
|
||||||
FairMQSocketNN::FairMQSocketNN(const string& type, int num, int numIoThreads) :
|
FairMQSocketNN::FairMQSocketNN(const string& type, int num, int numIoThreads)
|
||||||
fBytesTx(0),
|
: fBytesTx(0)
|
||||||
fBytesRx(0),
|
, fBytesRx(0)
|
||||||
fMessagesTx(0),
|
, fMessagesTx(0)
|
||||||
fMessagesRx(0)
|
, fMessagesRx(0)
|
||||||
{
|
{
|
||||||
stringstream id;
|
stringstream id;
|
||||||
id << type << "." << num;
|
id << type << "." << num;
|
||||||
fId = id.str();
|
fId = id.str();
|
||||||
|
|
||||||
if ( numIoThreads > 1 ) {
|
if (numIoThreads > 1)
|
||||||
LOG(INFO) << "number of I/O threads is not used in nanomsg";
|
{
|
||||||
}
|
LOG(INFO) << "number of I/O threads is not used in nanomsg";
|
||||||
|
}
|
||||||
|
|
||||||
fSocket = nn_socket (AF_SP, GetConstant(type));
|
fSocket = nn_socket(AF_SP, GetConstant(type));
|
||||||
if (type == "sub") {
|
if (type == "sub")
|
||||||
nn_setsockopt(fSocket, NN_SUB, NN_SUB_SUBSCRIBE, NULL, 0);
|
{
|
||||||
}
|
nn_setsockopt(fSocket, NN_SUB, NN_SUB_SUBSCRIBE, NULL, 0);
|
||||||
|
}
|
||||||
|
|
||||||
LOG(INFO) << "created socket #" << fId;
|
LOG(INFO) << "created socket #" << fId;
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,7 +29,7 @@ FairMQMessage* FairMQTransportFactoryNN::CreateMessage(void* data, size_t size)
|
||||||
|
|
||||||
FairMQSocket* FairMQTransportFactoryNN::CreateSocket(const string& type, int num, int numIoThreads)
|
FairMQSocket* FairMQTransportFactoryNN::CreateSocket(const string& type, int num, int numIoThreads)
|
||||||
{
|
{
|
||||||
return new FairMQSocketNN(type, num, numIoThreads);
|
return new FairMQSocketNN(type, num, numIoThreads);
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQPoller* FairMQTransportFactoryNN::CreatePoller(const vector<FairMQSocket*>& inputs)
|
FairMQPoller* FairMQTransportFactoryNN::CreatePoller(const vector<FairMQSocket*>& inputs)
|
||||||
|
|
|
@ -12,36 +12,40 @@
|
||||||
|
|
||||||
boost::shared_ptr<FairMQContextZMQ> FairMQSocketZMQ::fContext = boost::shared_ptr<FairMQContextZMQ>(new FairMQContextZMQ(1));
|
boost::shared_ptr<FairMQContextZMQ> FairMQSocketZMQ::fContext = boost::shared_ptr<FairMQContextZMQ>(new FairMQContextZMQ(1));
|
||||||
|
|
||||||
FairMQSocketZMQ::FairMQSocketZMQ(const string& type, int num, int numIoThreads) :
|
FairMQSocketZMQ::FairMQSocketZMQ(const string& type, int num, int numIoThreads)
|
||||||
fBytesTx(0),
|
: fBytesTx(0)
|
||||||
fBytesRx(0),
|
, fBytesRx(0)
|
||||||
fMessagesTx(0),
|
, fMessagesTx(0)
|
||||||
fMessagesRx(0)
|
, fMessagesRx(0)
|
||||||
{
|
{
|
||||||
stringstream id;
|
stringstream id;
|
||||||
id << type << "." << num;
|
id << type << "." << num;
|
||||||
fId = id.str();
|
fId = id.str();
|
||||||
|
|
||||||
int rc = zmq_ctx_set (fContext->GetContext(), ZMQ_IO_THREADS, numIoThreads);
|
int rc = zmq_ctx_set(fContext->GetContext(), ZMQ_IO_THREADS, numIoThreads);
|
||||||
if (rc != 0){
|
if (rc != 0)
|
||||||
LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno);
|
{
|
||||||
}
|
LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno);
|
||||||
|
|
||||||
fSocket = zmq_socket(fContext->GetContext(), GetConstant(type));
|
|
||||||
|
|
||||||
rc = zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.length());
|
|
||||||
if (rc != 0) {
|
|
||||||
LOG(ERROR) << "failed setting socket option, reason: " << zmq_strerror(errno);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (type == "sub") {
|
|
||||||
rc = zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, NULL, 0);
|
|
||||||
if (rc != 0) {
|
|
||||||
LOG(ERROR) << "failed setting socket option, reason: " << zmq_strerror(errno);
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
LOG(INFO) << "created socket #" << fId;
|
fSocket = zmq_socket(fContext->GetContext(), GetConstant(type));
|
||||||
|
|
||||||
|
rc = zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.length());
|
||||||
|
if (rc != 0)
|
||||||
|
{
|
||||||
|
LOG(ERROR) << "failed setting socket option, reason: " << zmq_strerror(errno);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (type == "sub")
|
||||||
|
{
|
||||||
|
rc = zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, NULL, 0);
|
||||||
|
if (rc != 0)
|
||||||
|
{
|
||||||
|
LOG(ERROR) << "failed setting socket option, reason: " << zmq_strerror(errno);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG(INFO) << "created socket #" << fId;
|
||||||
}
|
}
|
||||||
|
|
||||||
string FairMQSocketZMQ::GetId()
|
string FairMQSocketZMQ::GetId()
|
||||||
|
|
|
@ -33,7 +33,7 @@ FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(void* data, size_t size)
|
||||||
|
|
||||||
FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(const string& type, int num, int numIoThreads)
|
FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(const string& type, int num, int numIoThreads)
|
||||||
{
|
{
|
||||||
return new FairMQSocketZMQ(type, num, numIoThreads);
|
return new FairMQSocketZMQ(type, num, numIoThreads);
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(const vector<FairMQSocket*>& inputs)
|
FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(const vector<FairMQSocket*>& inputs)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user