mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 00:31:14 +00:00
Fix issues found by Codacy
This commit is contained in:
parent
985150437a
commit
e1f555bc05
|
@ -104,19 +104,22 @@ FairMQChannel::FairMQChannel(const FairMQChannel& chan)
|
||||||
|
|
||||||
FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan)
|
FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan)
|
||||||
{
|
{
|
||||||
|
fSocket = nullptr;
|
||||||
fType = chan.fType;
|
fType = chan.fType;
|
||||||
fMethod = chan.fMethod;
|
fMethod = chan.fMethod;
|
||||||
fAddress = chan.fAddress;
|
fAddress = chan.fAddress;
|
||||||
|
fTransportType = chan.fTransportType;
|
||||||
fSndBufSize = chan.fSndBufSize;
|
fSndBufSize = chan.fSndBufSize;
|
||||||
fRcvBufSize = chan.fRcvBufSize;
|
fRcvBufSize = chan.fRcvBufSize;
|
||||||
fSndKernelSize = chan.fSndKernelSize;
|
fSndKernelSize = chan.fSndKernelSize;
|
||||||
fRcvKernelSize = chan.fRcvKernelSize;
|
fRcvKernelSize = chan.fRcvKernelSize;
|
||||||
fRateLogging = chan.fRateLogging;
|
fRateLogging = chan.fRateLogging;
|
||||||
fSocket = nullptr;
|
|
||||||
fName = chan.fName;
|
fName = chan.fName;
|
||||||
fIsValid = false;
|
fIsValid = false;
|
||||||
fTransportType = chan.fTransportType;
|
|
||||||
fTransportFactory = nullptr;
|
fTransportFactory = nullptr;
|
||||||
|
fMultipart = chan.fMultipart;
|
||||||
|
fModified = chan.fModified;
|
||||||
|
fReset = false;
|
||||||
|
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
|
@ -816,8 +816,6 @@ void FairMQDevice::LogSocketRates()
|
||||||
chrono::time_point<chrono::high_resolution_clock> t0;
|
chrono::time_point<chrono::high_resolution_clock> t0;
|
||||||
chrono::time_point<chrono::high_resolution_clock> t1;
|
chrono::time_point<chrono::high_resolution_clock> t1;
|
||||||
|
|
||||||
unsigned long long msSinceLastLog;
|
|
||||||
|
|
||||||
vector<FairMQSocket*> filteredSockets;
|
vector<FairMQSocket*> filteredSockets;
|
||||||
vector<string> filteredChannelNames;
|
vector<string> filteredChannelNames;
|
||||||
vector<int> logIntervals;
|
vector<int> logIntervals;
|
||||||
|
@ -879,7 +877,7 @@ void FairMQDevice::LogSocketRates()
|
||||||
{
|
{
|
||||||
t1 = chrono::high_resolution_clock::now();
|
t1 = chrono::high_resolution_clock::now();
|
||||||
|
|
||||||
msSinceLastLog = chrono::duration_cast<chrono::milliseconds>(t1 - t0).count();
|
unsigned long long msSinceLastLog = chrono::duration_cast<chrono::milliseconds>(t1 - t0).count();
|
||||||
|
|
||||||
i = 0;
|
i = 0;
|
||||||
|
|
||||||
|
|
|
@ -18,8 +18,8 @@ class FairMQPoller
|
||||||
virtual void Poll(const int timeout) = 0;
|
virtual void Poll(const int timeout) = 0;
|
||||||
virtual bool CheckInput(const int index) = 0;
|
virtual bool CheckInput(const int index) = 0;
|
||||||
virtual bool CheckOutput(const int index) = 0;
|
virtual bool CheckOutput(const int index) = 0;
|
||||||
virtual bool CheckInput(const std::string channelKey, const int index) = 0;
|
virtual bool CheckInput(const std::string& channelKey, const int index) = 0;
|
||||||
virtual bool CheckOutput(const std::string channelKey, const int index) = 0;
|
virtual bool CheckOutput(const std::string& channelKey, const int index) = 0;
|
||||||
|
|
||||||
virtual ~FairMQPoller() {};
|
virtual ~FairMQPoller() {};
|
||||||
};
|
};
|
||||||
|
|
|
@ -644,7 +644,7 @@ bool FairMQStateMachine::CheckCurrentState(int state) const
|
||||||
{
|
{
|
||||||
return state == static_pointer_cast<FairMQFSM>(fFsm)->fState;
|
return state == static_pointer_cast<FairMQFSM>(fFsm)->fState;
|
||||||
}
|
}
|
||||||
bool FairMQStateMachine::CheckCurrentState(string state) const
|
bool FairMQStateMachine::CheckCurrentState(const string& state) const
|
||||||
{
|
{
|
||||||
return state == GetCurrentStateName();
|
return state == GetCurrentStateName();
|
||||||
}
|
}
|
||||||
|
|
|
@ -82,7 +82,7 @@ class FairMQStateMachine
|
||||||
static std::string GetStateName(const State);
|
static std::string GetStateName(const State);
|
||||||
int GetCurrentState() const;
|
int GetCurrentState() const;
|
||||||
bool CheckCurrentState(int state) const;
|
bool CheckCurrentState(int state) const;
|
||||||
bool CheckCurrentState(std::string state) const;
|
bool CheckCurrentState(const std::string& state) const;
|
||||||
|
|
||||||
// actions to be overwritten by derived classes
|
// actions to be overwritten by derived classes
|
||||||
virtual void InitWrapper() {}
|
virtual void InitWrapper() {}
|
||||||
|
|
|
@ -68,10 +68,9 @@ FairMQPollerNN::FairMQPollerNN(const unordered_map<string, vector<FairMQChannel>
|
||||||
, fNumItems(0)
|
, fNumItems(0)
|
||||||
, fOffsetMap()
|
, fOffsetMap()
|
||||||
{
|
{
|
||||||
int offset = 0;
|
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
int offset = 0;
|
||||||
// calculate offsets and the total size of the poll item set
|
// calculate offsets and the total size of the poll item set
|
||||||
for (string channel : channelList)
|
for (string channel : channelList)
|
||||||
{
|
{
|
||||||
|
@ -184,7 +183,7 @@ bool FairMQPollerNN::CheckOutput(const int index)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool FairMQPollerNN::CheckInput(const string channelKey, const int index)
|
bool FairMQPollerNN::CheckInput(const string& channelKey, const int index)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -203,7 +202,7 @@ bool FairMQPollerNN::CheckInput(const string channelKey, const int index)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool FairMQPollerNN::CheckOutput(const string channelKey, const int index)
|
bool FairMQPollerNN::CheckOutput(const string& channelKey, const int index)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
|
|
@ -44,8 +44,8 @@ class FairMQPollerNN : public FairMQPoller
|
||||||
virtual void Poll(const int timeout);
|
virtual void Poll(const int timeout);
|
||||||
virtual bool CheckInput(const int index);
|
virtual bool CheckInput(const int index);
|
||||||
virtual bool CheckOutput(const int index);
|
virtual bool CheckOutput(const int index);
|
||||||
virtual bool CheckInput(const std::string channelKey, const int index);
|
virtual bool CheckInput(const std::string& channelKey, const int index);
|
||||||
virtual bool CheckOutput(const std::string channelKey, const int index);
|
virtual bool CheckOutput(const std::string& channelKey, const int index);
|
||||||
|
|
||||||
virtual ~FairMQPollerNN();
|
virtual ~FairMQPollerNN();
|
||||||
|
|
||||||
|
|
|
@ -197,7 +197,6 @@ int FairMQSocketNN::SendImpl(FairMQMessagePtr& msg, const int flags, const int t
|
||||||
|
|
||||||
int FairMQSocketNN::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout)
|
int FairMQSocketNN::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout)
|
||||||
{
|
{
|
||||||
int nbytes = -1;
|
|
||||||
int elapsed = 0;
|
int elapsed = 0;
|
||||||
|
|
||||||
FairMQMessageNN* msgPtr = static_cast<FairMQMessageNN*>(msg.get());
|
FairMQMessageNN* msgPtr = static_cast<FairMQMessageNN*>(msg.get());
|
||||||
|
@ -205,7 +204,7 @@ int FairMQSocketNN::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const in
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
void* ptr = nullptr;
|
void* ptr = nullptr;
|
||||||
nbytes = nn_recv(fSocket, &ptr, NN_MSG, flags);
|
int nbytes = nn_recv(fSocket, &ptr, NN_MSG, flags);
|
||||||
if (nbytes >= 0)
|
if (nbytes >= 0)
|
||||||
{
|
{
|
||||||
fBytesRx += nbytes;
|
fBytesRx += nbytes;
|
||||||
|
@ -279,11 +278,9 @@ int64_t FairMQSocketNN::SendImpl(vector<FairMQMessagePtr>& msgVec, const int fla
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t nbytes = -1;
|
|
||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
nbytes = nn_send(fSocket, sbuf.data(), sbuf.size(), flags);
|
int64_t nbytes = nn_send(fSocket, sbuf.data(), sbuf.size(), flags);
|
||||||
if (nbytes >= 0)
|
if (nbytes >= 0)
|
||||||
{
|
{
|
||||||
fBytesTx += nbytes;
|
fBytesTx += nbytes;
|
||||||
|
|
|
@ -59,9 +59,8 @@ Poller::Poller(const vector<const FairMQChannel*>& channels)
|
||||||
|
|
||||||
Poller::Poller(const unordered_map<string, vector<FairMQChannel>>& channelsMap, const vector<string>& channelList)
|
Poller::Poller(const unordered_map<string, vector<FairMQChannel>>& channelsMap, const vector<string>& channelList)
|
||||||
{
|
{
|
||||||
int offset = 0;
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
int offset = 0;
|
||||||
// calculate offsets and the total size of the poll item set
|
// calculate offsets and the total size of the poll item set
|
||||||
for (string channel : channelList) {
|
for (string channel : channelList) {
|
||||||
fOffsetMap[channel] = offset;
|
fOffsetMap[channel] = offset;
|
||||||
|
|
|
@ -33,7 +33,7 @@ namespace mq
|
||||||
namespace plugins
|
namespace plugins
|
||||||
{
|
{
|
||||||
|
|
||||||
Control::Control(const string name, const Plugin::Version version, const string maintainer, const string homepage, PluginServices* pluginServices)
|
Control::Control(const string& name, const Plugin::Version version, const string& maintainer, const string& homepage, PluginServices* pluginServices)
|
||||||
: Plugin(name, version, maintainer, homepage, pluginServices)
|
: Plugin(name, version, maintainer, homepage, pluginServices)
|
||||||
, fControllerThread()
|
, fControllerThread()
|
||||||
, fSignalHandlerThread()
|
, fSignalHandlerThread()
|
||||||
|
|
|
@ -29,7 +29,7 @@ namespace plugins
|
||||||
class Control : public Plugin
|
class Control : public Plugin
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
Control(const std::string name, const Plugin::Version version, const std::string maintainer, const std::string homepage, PluginServices* pluginServices);
|
Control(const std::string& name, const Plugin::Version version, const std::string& maintainer, const std::string& homepage, PluginServices* pluginServices);
|
||||||
|
|
||||||
~Control();
|
~Control();
|
||||||
|
|
||||||
|
|
|
@ -31,7 +31,7 @@ namespace mq
|
||||||
namespace plugins
|
namespace plugins
|
||||||
{
|
{
|
||||||
|
|
||||||
DDS::DDS(const string name, const Plugin::Version version, const string maintainer, const string homepage, PluginServices* pluginServices)
|
DDS::DDS(const string& name, const Plugin::Version version, const string& maintainer, const string& homepage, PluginServices* pluginServices)
|
||||||
: Plugin(name, version, maintainer, homepage, pluginServices)
|
: Plugin(name, version, maintainer, homepage, pluginServices)
|
||||||
, fService()
|
, fService()
|
||||||
, fDDSCustomCmd(fService)
|
, fDDSCustomCmd(fService)
|
||||||
|
|
|
@ -61,7 +61,7 @@ struct IofN
|
||||||
class DDS : public Plugin
|
class DDS : public Plugin
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
DDS(const std::string name, const Plugin::Version version, const std::string maintainer, const std::string homepage, PluginServices* pluginServices);
|
DDS(const std::string& name, const Plugin::Version version, const std::string& maintainer, const std::string& homepage, PluginServices* pluginServices);
|
||||||
|
|
||||||
~DDS();
|
~DDS();
|
||||||
|
|
||||||
|
|
|
@ -33,7 +33,7 @@ int main(int argc, char* argv[])
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
string sessionID;
|
string sessionID;
|
||||||
char command;
|
char command = ' ';
|
||||||
string topologyPath;
|
string topologyPath;
|
||||||
|
|
||||||
bpo::options_description options("fairmq-dds-command-ui options");
|
bpo::options_description options("fairmq-dds-command-ui options");
|
||||||
|
|
|
@ -68,10 +68,9 @@ FairMQPollerSHM::FairMQPollerSHM(const unordered_map<string, vector<FairMQChanne
|
||||||
, fNumItems(0)
|
, fNumItems(0)
|
||||||
, fOffsetMap()
|
, fOffsetMap()
|
||||||
{
|
{
|
||||||
int offset = 0;
|
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
int offset = 0;
|
||||||
// calculate offsets and the total size of the poll item set
|
// calculate offsets and the total size of the poll item set
|
||||||
for (string channel : channelList)
|
for (string channel : channelList)
|
||||||
{
|
{
|
||||||
|
@ -189,7 +188,7 @@ bool FairMQPollerSHM::CheckOutput(const int index)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool FairMQPollerSHM::CheckInput(const string channelKey, const int index)
|
bool FairMQPollerSHM::CheckInput(const string& channelKey, const int index)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -208,7 +207,7 @@ bool FairMQPollerSHM::CheckInput(const string channelKey, const int index)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool FairMQPollerSHM::CheckOutput(const string channelKey, const int index)
|
bool FairMQPollerSHM::CheckOutput(const string& channelKey, const int index)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
|
|
@ -37,8 +37,8 @@ class FairMQPollerSHM : public FairMQPoller
|
||||||
void Poll(const int timeout) override;
|
void Poll(const int timeout) override;
|
||||||
bool CheckInput(const int index) override;
|
bool CheckInput(const int index) override;
|
||||||
bool CheckOutput(const int index) override;
|
bool CheckOutput(const int index) override;
|
||||||
bool CheckInput(const std::string channelKey, const int index) override;
|
bool CheckInput(const std::string& channelKey, const int index) override;
|
||||||
bool CheckOutput(const std::string channelKey, const int index) override;
|
bool CheckOutput(const std::string& channelKey, const int index) override;
|
||||||
|
|
||||||
~FairMQPollerSHM() override;
|
~FairMQPollerSHM() override;
|
||||||
|
|
||||||
|
|
|
@ -121,12 +121,11 @@ int64_t FairMQSocketSHM::TryReceive(vector<unique_ptr<FairMQMessage>>& msgVec) {
|
||||||
|
|
||||||
int FairMQSocketSHM::SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout)
|
int FairMQSocketSHM::SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout)
|
||||||
{
|
{
|
||||||
int nbytes = -1;
|
|
||||||
int elapsed = 0;
|
int elapsed = 0;
|
||||||
|
|
||||||
while (true && !fInterrupted)
|
while (true && !fInterrupted)
|
||||||
{
|
{
|
||||||
nbytes = zmq_msg_send(static_cast<FairMQMessageSHM*>(msg.get())->GetMessage(), fSocket, flags);
|
int nbytes = zmq_msg_send(static_cast<FairMQMessageSHM*>(msg.get())->GetMessage(), fSocket, flags);
|
||||||
if (nbytes == 0)
|
if (nbytes == 0)
|
||||||
{
|
{
|
||||||
return nbytes;
|
return nbytes;
|
||||||
|
@ -177,13 +176,12 @@ int FairMQSocketSHM::SendImpl(FairMQMessagePtr& msg, const int flags, const int
|
||||||
|
|
||||||
int FairMQSocketSHM::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout)
|
int FairMQSocketSHM::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout)
|
||||||
{
|
{
|
||||||
int nbytes = -1;
|
|
||||||
int elapsed = 0;
|
int elapsed = 0;
|
||||||
|
|
||||||
zmq_msg_t* msgPtr = static_cast<FairMQMessageSHM*>(msg.get())->GetMessage();
|
zmq_msg_t* msgPtr = static_cast<FairMQMessageSHM*>(msg.get())->GetMessage();
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
nbytes = zmq_msg_recv(msgPtr, fSocket, flags);
|
int nbytes = zmq_msg_recv(msgPtr, fSocket, flags);
|
||||||
if (nbytes == 0)
|
if (nbytes == 0)
|
||||||
{
|
{
|
||||||
++fMessagesRx;
|
++fMessagesRx;
|
||||||
|
@ -249,7 +247,6 @@ int FairMQSocketSHM::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const i
|
||||||
int64_t FairMQSocketSHM::SendImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int timeout)
|
int64_t FairMQSocketSHM::SendImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int timeout)
|
||||||
{
|
{
|
||||||
const unsigned int vecSize = msgVec.size();
|
const unsigned int vecSize = msgVec.size();
|
||||||
int64_t totalSize = 0;
|
|
||||||
int elapsed = 0;
|
int elapsed = 0;
|
||||||
|
|
||||||
if (vecSize == 1) {
|
if (vecSize == 1) {
|
||||||
|
@ -263,7 +260,7 @@ int64_t FairMQSocketSHM::SendImpl(vector<FairMQMessagePtr>& msgVec, const int fl
|
||||||
// prepare the message with shm metas
|
// prepare the message with shm metas
|
||||||
MetaHeader* metas = static_cast<MetaHeader*>(zmq_msg_data(&zmqMsg));
|
MetaHeader* metas = static_cast<MetaHeader*>(zmq_msg_data(&zmqMsg));
|
||||||
|
|
||||||
for (auto &msg : msgVec)
|
for (auto& msg : msgVec)
|
||||||
{
|
{
|
||||||
zmq_msg_t* metaMsg = static_cast<FairMQMessageSHM*>(msg.get())->GetMessage();
|
zmq_msg_t* metaMsg = static_cast<FairMQMessageSHM*>(msg.get())->GetMessage();
|
||||||
memcpy(metas++, zmq_msg_data(metaMsg), sizeof(MetaHeader));
|
memcpy(metas++, zmq_msg_data(metaMsg), sizeof(MetaHeader));
|
||||||
|
@ -271,9 +268,8 @@ int64_t FairMQSocketSHM::SendImpl(vector<FairMQMessagePtr>& msgVec, const int fl
|
||||||
|
|
||||||
while (!fInterrupted)
|
while (!fInterrupted)
|
||||||
{
|
{
|
||||||
int nbytes = -1;
|
int64_t totalSize = 0;
|
||||||
nbytes = zmq_msg_send(&zmqMsg, fSocket, flags);
|
int nbytes = zmq_msg_send(&zmqMsg, fSocket, flags);
|
||||||
|
|
||||||
if (nbytes == 0)
|
if (nbytes == 0)
|
||||||
{
|
{
|
||||||
zmq_msg_close(&zmqMsg);
|
zmq_msg_close(&zmqMsg);
|
||||||
|
@ -283,7 +279,7 @@ int64_t FairMQSocketSHM::SendImpl(vector<FairMQMessagePtr>& msgVec, const int fl
|
||||||
{
|
{
|
||||||
assert(nbytes == (vecSize * sizeof(MetaHeader))); // all or nothing
|
assert(nbytes == (vecSize * sizeof(MetaHeader))); // all or nothing
|
||||||
|
|
||||||
for (auto &msg : msgVec)
|
for (auto& msg : msgVec)
|
||||||
{
|
{
|
||||||
FairMQMessageSHM* shmMsg = static_cast<FairMQMessageSHM*>(msg.get());
|
FairMQMessageSHM* shmMsg = static_cast<FairMQMessageSHM*>(msg.get());
|
||||||
shmMsg->fQueued = true;
|
shmMsg->fQueued = true;
|
||||||
|
@ -338,7 +334,6 @@ int64_t FairMQSocketSHM::SendImpl(vector<FairMQMessagePtr>& msgVec, const int fl
|
||||||
|
|
||||||
int64_t FairMQSocketSHM::ReceiveImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int timeout)
|
int64_t FairMQSocketSHM::ReceiveImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int timeout)
|
||||||
{
|
{
|
||||||
int64_t totalSize = 0;
|
|
||||||
int elapsed = 0;
|
int elapsed = 0;
|
||||||
|
|
||||||
zmq_msg_t zmqMsg;
|
zmq_msg_t zmqMsg;
|
||||||
|
@ -346,6 +341,7 @@ int64_t FairMQSocketSHM::ReceiveImpl(vector<FairMQMessagePtr>& msgVec, const int
|
||||||
|
|
||||||
while (!fInterrupted)
|
while (!fInterrupted)
|
||||||
{
|
{
|
||||||
|
int64_t totalSize = 0;
|
||||||
int nbytes = zmq_msg_recv(&zmqMsg, fSocket, flags);
|
int nbytes = zmq_msg_recv(&zmqMsg, fSocket, flags);
|
||||||
if (nbytes == 0)
|
if (nbytes == 0)
|
||||||
{
|
{
|
||||||
|
|
|
@ -145,8 +145,6 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai
|
||||||
|
|
||||||
void FairMQTransportFactorySHM::StartMonitor()
|
void FairMQTransportFactorySHM::StartMonitor()
|
||||||
{
|
{
|
||||||
int numTries = 0;
|
|
||||||
|
|
||||||
auto env = boost::this_process::environment();
|
auto env = boost::this_process::environment();
|
||||||
|
|
||||||
vector<bfs::path> ownPath = boost::this_process::path();
|
vector<bfs::path> ownPath = boost::this_process::path();
|
||||||
|
@ -161,7 +159,7 @@ void FairMQTransportFactorySHM::StartMonitor()
|
||||||
if (!p.empty())
|
if (!p.empty())
|
||||||
{
|
{
|
||||||
boost::process::spawn(p, "-x", "-s", fSessionName, "-d", "-t", "2000", env);
|
boost::process::spawn(p, "-x", "-s", fSessionName, "-d", "-t", "2000", env);
|
||||||
|
int numTries = 0;
|
||||||
do
|
do
|
||||||
{
|
{
|
||||||
MonitorStatus* monitorStatus = fManager->ManagementSegment().find<MonitorStatus>(bipc::unique_instance).first;
|
MonitorStatus* monitorStatus = fManager->ManagementSegment().find<MonitorStatus>(bipc::unique_instance).first;
|
||||||
|
|
|
@ -245,11 +245,11 @@ void Monitor::Interactive()
|
||||||
|
|
||||||
void Monitor::CheckSegment()
|
void Monitor::CheckSegment()
|
||||||
{
|
{
|
||||||
static uint64_t counter = 0;
|
|
||||||
char c = '#';
|
char c = '#';
|
||||||
|
|
||||||
if (fInteractive)
|
if (fInteractive)
|
||||||
{
|
{
|
||||||
|
static uint64_t counter = 0;
|
||||||
int mod = counter++ % 5;
|
int mod = counter++ % 5;
|
||||||
switch (mod)
|
switch (mod)
|
||||||
{
|
{
|
||||||
|
|
|
@ -71,7 +71,7 @@ int main(int argc, char** argv)
|
||||||
bool cleanup = false;
|
bool cleanup = false;
|
||||||
bool selfDestruct = false;
|
bool selfDestruct = false;
|
||||||
bool interactive = false;
|
bool interactive = false;
|
||||||
unsigned int timeoutInMS;
|
unsigned int timeoutInMS = 5000;
|
||||||
bool runAsDaemon = false;
|
bool runAsDaemon = false;
|
||||||
bool cleanOnExit = false;
|
bool cleanOnExit = false;
|
||||||
|
|
||||||
|
|
|
@ -30,7 +30,7 @@ namespace tools
|
||||||
* @param[in] log_prefix How to prefix each captured output line with
|
* @param[in] log_prefix How to prefix each captured output line with
|
||||||
* @return Captured stdout output and exit code
|
* @return Captured stdout output and exit code
|
||||||
*/
|
*/
|
||||||
execute_result execute(string cmd, string prefix)
|
execute_result execute(const string& cmd, const string& prefix)
|
||||||
{
|
{
|
||||||
execute_result result;
|
execute_result result;
|
||||||
stringstream out;
|
stringstream out;
|
||||||
|
|
|
@ -35,7 +35,7 @@ struct execute_result
|
||||||
* @param[in] log_prefix How to prefix each captured output line with
|
* @param[in] log_prefix How to prefix each captured output line with
|
||||||
* @return Captured stdout output and exit code
|
* @return Captured stdout output and exit code
|
||||||
*/
|
*/
|
||||||
execute_result execute(std::string cmd, std::string prefix = "");
|
execute_result execute(const std::string& cmd, const std::string& prefix = "");
|
||||||
|
|
||||||
} /* namespace tools */
|
} /* namespace tools */
|
||||||
} /* namespace mq */
|
} /* namespace mq */
|
||||||
|
|
|
@ -69,10 +69,9 @@ FairMQPollerZMQ::FairMQPollerZMQ(const unordered_map<string, vector<FairMQChanne
|
||||||
, fNumItems(0)
|
, fNumItems(0)
|
||||||
, fOffsetMap()
|
, fOffsetMap()
|
||||||
{
|
{
|
||||||
int offset = 0;
|
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
int offset = 0;
|
||||||
// calculate offsets and the total size of the poll item set
|
// calculate offsets and the total size of the poll item set
|
||||||
for (string channel : channelList)
|
for (string channel : channelList)
|
||||||
{
|
{
|
||||||
|
@ -190,7 +189,7 @@ bool FairMQPollerZMQ::CheckOutput(const int index)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool FairMQPollerZMQ::CheckInput(const string channelKey, const int index)
|
bool FairMQPollerZMQ::CheckInput(const string& channelKey, const int index)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -209,7 +208,7 @@ bool FairMQPollerZMQ::CheckInput(const string channelKey, const int index)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool FairMQPollerZMQ::CheckOutput(const string channelKey, const int index)
|
bool FairMQPollerZMQ::CheckOutput(const string& channelKey, const int index)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
|
|
@ -45,8 +45,8 @@ class FairMQPollerZMQ : public FairMQPoller
|
||||||
virtual void Poll(const int timeout);
|
virtual void Poll(const int timeout);
|
||||||
virtual bool CheckInput(const int index);
|
virtual bool CheckInput(const int index);
|
||||||
virtual bool CheckOutput(const int index);
|
virtual bool CheckOutput(const int index);
|
||||||
virtual bool CheckInput(const std::string channelKey, const int index);
|
virtual bool CheckInput(const std::string& channelKey, const int index);
|
||||||
virtual bool CheckOutput(const std::string channelKey, const int index);
|
virtual bool CheckOutput(const std::string& channelKey, const int index);
|
||||||
|
|
||||||
virtual ~FairMQPollerZMQ();
|
virtual ~FairMQPollerZMQ();
|
||||||
|
|
||||||
|
|
|
@ -116,14 +116,13 @@ int64_t FairMQSocketZMQ::TryReceive(vector<unique_ptr<FairMQMessage>>& msgVec) {
|
||||||
|
|
||||||
int FairMQSocketZMQ::SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout)
|
int FairMQSocketZMQ::SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout)
|
||||||
{
|
{
|
||||||
int nbytes = -1;
|
|
||||||
int elapsed = 0;
|
int elapsed = 0;
|
||||||
|
|
||||||
static_cast<FairMQMessageZMQ*>(msg.get())->ApplyUsedSize();
|
static_cast<FairMQMessageZMQ*>(msg.get())->ApplyUsedSize();
|
||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
nbytes = zmq_msg_send(static_cast<FairMQMessageZMQ*>(msg.get())->GetMessage(), fSocket, flags);
|
int nbytes = zmq_msg_send(static_cast<FairMQMessageZMQ*>(msg.get())->GetMessage(), fSocket, flags);
|
||||||
if (nbytes >= 0)
|
if (nbytes >= 0)
|
||||||
{
|
{
|
||||||
fBytesTx += nbytes;
|
fBytesTx += nbytes;
|
||||||
|
@ -212,25 +211,21 @@ int FairMQSocketZMQ::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const i
|
||||||
int64_t FairMQSocketZMQ::SendImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int timeout)
|
int64_t FairMQSocketZMQ::SendImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int timeout)
|
||||||
{
|
{
|
||||||
const unsigned int vecSize = msgVec.size();
|
const unsigned int vecSize = msgVec.size();
|
||||||
int elapsed = 0;
|
|
||||||
|
|
||||||
// Sending vector typicaly handles more then one part
|
// Sending vector typicaly handles more then one part
|
||||||
if (vecSize > 1)
|
if (vecSize > 1)
|
||||||
{
|
{
|
||||||
int64_t totalSize = 0;
|
int elapsed = 0;
|
||||||
int nbytes = -1;
|
|
||||||
bool repeat = false;
|
|
||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
totalSize = 0;
|
int64_t totalSize = 0;
|
||||||
repeat = false;
|
bool repeat = false;
|
||||||
|
|
||||||
for (unsigned int i = 0; i < vecSize; ++i)
|
for (unsigned int i = 0; i < vecSize; ++i)
|
||||||
{
|
{
|
||||||
static_cast<FairMQMessageZMQ*>(msgVec[i].get())->ApplyUsedSize();
|
static_cast<FairMQMessageZMQ*>(msgVec[i].get())->ApplyUsedSize();
|
||||||
|
|
||||||
nbytes = zmq_msg_send(static_cast<FairMQMessageZMQ*>(msgVec[i].get())->GetMessage(),
|
int nbytes = zmq_msg_send(static_cast<FairMQMessageZMQ*>(msgVec[i].get())->GetMessage(),
|
||||||
fSocket,
|
fSocket,
|
||||||
(i < vecSize - 1) ? ZMQ_SNDMORE|flags : flags);
|
(i < vecSize - 1) ? ZMQ_SNDMORE|flags : flags);
|
||||||
if (nbytes >= 0)
|
if (nbytes >= 0)
|
||||||
|
@ -294,16 +289,13 @@ int64_t FairMQSocketZMQ::SendImpl(vector<FairMQMessagePtr>& msgVec, const int fl
|
||||||
|
|
||||||
int64_t FairMQSocketZMQ::ReceiveImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int timeout)
|
int64_t FairMQSocketZMQ::ReceiveImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int timeout)
|
||||||
{
|
{
|
||||||
int64_t totalSize = 0;
|
|
||||||
int64_t more = 0;
|
|
||||||
bool repeat = false;
|
|
||||||
int elapsed = 0;
|
int elapsed = 0;
|
||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
totalSize = 0;
|
int64_t totalSize = 0;
|
||||||
more = 0;
|
int64_t more = 0;
|
||||||
repeat = false;
|
bool repeat = false;
|
||||||
|
|
||||||
do
|
do
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue
Block a user