From 3353e214a77b8baeed69983ead0dea7bad608b74 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Thu, 7 Apr 2016 17:02:37 +0200 Subject: [PATCH] Add shared memory example - Add shared memory example in examples/MQ/SharedMemory - Device/Task termination: try soft first, and abort if it fails - Interactive mode: prevent cin from blocking forever (poll) --- fairmq/FairMQDevice.cxx | 133 +++++++++++++++++------------ fairmq/FairMQDevice.h | 3 + fairmq/deployment/CMakeLists.txt | 2 - fairmq/deployment/FairMQDDSTools.h | 3 +- fairmq/run/benchmark.json | 4 +- fairmq/zeromq/FairMQContextZMQ.h | 2 +- fairmq/zeromq/FairMQPollerZMQ.h | 2 + 7 files changed, 90 insertions(+), 59 deletions(-) diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 02a80d13..127edfcf 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -18,6 +18,7 @@ #include #include // for the InteractiveStateLoop +#include #include #include // for choosing random port in range @@ -59,6 +60,8 @@ FairMQDevice::FairMQDevice() , fInitialValidationCondition() , fInitialValidationMutex() , fCatchingSignals(false) + , fTerminated(false) + , fRunning(false) { } @@ -86,10 +89,19 @@ void FairMQDevice::SignalHandler(int signal) Shutdown(); fTerminateStateThread.join(); - LOG(INFO) << "Exiting."; stop(); - // std::abort(); - exit(EXIT_FAILURE); + fRunning = false; + if (!fTerminated) + { + fTerminated = true; + LOG(INFO) << "Exiting."; + } + else + { + LOG(WARN) << "Repeated termination or bad initialization? Aborting."; + // std::abort(); + exit(EXIT_FAILURE); + } } void FairMQDevice::ConnectChannels(list& chans) @@ -728,8 +740,11 @@ void FairMQDevice::LogSocketRates() void FairMQDevice::InteractiveStateLoop() { - bool running = true; + fRunning = true; char c; // hold the user console input + pollfd cinfd[1]; + cinfd[0].fd = fileno(stdin); + cinfd[0].events = POLLIN; struct termios t; tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure @@ -738,58 +753,68 @@ void FairMQDevice::InteractiveStateLoop() PrintInteractiveStateLoopHelp(); - while (running && cin >> c) + while (fRunning) { - switch (c) + if (poll(cinfd, 1, 500)) { - case 'i': - LOG(INFO) << "[i] init device"; - ChangeState("INIT_DEVICE"); - break; - case 'j': - LOG(INFO) << "[j] init task"; - ChangeState("INIT_TASK"); - break; - case 'p': - LOG(INFO) << "[p] pause"; - ChangeState("PAUSE"); - break; - case 'r': - LOG(INFO) << "[r] run"; - ChangeState("RUN"); - break; - case 's': - LOG(INFO) << "[s] stop"; - ChangeState("STOP"); - break; - case 't': - LOG(INFO) << "[t] reset task"; - ChangeState("RESET_TASK"); - break; - case 'd': - LOG(INFO) << "[d] reset device"; - ChangeState("RESET_DEVICE"); - break; - case 'h': - LOG(INFO) << "[h] help"; - PrintInteractiveStateLoopHelp(); - break; - // case 'x': - // LOG(INFO) << "[x] ERROR"; - // ChangeState("ERROR_FOUND"); - // break; - case 'q': - LOG(INFO) << "[q] end"; - ChangeState("END"); - if (CheckCurrentState("EXITING")) - { - running = false; - } - break; - default: - LOG(INFO) << "Invalid input: [" << c << "]"; - PrintInteractiveStateLoopHelp(); + if (!fRunning) + { break; + } + + cin >> c; + + switch (c) + { + case 'i': + LOG(INFO) << "[i] init device"; + ChangeState("INIT_DEVICE"); + break; + case 'j': + LOG(INFO) << "[j] init task"; + ChangeState("INIT_TASK"); + break; + case 'p': + LOG(INFO) << "[p] pause"; + ChangeState("PAUSE"); + break; + case 'r': + LOG(INFO) << "[r] run"; + ChangeState("RUN"); + break; + case 's': + LOG(INFO) << "[s] stop"; + ChangeState("STOP"); + break; + case 't': + LOG(INFO) << "[t] reset task"; + ChangeState("RESET_TASK"); + break; + case 'd': + LOG(INFO) << "[d] reset device"; + ChangeState("RESET_DEVICE"); + break; + case 'h': + LOG(INFO) << "[h] help"; + PrintInteractiveStateLoopHelp(); + break; + // case 'x': + // LOG(INFO) << "[x] ERROR"; + // ChangeState("ERROR_FOUND"); + // break; + case 'q': + LOG(INFO) << "[q] end"; + ChangeState("END"); + if (CheckCurrentState("EXITING")) + { + fRunning = false; + } + break; + default: + LOG(INFO) << "Invalid input: [" << c << "]"; + PrintInteractiveStateLoopHelp(); + break; + } } } @@ -921,4 +946,6 @@ FairMQDevice::~FairMQDevice() } delete fTransportFactory; + + LOG(DEBUG) << "Device destroyed"; } diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 838797f3..6c078c54 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -330,6 +330,9 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// Signal handler void SignalHandler(int signal); bool fCatchingSignals; + bool fTerminated; + // Interactive state loop helper + std::atomic fRunning; }; #endif /* FAIRMQDEVICE_H_ */ diff --git a/fairmq/deployment/CMakeLists.txt b/fairmq/deployment/CMakeLists.txt index 6a149602..3dfeb738 100644 --- a/fairmq/deployment/CMakeLists.txt +++ b/fairmq/deployment/CMakeLists.txt @@ -58,5 +58,3 @@ ForEach(_file RANGE 0 ${_length}) set(DEPENDENCIES FairMQ dds_intercom_lib) GENERATE_EXECUTABLE() EndForEach(_file RANGE 0 ${_length}) - - diff --git a/fairmq/deployment/FairMQDDSTools.h b/fairmq/deployment/FairMQDDSTools.h index 2246358b..8f19484c 100644 --- a/fairmq/deployment/FairMQDDSTools.h +++ b/fairmq/deployment/FairMQDDSTools.h @@ -31,7 +31,8 @@ struct DDSConfig /// Addresses of binding channels are published via DDS using channels names as keys /// Addresses of connecting channels are collected from DDS using channels names as keys /// \param device Reference to FairMQDevice whose channels to handle -void HandleConfigViaDDS(FairMQDevice& device) +template +void HandleConfigViaDDS(TMQDevice& device) { // container for binding channels vector bindingChans; diff --git a/fairmq/run/benchmark.json b/fairmq/run/benchmark.json index 971412c0..f2774a20 100644 --- a/fairmq/run/benchmark.json +++ b/fairmq/run/benchmark.json @@ -11,7 +11,7 @@ { "type": "push", "method": "bind", - "address": "tcp://*:5555", + "address": "tcp://127.0.0.1:5555", "sndBufSize": "1000", "rcvBufSize": "1000", "rateLogging": "1" @@ -28,7 +28,7 @@ { "type": "pull", "method": "connect", - "address": "tcp://localhost:5555", + "address": "tcp://127.0.0.1:5555", "sndBufSize": "1000", "rcvBufSize": "1000", "rateLogging": "1" diff --git a/fairmq/zeromq/FairMQContextZMQ.h b/fairmq/zeromq/FairMQContextZMQ.h index fbe4c5f8..5de7da82 100644 --- a/fairmq/zeromq/FairMQContextZMQ.h +++ b/fairmq/zeromq/FairMQContextZMQ.h @@ -6,7 +6,7 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ /** - * FairMQContext.h + * FairMQContextZMQ.h * * @since 2012-12-05 * @author D. Klein, A. Rybalchenko diff --git a/fairmq/zeromq/FairMQPollerZMQ.h b/fairmq/zeromq/FairMQPollerZMQ.h index 74eacc44..dc4e0b1c 100644 --- a/fairmq/zeromq/FairMQPollerZMQ.h +++ b/fairmq/zeromq/FairMQPollerZMQ.h @@ -19,6 +19,8 @@ #include #include +#include + #include "FairMQPoller.h" #include "FairMQChannel.h" #include "FairMQTransportFactoryZMQ.h"