From bbc1dd460035c0ffe22aa9d0c506d6e59b333315 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Mon, 1 Mar 2021 13:56:16 +0100 Subject: [PATCH] Add optional file output to FairMQSink --- fairmq/devices/FairMQSink.h | 101 ++++++++++++++++++++++++++++-------- fairmq/run/runSink.cxx | 2 + 2 files changed, 80 insertions(+), 23 deletions(-) diff --git a/fairmq/devices/FairMQSink.h b/fairmq/devices/FairMQSink.h index 9d193c27..8308d66f 100644 --- a/fairmq/devices/FairMQSink.h +++ b/fairmq/devices/FairMQSink.h @@ -17,19 +17,24 @@ #include "../FairMQDevice.h" #include "../FairMQLogger.h" +#include #include #include +#include +#include -// template -class FairMQSink : public FairMQDevice //, public OutputPolicy +class FairMQSink : public FairMQDevice { public: FairMQSink() : fMultipart(false) , fMaxIterations(0) , fNumIterations(0) + , fMaxFileSize(0) + , fBytesWritten(0) , fInChannelName() + , fOutFilename() {} ~FairMQSink() {} @@ -38,13 +43,21 @@ class FairMQSink : public FairMQDevice //, public OutputPolicy bool fMultipart; uint64_t fMaxIterations; uint64_t fNumIterations; + uint64_t fMaxFileSize; + uint64_t fBytesWritten; std::string fInChannelName; + std::string fOutFilename; + std::fstream fOutputFile; void InitTask() override { - fMultipart = fConfig->GetProperty("multipart"); + fMultipart = fConfig->GetProperty("multipart"); fMaxIterations = fConfig->GetProperty("max-iterations"); + fMaxFileSize = fConfig->GetProperty("max-file-size"); fInChannelName = fConfig->GetProperty("in-channel"); + fOutFilename = fConfig->GetProperty("out-filename"); + + fBytesWritten = 0; } void Run() override @@ -52,41 +65,83 @@ class FairMQSink : public FairMQDevice //, public OutputPolicy // store the channel reference to avoid traversing the map on every loop iteration FairMQChannel& dataInChannel = fChannels.at(fInChannelName).at(0); - LOG(info) << "Starting the benchmark and expecting to receive " << fMaxIterations << " messages."; + LOG(info) << "Starting sink and expecting to receive " << fMaxIterations << " messages."; auto tStart = std::chrono::high_resolution_clock::now(); + if (!fOutFilename.empty()) { + LOG(debug) << "Incoming messages will be written to file: " << fOutFilename; + if (fMaxFileSize != 0) { + LOG(debug) << "File output will stop after " << fMaxFileSize << " bytes"; + } else { + LOG(debug) << "ATTENTION: --max-file-size is 0 - output file will continue to grow until sink is stopped"; + } + + fOutputFile.open(fOutFilename, std::ios::out | std::ios::binary); + if (!fOutputFile) { + LOG(error) << "Could not open '" << fOutFilename; + throw std::runtime_error(fair::mq::tools::ToString("Could not open '", fOutFilename)); + } + } + while (!NewStatePending()) { if (fMultipart) { FairMQParts parts; - - if (dataInChannel.Receive(parts) >= 0) { - if (fMaxIterations > 0) { - if (fNumIterations >= fMaxIterations) { - LOG(info) << "Configured maximum number of iterations reached."; - break; - } + if (dataInChannel.Receive(parts) < 0) { + continue; + } + if (fOutputFile.is_open()) { + for (const auto& part : parts) { + WriteToFile(static_cast(part->GetData()), part->GetSize()); } - fNumIterations++; } } else { FairMQMessagePtr msg(dataInChannel.NewMessage()); - - if (dataInChannel.Receive(msg) >= 0) { - if (fMaxIterations > 0) { - if (fNumIterations >= fMaxIterations) { - LOG(info) << "Configured maximum number of iterations reached."; - break; - } - } - fNumIterations++; + if (dataInChannel.Receive(msg) < 0) { + continue; + } + if (fOutputFile.is_open()) { + WriteToFile(static_cast(msg->GetData()), msg->GetSize()); } } + + if (fMaxFileSize > 0 && fBytesWritten >= fMaxFileSize) { + LOG(info) << "Written " << fBytesWritten << " bytes, stopping..."; + break; + } + if (fMaxIterations > 0) { + if (fNumIterations >= fMaxIterations) { + LOG(info) << "Configured maximum number of iterations reached."; + break; + } + } + fNumIterations++; + } + + if (fOutputFile.is_open()) { + fOutputFile.flush(); + fOutputFile.close(); } auto tEnd = std::chrono::high_resolution_clock::now(); + auto ms = std::chrono::duration(tEnd - tStart).count(); + LOG(info) << "Received " << fNumIterations << " messages in " << ms << "ms."; + if (!fOutFilename.empty()) { + auto sec = std::chrono::duration(tEnd - tStart).count(); + LOG(info) << "Closed '" << fOutFilename << "' after writing " << fBytesWritten << " bytes." + << "(" << (fBytesWritten / (1000. * 1000.)) / sec << " MB/s)"; + } - LOG(info) << "Leaving RUNNING state. Received " << fNumIterations << " messages in " - << std::chrono::duration(tEnd - tStart).count() << "ms."; + LOG(info) << "Leaving RUNNING state."; + } + + void WriteToFile(const char* ptr, size_t size) + { + fOutputFile.write(ptr, size); + if (fOutputFile.bad()) { + LOG(error) << "failed writing to file"; + throw std::runtime_error("failed writing to file"); + } + fBytesWritten += size; } }; diff --git a/fairmq/run/runSink.cxx b/fairmq/run/runSink.cxx index 987d22eb..194eb3d7 100644 --- a/fairmq/run/runSink.cxx +++ b/fairmq/run/runSink.cxx @@ -15,6 +15,8 @@ void addCustomOptions(bpo::options_description& options) { options.add_options() ("in-channel", bpo::value()->default_value("data"), "Name of the input channel") + ("out-filename", bpo::value()->default_value(""), "Write incoming message buffers to the specified file") + ("max-file-size", bpo::value()->default_value(2000000000), "Maximum file size for the file output (0 - unlimited)") ("max-iterations", bpo::value()->default_value(0), "Number of run iterations (0 - infinite)") ("multipart", bpo::value()->default_value(false), "Handle multipart payloads"); }