- Proper process termination:

if interrupted with CTRL+C blocking socket calls will return with -1. Each device should call FairMQDevice::Shutdown() before ending the running state to close open sockets, otherwise the interrupt call itself will block.

- FIX: Update number of received messages for FairMQFileSink.
- Add ability to poll on outputs for FairMQPoller.
This commit is contained in:
Alexey Rybalchenko
2014-08-12 09:11:51 +02:00
committed by Mohammad Al-Turany
parent 8cd120aef4
commit 0a610926a1
23 changed files with 236 additions and 208 deletions

View File

@@ -256,79 +256,61 @@ void FairMQDevice::LogSocketRates()
timestamp_t t0;
timestamp_t t1;
timestamp_t timeSinceLastLog_ms;
timestamp_t msSinceLastLog;
vector<unsigned long> bytesInput(fNumInputs);
vector<unsigned long> messagesInput(fNumInputs);
vector<unsigned long> bytesOutput(fNumOutputs);
vector<unsigned long> messagesOutput(fNumOutputs);
vector<unsigned long> bytesIn(fNumInputs);
vector<unsigned long> msgIn(fNumInputs);
vector<unsigned long> bytesOut(fNumOutputs);
vector<unsigned long> msgOut(fNumOutputs);
vector<unsigned long> bytesInputNew(fNumInputs);
vector<unsigned long> messagesInputNew(fNumInputs);
vector<unsigned long> bytesOutputNew(fNumOutputs);
vector<unsigned long> messagesOutputNew(fNumOutputs);
vector<unsigned long> bytesInNew(fNumInputs);
vector<unsigned long> msgInNew(fNumInputs);
vector<unsigned long> bytesOutNew(fNumOutputs);
vector<unsigned long> msgOutNew(fNumOutputs);
vector<double> megabytesPerSecondInput(fNumInputs);
vector<double> messagesPerSecondInput(fNumInputs);
vector<double> megabytesPerSecondOutput(fNumOutputs);
vector<double> messagesPerSecondOutput(fNumOutputs);
// Temp stuff for process termination
// bool receivedSomething = false;
// bool sentSomething = false;
// int didNotReceiveFor = 0;
// int didNotSendFor = 0;
// End of temp stuff
vector<double> mbPerSecIn(fNumInputs);
vector<double> msgPerSecIn(fNumInputs);
vector<double> mbPerSecOut(fNumOutputs);
vector<double> msgPerSecOut(fNumOutputs);
int i = 0;
for (vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++)
{
bytesInput.at(i) = (*itr)->GetBytesRx();
messagesInput.at(i) = (*itr)->GetMessagesRx();
bytesIn.at(i) = (*itr)->GetBytesRx();
msgIn.at(i) = (*itr)->GetMessagesRx();
++i;
}
i = 0;
for (vector<FairMQSocket*>::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++)
{
bytesOutput.at(i) = (*itr)->GetBytesTx();
messagesOutput.at(i) = (*itr)->GetMessagesTx();
bytesOut.at(i) = (*itr)->GetBytesTx();
msgOut.at(i) = (*itr)->GetMessagesTx();
++i;
}
t0 = get_timestamp();
while (true)
while (fState == RUNNING)
{
try
{
t1 = get_timestamp();
timeSinceLastLog_ms = (t1 - t0) / 1000.0L;
msSinceLastLog = (t1 - t0) / 1000.0L;
i = 0;
for (vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++)
{
bytesInputNew.at(i) = (*itr)->GetBytesRx();
megabytesPerSecondInput.at(i) = ((double)(bytesInputNew.at(i) - bytesInput.at(i)) / (1024. * 1024.)) / (double)timeSinceLastLog_ms * 1000.;
bytesInput.at(i) = bytesInputNew.at(i);
messagesInputNew.at(i) = (*itr)->GetMessagesRx();
messagesPerSecondInput.at(i) = (double)(messagesInputNew.at(i) - messagesInput.at(i)) / (double)timeSinceLastLog_ms * 1000.;
messagesInput.at(i) = messagesInputNew.at(i);
bytesInNew.at(i) = (*itr)->GetBytesRx();
mbPerSecIn.at(i) = ((double)(bytesInNew.at(i) - bytesIn.at(i)) / (1024. * 1024.)) / (double)msSinceLastLog * 1000.;
bytesIn.at(i) = bytesInNew.at(i);
msgInNew.at(i) = (*itr)->GetMessagesRx();
msgPerSecIn.at(i) = (double)(msgInNew.at(i) - msgIn.at(i)) / (double)msSinceLastLog * 1000.;
msgIn.at(i) = msgInNew.at(i);
LOG(DEBUG) << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondInput.at(i) << " msg/s, " << megabytesPerSecondInput.at(i) << " MB/s";
// Temp stuff for process termination
// if ( !receivedSomething && messagesPerSecondInput.at(i) > 0 ) {
// receivedSomething = true;
// }
// if ( receivedSomething && messagesPerSecondInput.at(i) == 0 ) {
// cout << "Did not receive anything on socket " << i << " for " << didNotReceiveFor++ << " seconds." << endl;
// } else {
// didNotReceiveFor = 0;
// }
// End of temp stuff
LOG(DEBUG) << "#" << fId << "." << (*itr)->GetId() << ": " << msgPerSecIn.at(i) << " msg/s, " << mbPerSecIn.at(i) << " MB/s";
++i;
}
@@ -337,52 +319,29 @@ void FairMQDevice::LogSocketRates()
for (vector<FairMQSocket*>::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++)
{
bytesOutputNew.at(i) = (*itr)->GetBytesTx();
megabytesPerSecondOutput.at(i) = ((double)(bytesOutputNew.at(i) - bytesOutput.at(i)) / (1024. * 1024.)) / (double)timeSinceLastLog_ms * 1000.;
bytesOutput.at(i) = bytesOutputNew.at(i);
messagesOutputNew.at(i) = (*itr)->GetMessagesTx();
messagesPerSecondOutput.at(i) = (double)(messagesOutputNew.at(i) - messagesOutput.at(i)) / (double)timeSinceLastLog_ms * 1000.;
messagesOutput.at(i) = messagesOutputNew.at(i);
bytesOutNew.at(i) = (*itr)->GetBytesTx();
mbPerSecOut.at(i) = ((double)(bytesOutNew.at(i) - bytesOut.at(i)) / (1024. * 1024.)) / (double)msSinceLastLog * 1000.;
bytesOut.at(i) = bytesOutNew.at(i);
msgOutNew.at(i) = (*itr)->GetMessagesTx();
msgPerSecOut.at(i) = (double)(msgOutNew.at(i) - msgOut.at(i)) / (double)msSinceLastLog * 1000.;
msgOut.at(i) = msgOutNew.at(i);
LOG(DEBUG) << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondOutput.at(i) << " msg/s, " << megabytesPerSecondOutput.at(i)
<< " MB/s";
// Temp stuff for process termination
// if ( !sentSomething && messagesPerSecondOutput.at(i) > 0 ) {
// sentSomething = true;
// }
// if ( sentSomething && messagesPerSecondOutput.at(i) == 0 ) {
// cout << "Did not send anything on socket " << i << " for " << didNotSendFor++ << " seconds." << endl;
// } else {
// didNotSendFor = 0;
// }
// End of temp stuff
LOG(DEBUG) << "#" << fId << "." << (*itr)->GetId() << ": " << msgPerSecOut.at(i) << " msg/s, " << mbPerSecOut.at(i) << " MB/s";
++i;
}
// Temp stuff for process termination
// if (receivedSomething && didNotReceiveFor > 5) {
// cout << "stopping because nothing was received for 5 seconds." << endl;
// ChangeState(STOP);
// }
// if (sentSomething && didNotSendFor > 5) {
// cout << "stopping because nothing was sent for 5 seconds." << endl;
// ChangeState(STOP);
// }
// End of temp stuff
t0 = t1;
boost::this_thread::sleep(boost::posix_time::milliseconds(fLogIntervalInMs));
}
catch (boost::thread_interrupted&)
{
cout << "rateLogger interrupted" << endl;
LOG(INFO) << "FairMQDevice::LogSocketRates() interrupted";
break;
}
}
LOG(INFO) << ">>>>>>> stopping rateLogger <<<<<<<";
LOG(INFO) << ">>>>>>> stopping FairMQDevice::LogSocketRates() <<<<<<<";
}
void FairMQDevice::ListenToCommands()
@@ -404,6 +363,24 @@ void FairMQDevice::Shutdown()
}
}
void FairMQDevice::Terminate()
{
// Termination signal has to be sent only once to any socket.
// Find available socket and send termination signal to it.
if (fPayloadInputs->size() > 0)
{
fPayloadInputs->at(0)->Terminate();
}
else if (fPayloadOutputs->size() > 0)
{
fPayloadOutputs->at(0)->Terminate();
}
else
{
LOG(ERROR) << "No sockets available to terminate.";
}
}
FairMQDevice::~FairMQDevice()
{
for (vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++)