Add shared memory example

- Add shared memory example in examples/MQ/SharedMemory
 - Device/Task termination: try soft first, and abort if it fails
 - Interactive mode: prevent cin from blocking forever (poll)
This commit is contained in:
Alexey Rybalchenko 2016-04-07 17:02:37 +02:00
parent 599d1b3e05
commit 3353e214a7
7 changed files with 90 additions and 59 deletions

View File

@ -18,6 +18,7 @@
#include <cstdlib>
#include <termios.h> // for the InteractiveStateLoop
#include <poll.h>
#include <boost/thread.hpp>
#include <boost/random/mersenne_twister.hpp> // for choosing random port in range
@ -59,6 +60,8 @@ FairMQDevice::FairMQDevice()
, fInitialValidationCondition()
, fInitialValidationMutex()
, fCatchingSignals(false)
, fTerminated(false)
, fRunning(false)
{
}
@ -86,10 +89,19 @@ void FairMQDevice::SignalHandler(int signal)
Shutdown();
fTerminateStateThread.join();
LOG(INFO) << "Exiting.";
stop();
// std::abort();
exit(EXIT_FAILURE);
fRunning = false;
if (!fTerminated)
{
fTerminated = true;
LOG(INFO) << "Exiting.";
}
else
{
LOG(WARN) << "Repeated termination or bad initialization? Aborting.";
// std::abort();
exit(EXIT_FAILURE);
}
}
void FairMQDevice::ConnectChannels(list<FairMQChannel*>& chans)
@ -728,8 +740,11 @@ void FairMQDevice::LogSocketRates()
void FairMQDevice::InteractiveStateLoop()
{
bool running = true;
fRunning = true;
char c; // hold the user console input
pollfd cinfd[1];
cinfd[0].fd = fileno(stdin);
cinfd[0].events = POLLIN;
struct termios t;
tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure
@ -738,58 +753,68 @@ void FairMQDevice::InteractiveStateLoop()
PrintInteractiveStateLoopHelp();
while (running && cin >> c)
while (fRunning)
{
switch (c)
if (poll(cinfd, 1, 500))
{
case 'i':
LOG(INFO) << "[i] init device";
ChangeState("INIT_DEVICE");
break;
case 'j':
LOG(INFO) << "[j] init task";
ChangeState("INIT_TASK");
break;
case 'p':
LOG(INFO) << "[p] pause";
ChangeState("PAUSE");
break;
case 'r':
LOG(INFO) << "[r] run";
ChangeState("RUN");
break;
case 's':
LOG(INFO) << "[s] stop";
ChangeState("STOP");
break;
case 't':
LOG(INFO) << "[t] reset task";
ChangeState("RESET_TASK");
break;
case 'd':
LOG(INFO) << "[d] reset device";
ChangeState("RESET_DEVICE");
break;
case 'h':
LOG(INFO) << "[h] help";
PrintInteractiveStateLoopHelp();
break;
// case 'x':
// LOG(INFO) << "[x] ERROR";
// ChangeState("ERROR_FOUND");
// break;
case 'q':
LOG(INFO) << "[q] end";
ChangeState("END");
if (CheckCurrentState("EXITING"))
{
running = false;
}
break;
default:
LOG(INFO) << "Invalid input: [" << c << "]";
PrintInteractiveStateLoopHelp();
if (!fRunning)
{
break;
}
cin >> c;
switch (c)
{
case 'i':
LOG(INFO) << "[i] init device";
ChangeState("INIT_DEVICE");
break;
case 'j':
LOG(INFO) << "[j] init task";
ChangeState("INIT_TASK");
break;
case 'p':
LOG(INFO) << "[p] pause";
ChangeState("PAUSE");
break;
case 'r':
LOG(INFO) << "[r] run";
ChangeState("RUN");
break;
case 's':
LOG(INFO) << "[s] stop";
ChangeState("STOP");
break;
case 't':
LOG(INFO) << "[t] reset task";
ChangeState("RESET_TASK");
break;
case 'd':
LOG(INFO) << "[d] reset device";
ChangeState("RESET_DEVICE");
break;
case 'h':
LOG(INFO) << "[h] help";
PrintInteractiveStateLoopHelp();
break;
// case 'x':
// LOG(INFO) << "[x] ERROR";
// ChangeState("ERROR_FOUND");
// break;
case 'q':
LOG(INFO) << "[q] end";
ChangeState("END");
if (CheckCurrentState("EXITING"))
{
fRunning = false;
}
break;
default:
LOG(INFO) << "Invalid input: [" << c << "]";
PrintInteractiveStateLoopHelp();
break;
}
}
}
@ -921,4 +946,6 @@ FairMQDevice::~FairMQDevice()
}
delete fTransportFactory;
LOG(DEBUG) << "Device destroyed";
}

View File

@ -330,6 +330,9 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
/// Signal handler
void SignalHandler(int signal);
bool fCatchingSignals;
bool fTerminated;
// Interactive state loop helper
std::atomic<bool> fRunning;
};
#endif /* FAIRMQDEVICE_H_ */

View File

@ -58,5 +58,3 @@ ForEach(_file RANGE 0 ${_length})
set(DEPENDENCIES FairMQ dds_intercom_lib)
GENERATE_EXECUTABLE()
EndForEach(_file RANGE 0 ${_length})

View File

@ -31,7 +31,8 @@ struct DDSConfig
/// Addresses of binding channels are published via DDS using channels names as keys
/// Addresses of connecting channels are collected from DDS using channels names as keys
/// \param device Reference to FairMQDevice whose channels to handle
void HandleConfigViaDDS(FairMQDevice& device)
template<typename TMQDevice>
void HandleConfigViaDDS(TMQDevice& device)
{
// container for binding channels
vector<FairMQChannel*> bindingChans;

View File

@ -11,7 +11,7 @@
{
"type": "push",
"method": "bind",
"address": "tcp://*:5555",
"address": "tcp://127.0.0.1:5555",
"sndBufSize": "1000",
"rcvBufSize": "1000",
"rateLogging": "1"
@ -28,7 +28,7 @@
{
"type": "pull",
"method": "connect",
"address": "tcp://localhost:5555",
"address": "tcp://127.0.0.1:5555",
"sndBufSize": "1000",
"rcvBufSize": "1000",
"rateLogging": "1"

View File

@ -6,7 +6,7 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQContext.h
* FairMQContextZMQ.h
*
* @since 2012-12-05
* @author D. Klein, A. Rybalchenko

View File

@ -19,6 +19,8 @@
#include <unordered_map>
#include <initializer_list>
#include <zmq.h>
#include "FairMQPoller.h"
#include "FairMQChannel.h"
#include "FairMQTransportFactoryZMQ.h"