mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 09:31:45 +00:00
Compare commits
5 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
bbc1dd4600 | ||
|
8327810942 | ||
|
c37742e3b4 | ||
|
93dff3c5a7 | ||
|
2b3e38d9a4 |
@@ -1,5 +1,5 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public License (LGPL) version 3, *
|
||||
@@ -84,6 +84,7 @@ Properties SuboptParser(const vector<string>& channelConfig, const string& devic
|
||||
char* subopts = &argString[0];
|
||||
char* value = nullptr;
|
||||
while (subopts && *subopts != 0 && *subopts != ' ') {
|
||||
char* cur = subopts;
|
||||
int subopt = getsubopt(&subopts, (char**)channelOptionKeys, &value);
|
||||
if (subopt == NAME) {
|
||||
channelName = value;
|
||||
@@ -94,6 +95,8 @@ Properties SuboptParser(const vector<string>& channelConfig, const string& devic
|
||||
socketsArray.push_back(make_pair("", socketProperties));
|
||||
} else if (subopt >= 0 && value != nullptr) {
|
||||
channelProperties.put(channelOptionKeys[subopt], value);
|
||||
} else if (subopt == -1) {
|
||||
LOG(warn) << "Ignoring unknown argument in --channel-config: " << cur;
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -21,7 +21,7 @@
|
||||
#define FAIRMQ_GIT_DATE "@PROJECT_GIT_DATE@"
|
||||
#define FAIRMQ_REPO_URL "https://github.com/FairRootGroup/FairMQ"
|
||||
#define FAIRMQ_LICENSE "LGPL-3.0"
|
||||
#define FAIRMQ_COPYRIGHT "2012-2020 GSI"
|
||||
#define FAIRMQ_COPYRIGHT "2012-2021 GSI"
|
||||
#define FAIRMQ_BUILD_TYPE "@CMAKE_BUILD_TYPE@"
|
||||
|
||||
#endif // FAIR_MQ_VERSION_H
|
||||
|
@@ -17,19 +17,24 @@
|
||||
|
||||
#include "../FairMQDevice.h"
|
||||
#include "../FairMQLogger.h"
|
||||
#include <fairmq/tools/Strings.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <string>
|
||||
#include <fstream>
|
||||
#include <stdexcept>
|
||||
|
||||
// template<typename OutputPolicy>
|
||||
class FairMQSink : public FairMQDevice //, public OutputPolicy
|
||||
class FairMQSink : public FairMQDevice
|
||||
{
|
||||
public:
|
||||
FairMQSink()
|
||||
: fMultipart(false)
|
||||
, fMaxIterations(0)
|
||||
, fNumIterations(0)
|
||||
, fMaxFileSize(0)
|
||||
, fBytesWritten(0)
|
||||
, fInChannelName()
|
||||
, fOutFilename()
|
||||
{}
|
||||
|
||||
~FairMQSink() {}
|
||||
@@ -38,13 +43,21 @@ class FairMQSink : public FairMQDevice //, public OutputPolicy
|
||||
bool fMultipart;
|
||||
uint64_t fMaxIterations;
|
||||
uint64_t fNumIterations;
|
||||
uint64_t fMaxFileSize;
|
||||
uint64_t fBytesWritten;
|
||||
std::string fInChannelName;
|
||||
std::string fOutFilename;
|
||||
std::fstream fOutputFile;
|
||||
|
||||
void InitTask() override
|
||||
{
|
||||
fMultipart = fConfig->GetProperty<bool>("multipart");
|
||||
fMultipart = fConfig->GetProperty<bool>("multipart");
|
||||
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
|
||||
fMaxFileSize = fConfig->GetProperty<uint64_t>("max-file-size");
|
||||
fInChannelName = fConfig->GetProperty<std::string>("in-channel");
|
||||
fOutFilename = fConfig->GetProperty<std::string>("out-filename");
|
||||
|
||||
fBytesWritten = 0;
|
||||
}
|
||||
|
||||
void Run() override
|
||||
@@ -52,41 +65,83 @@ class FairMQSink : public FairMQDevice //, public OutputPolicy
|
||||
// store the channel reference to avoid traversing the map on every loop iteration
|
||||
FairMQChannel& dataInChannel = fChannels.at(fInChannelName).at(0);
|
||||
|
||||
LOG(info) << "Starting the benchmark and expecting to receive " << fMaxIterations << " messages.";
|
||||
LOG(info) << "Starting sink and expecting to receive " << fMaxIterations << " messages.";
|
||||
auto tStart = std::chrono::high_resolution_clock::now();
|
||||
|
||||
if (!fOutFilename.empty()) {
|
||||
LOG(debug) << "Incoming messages will be written to file: " << fOutFilename;
|
||||
if (fMaxFileSize != 0) {
|
||||
LOG(debug) << "File output will stop after " << fMaxFileSize << " bytes";
|
||||
} else {
|
||||
LOG(debug) << "ATTENTION: --max-file-size is 0 - output file will continue to grow until sink is stopped";
|
||||
}
|
||||
|
||||
fOutputFile.open(fOutFilename, std::ios::out | std::ios::binary);
|
||||
if (!fOutputFile) {
|
||||
LOG(error) << "Could not open '" << fOutFilename;
|
||||
throw std::runtime_error(fair::mq::tools::ToString("Could not open '", fOutFilename));
|
||||
}
|
||||
}
|
||||
|
||||
while (!NewStatePending()) {
|
||||
if (fMultipart) {
|
||||
FairMQParts parts;
|
||||
|
||||
if (dataInChannel.Receive(parts) >= 0) {
|
||||
if (fMaxIterations > 0) {
|
||||
if (fNumIterations >= fMaxIterations) {
|
||||
LOG(info) << "Configured maximum number of iterations reached.";
|
||||
break;
|
||||
}
|
||||
if (dataInChannel.Receive(parts) < 0) {
|
||||
continue;
|
||||
}
|
||||
if (fOutputFile.is_open()) {
|
||||
for (const auto& part : parts) {
|
||||
WriteToFile(static_cast<const char*>(part->GetData()), part->GetSize());
|
||||
}
|
||||
fNumIterations++;
|
||||
}
|
||||
} else {
|
||||
FairMQMessagePtr msg(dataInChannel.NewMessage());
|
||||
|
||||
if (dataInChannel.Receive(msg) >= 0) {
|
||||
if (fMaxIterations > 0) {
|
||||
if (fNumIterations >= fMaxIterations) {
|
||||
LOG(info) << "Configured maximum number of iterations reached.";
|
||||
break;
|
||||
}
|
||||
}
|
||||
fNumIterations++;
|
||||
if (dataInChannel.Receive(msg) < 0) {
|
||||
continue;
|
||||
}
|
||||
if (fOutputFile.is_open()) {
|
||||
WriteToFile(static_cast<const char*>(msg->GetData()), msg->GetSize());
|
||||
}
|
||||
}
|
||||
|
||||
if (fMaxFileSize > 0 && fBytesWritten >= fMaxFileSize) {
|
||||
LOG(info) << "Written " << fBytesWritten << " bytes, stopping...";
|
||||
break;
|
||||
}
|
||||
if (fMaxIterations > 0) {
|
||||
if (fNumIterations >= fMaxIterations) {
|
||||
LOG(info) << "Configured maximum number of iterations reached.";
|
||||
break;
|
||||
}
|
||||
}
|
||||
fNumIterations++;
|
||||
}
|
||||
|
||||
if (fOutputFile.is_open()) {
|
||||
fOutputFile.flush();
|
||||
fOutputFile.close();
|
||||
}
|
||||
|
||||
auto tEnd = std::chrono::high_resolution_clock::now();
|
||||
auto ms = std::chrono::duration<double, std::milli>(tEnd - tStart).count();
|
||||
LOG(info) << "Received " << fNumIterations << " messages in " << ms << "ms.";
|
||||
if (!fOutFilename.empty()) {
|
||||
auto sec = std::chrono::duration<double>(tEnd - tStart).count();
|
||||
LOG(info) << "Closed '" << fOutFilename << "' after writing " << fBytesWritten << " bytes."
|
||||
<< "(" << (fBytesWritten / (1000. * 1000.)) / sec << " MB/s)";
|
||||
}
|
||||
|
||||
LOG(info) << "Leaving RUNNING state. Received " << fNumIterations << " messages in "
|
||||
<< std::chrono::duration<double, std::milli>(tEnd - tStart).count() << "ms.";
|
||||
LOG(info) << "Leaving RUNNING state.";
|
||||
}
|
||||
|
||||
void WriteToFile(const char* ptr, size_t size)
|
||||
{
|
||||
fOutputFile.write(ptr, size);
|
||||
if (fOutputFile.bad()) {
|
||||
LOG(error) << "failed writing to file";
|
||||
throw std::runtime_error("failed writing to file");
|
||||
}
|
||||
fBytesWritten += size;
|
||||
}
|
||||
};
|
||||
|
||||
|
@@ -15,6 +15,8 @@ void addCustomOptions(bpo::options_description& options)
|
||||
{
|
||||
options.add_options()
|
||||
("in-channel", bpo::value<std::string>()->default_value("data"), "Name of the input channel")
|
||||
("out-filename", bpo::value<std::string>()->default_value(""), "Write incoming message buffers to the specified file")
|
||||
("max-file-size", bpo::value<uint64_t>()->default_value(2000000000), "Maximum file size for the file output (0 - unlimited)")
|
||||
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Number of run iterations (0 - infinite)")
|
||||
("multipart", bpo::value<bool>()->default_value(false), "Handle multipart payloads");
|
||||
}
|
||||
|
@@ -45,6 +45,27 @@ namespace
|
||||
namespace fair::mq::shmem
|
||||
{
|
||||
|
||||
struct TerminalConfig
|
||||
{
|
||||
TerminalConfig()
|
||||
{
|
||||
termios t;
|
||||
tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure
|
||||
t.c_lflag &= ~ICANON; // disable canonical input
|
||||
t.c_lflag &= ~ECHO; // do not echo input chars
|
||||
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
|
||||
}
|
||||
|
||||
~TerminalConfig()
|
||||
{
|
||||
termios t;
|
||||
tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure
|
||||
t.c_lflag |= ICANON; // re-enable canonical input
|
||||
t.c_lflag |= ECHO; // echo input chars
|
||||
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
|
||||
}
|
||||
};
|
||||
|
||||
void signalHandler(int signal)
|
||||
{
|
||||
gSignalStatus = signal;
|
||||
@@ -115,6 +136,8 @@ void Monitor::Run()
|
||||
|
||||
if (fInteractive) {
|
||||
Interactive();
|
||||
} else if (fViewOnly) {
|
||||
CheckSegment();
|
||||
} else {
|
||||
while (!fTerminating) {
|
||||
this_thread::sleep_for(chrono::milliseconds(fIntervalInMS));
|
||||
@@ -154,27 +177,6 @@ void Monitor::MonitorHeartbeats()
|
||||
RemoveQueue(fControlQueueName);
|
||||
}
|
||||
|
||||
struct TerminalConfig
|
||||
{
|
||||
TerminalConfig()
|
||||
{
|
||||
termios t;
|
||||
tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure
|
||||
t.c_lflag &= ~ICANON; // disable canonical input
|
||||
t.c_lflag &= ~ECHO; // do not echo input chars
|
||||
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
|
||||
}
|
||||
|
||||
~TerminalConfig()
|
||||
{
|
||||
termios t;
|
||||
tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure
|
||||
t.c_lflag |= ICANON; // re-enable canonical input
|
||||
t.c_lflag |= ECHO; // echo input chars
|
||||
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
|
||||
}
|
||||
};
|
||||
|
||||
void Monitor::Interactive()
|
||||
{
|
||||
char c;
|
||||
|
@@ -125,13 +125,12 @@ int main(int argc, char** argv)
|
||||
}
|
||||
|
||||
cout << "Starting shared memory monitor for session: \"" << sessionName << "\" (shmId: " << shmId << ")..." << endl;
|
||||
if (viewOnly && !interactive) {
|
||||
cout << "running in non-interactive view-only mode, outputting with interval of " << intervalInMS << "ms. (change with --interval), press ctrl+C to exit." << endl;
|
||||
}
|
||||
|
||||
Monitor monitor(shmId, selfDestruct, interactive, viewOnly, timeoutInMS, intervalInMS, runAsDaemon, cleanOnExit);
|
||||
|
||||
monitor.CatchSignals();
|
||||
if (interactive || !viewOnly) {
|
||||
monitor.CatchSignals();
|
||||
}
|
||||
monitor.Run();
|
||||
} catch (Monitor::DaemonPresent& dp) {
|
||||
return 0;
|
||||
|
Reference in New Issue
Block a user