Add optional file output to FairMQSink

This commit is contained in:
Alexey Rybalchenko 2021-03-01 13:56:16 +01:00 committed by Dennis Klein
parent 8327810942
commit bbc1dd4600
2 changed files with 80 additions and 23 deletions

View File

@ -17,19 +17,24 @@
#include "../FairMQDevice.h" #include "../FairMQDevice.h"
#include "../FairMQLogger.h" #include "../FairMQLogger.h"
#include <fairmq/tools/Strings.h>
#include <chrono> #include <chrono>
#include <string> #include <string>
#include <fstream>
#include <stdexcept>
// template<typename OutputPolicy> class FairMQSink : public FairMQDevice
class FairMQSink : public FairMQDevice //, public OutputPolicy
{ {
public: public:
FairMQSink() FairMQSink()
: fMultipart(false) : fMultipart(false)
, fMaxIterations(0) , fMaxIterations(0)
, fNumIterations(0) , fNumIterations(0)
, fMaxFileSize(0)
, fBytesWritten(0)
, fInChannelName() , fInChannelName()
, fOutFilename()
{} {}
~FairMQSink() {} ~FairMQSink() {}
@ -38,13 +43,21 @@ class FairMQSink : public FairMQDevice //, public OutputPolicy
bool fMultipart; bool fMultipart;
uint64_t fMaxIterations; uint64_t fMaxIterations;
uint64_t fNumIterations; uint64_t fNumIterations;
uint64_t fMaxFileSize;
uint64_t fBytesWritten;
std::string fInChannelName; std::string fInChannelName;
std::string fOutFilename;
std::fstream fOutputFile;
void InitTask() override void InitTask() override
{ {
fMultipart = fConfig->GetProperty<bool>("multipart"); fMultipart = fConfig->GetProperty<bool>("multipart");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations"); fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fMaxFileSize = fConfig->GetProperty<uint64_t>("max-file-size");
fInChannelName = fConfig->GetProperty<std::string>("in-channel"); fInChannelName = fConfig->GetProperty<std::string>("in-channel");
fOutFilename = fConfig->GetProperty<std::string>("out-filename");
fBytesWritten = 0;
} }
void Run() override void Run() override
@ -52,41 +65,83 @@ class FairMQSink : public FairMQDevice //, public OutputPolicy
// store the channel reference to avoid traversing the map on every loop iteration // store the channel reference to avoid traversing the map on every loop iteration
FairMQChannel& dataInChannel = fChannels.at(fInChannelName).at(0); FairMQChannel& dataInChannel = fChannels.at(fInChannelName).at(0);
LOG(info) << "Starting the benchmark and expecting to receive " << fMaxIterations << " messages."; LOG(info) << "Starting sink and expecting to receive " << fMaxIterations << " messages.";
auto tStart = std::chrono::high_resolution_clock::now(); auto tStart = std::chrono::high_resolution_clock::now();
if (!fOutFilename.empty()) {
LOG(debug) << "Incoming messages will be written to file: " << fOutFilename;
if (fMaxFileSize != 0) {
LOG(debug) << "File output will stop after " << fMaxFileSize << " bytes";
} else {
LOG(debug) << "ATTENTION: --max-file-size is 0 - output file will continue to grow until sink is stopped";
}
fOutputFile.open(fOutFilename, std::ios::out | std::ios::binary);
if (!fOutputFile) {
LOG(error) << "Could not open '" << fOutFilename;
throw std::runtime_error(fair::mq::tools::ToString("Could not open '", fOutFilename));
}
}
while (!NewStatePending()) { while (!NewStatePending()) {
if (fMultipart) { if (fMultipart) {
FairMQParts parts; FairMQParts parts;
if (dataInChannel.Receive(parts) < 0) {
if (dataInChannel.Receive(parts) >= 0) { continue;
if (fMaxIterations > 0) { }
if (fNumIterations >= fMaxIterations) { if (fOutputFile.is_open()) {
LOG(info) << "Configured maximum number of iterations reached."; for (const auto& part : parts) {
break; WriteToFile(static_cast<const char*>(part->GetData()), part->GetSize());
}
} }
fNumIterations++;
} }
} else { } else {
FairMQMessagePtr msg(dataInChannel.NewMessage()); FairMQMessagePtr msg(dataInChannel.NewMessage());
if (dataInChannel.Receive(msg) < 0) {
if (dataInChannel.Receive(msg) >= 0) { continue;
if (fMaxIterations > 0) { }
if (fNumIterations >= fMaxIterations) { if (fOutputFile.is_open()) {
LOG(info) << "Configured maximum number of iterations reached."; WriteToFile(static_cast<const char*>(msg->GetData()), msg->GetSize());
break;
}
}
fNumIterations++;
} }
} }
if (fMaxFileSize > 0 && fBytesWritten >= fMaxFileSize) {
LOG(info) << "Written " << fBytesWritten << " bytes, stopping...";
break;
}
if (fMaxIterations > 0) {
if (fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached.";
break;
}
}
fNumIterations++;
}
if (fOutputFile.is_open()) {
fOutputFile.flush();
fOutputFile.close();
} }
auto tEnd = std::chrono::high_resolution_clock::now(); auto tEnd = std::chrono::high_resolution_clock::now();
auto ms = std::chrono::duration<double, std::milli>(tEnd - tStart).count();
LOG(info) << "Received " << fNumIterations << " messages in " << ms << "ms.";
if (!fOutFilename.empty()) {
auto sec = std::chrono::duration<double>(tEnd - tStart).count();
LOG(info) << "Closed '" << fOutFilename << "' after writing " << fBytesWritten << " bytes."
<< "(" << (fBytesWritten / (1000. * 1000.)) / sec << " MB/s)";
}
LOG(info) << "Leaving RUNNING state. Received " << fNumIterations << " messages in " LOG(info) << "Leaving RUNNING state.";
<< std::chrono::duration<double, std::milli>(tEnd - tStart).count() << "ms."; }
void WriteToFile(const char* ptr, size_t size)
{
fOutputFile.write(ptr, size);
if (fOutputFile.bad()) {
LOG(error) << "failed writing to file";
throw std::runtime_error("failed writing to file");
}
fBytesWritten += size;
} }
}; };

View File

@ -15,6 +15,8 @@ void addCustomOptions(bpo::options_description& options)
{ {
options.add_options() options.add_options()
("in-channel", bpo::value<std::string>()->default_value("data"), "Name of the input channel") ("in-channel", bpo::value<std::string>()->default_value("data"), "Name of the input channel")
("out-filename", bpo::value<std::string>()->default_value(""), "Write incoming message buffers to the specified file")
("max-file-size", bpo::value<uint64_t>()->default_value(2000000000), "Maximum file size for the file output (0 - unlimited)")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Number of run iterations (0 - infinite)") ("max-iterations", bpo::value<uint64_t>()->default_value(0), "Number of run iterations (0 - infinite)")
("multipart", bpo::value<bool>()->default_value(false), "Handle multipart payloads"); ("multipart", bpo::value<bool>()->default_value(false), "Handle multipart payloads");
} }