Compare commits

..

14 Commits

Author SHA1 Message Date
Alexey Rybalchenko
e090967645 Shmem: Build shmem names out of session id + user id 2018-10-11 17:06:39 +02:00
Alexey Rybalchenko
1d45095d75 Add session id to example tests 2018-10-11 17:06:39 +02:00
Dennis Klein
1fdf510ae7 Pick correct build type in CI 2018-10-10 20:28:10 +02:00
Dennis Klein
78acb954cd Test more cases with interactive controller 2018-10-10 19:35:18 +02:00
Dennis Klein
3a1b769937 Support feeding the child process data on stdin 2018-10-10 19:35:18 +02:00
Dennis Klein
9f325451e5 Make sure we reset terminal config also on exception 2018-10-10 19:35:18 +02:00
Dennis Klein
a78d35d90d Remove obsolete declaration 2018-10-10 19:35:18 +02:00
Alexey Rybalchenko
cb199e7283 Fix throw after quit signal case 2018-10-10 19:35:18 +02:00
Dennis Klein
e39316c866 Test exceptions thrown in user code 2018-10-10 19:35:18 +02:00
Alexey Rybalchenko
bde12f58b2 Handle errors in static and interactive controllers 2018-10-10 19:35:18 +02:00
Alexey Rybalchenko
45354f268b Use future instead of thread for device rateLogger 2018-10-10 19:35:18 +02:00
Dennis Klein
1aab354a5d Resolve hanging process in case of uncaught exception 2018-10-10 19:35:18 +02:00
Alexey Rybalchenko
e1f555bc05 Fix issues found by Codacy 2018-10-08 17:20:02 +02:00
Alexey Rybalchenko
985150437a Remove shmem prototype code - unused 2018-10-08 17:20:02 +02:00
57 changed files with 656 additions and 1201 deletions

View File

@@ -19,7 +19,7 @@ Set(BUILD_COMMAND "make")
Set(CTEST_BUILD_COMMAND "${BUILD_COMMAND} -j$ENV{number_of_processors}")
String(TOUPPER $ENV{ctest_model} _Model)
Set(configure_options "-DCMAKE_BUILD_TYPE=${_Model}")
Set(configure_options "-DCMAKE_BUILD_TYPE=$ENV{ctest_model}")
Set(CTEST_USE_LAUNCHERS 1)
Set(configure_options "${configure_options};-DCTEST_USE_LAUNCHERS=${CTEST_USE_LAUNCHERS}")

View File

@@ -164,12 +164,13 @@ macro(set_fairmq_defaults)
set(PROJECT_EXPORT_SET ${PROJECT_NAME}Targets)
# Configure build types
set(CMAKE_CONFIGURATION_TYPES "Debug" "Release" "RelWithDebInfo" "Nightly" "Profile" "AdressSan" "ThreadSan")
set(CMAKE_CONFIGURATION_TYPES "Debug" "Release" "RelWithDebInfo" "Nightly" "Profile" "Experimental" "AdressSan" "ThreadSan")
set(CMAKE_CXX_FLAGS_DEBUG "-g -Wshadow -Wall -Wextra")
set(CMAKE_CXX_FLAGS_RELEASE "-O2 -DNDEBUG")
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-O2 -g -Wshadow -Wall -Wextra -DNDEBUG")
set(CMAKE_CXX_FLAGS_NIGHTLY "-O2 -g -Wshadow -Wall -Wextra")
set(CMAKE_CXX_FLAGS_PROFILE "-g3 -Wshadow -Wall -Wextra -fno-inline -ftest-coverage -fprofile-arcs")
set(CMAKE_CXX_FLAGS_EXPERIMENTAL "-O2 -g -Wshadow -Wall -Wextra -DNDEBUG")
set(CMAKE_CXX_FLAGS_ADRESSSAN "-O2 -g -Wshadow -Wall -Wextra -fsanitize=address -fno-omit-frame-pointer")
set(CMAKE_CXX_FLAGS_THREADSAN "-O2 -g -Wshadow -Wall -Wextra -fsanitize=thread")

View File

@@ -8,6 +8,8 @@ if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
SESSION="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
# setup a trap to kill everything if the test fails/timeouts
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID;' TERM
@@ -16,6 +18,7 @@ SAMPLER+=" --id sampler1"
SAMPLER+=" --rate 1"
SAMPLER+=" --transport $transport"
SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --session $SESSION"
SAMPLER+=" --control static --color false"
SAMPLER+=" --max-iterations 1"
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://*:5555,rateLogging=0"
@@ -26,6 +29,7 @@ SINK="fairmq-ex-1-1-sink"
SINK+=" --id sink1"
SINK+=" --transport $transport"
SINK+=" --verbosity veryhigh"
SINK+=" --session $SESSION"
SINK+=" --control static --color false"
SINK+=" --max-iterations 1"
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://localhost:5555,rateLogging=0"

View File

@@ -9,6 +9,7 @@ if [[ $1 =~ ^[a-z]+$ ]]; then
fi
ex2config="@CMAKE_CURRENT_BINARY_DIR@/ex-1-n-1.json"
SESSION="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
# setup a trap to kill everything if the test fails/timeouts
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; kill -TERM $PROCESSOR1_PID; kill -TERM $PROCESSOR2_PID; wait $SAMPLER_PID; wait $SINK_PID; wait $PROCESSOR1_PID; wait $PROCESSOR2_PID;' TERM
@@ -17,6 +18,7 @@ SAMPLER="fairmq-ex-1-n-1-sampler"
SAMPLER+=" --id sampler1"
SAMPLER+=" --transport $transport"
SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --session $SESSION"
SAMPLER+=" --control static --color false"
SAMPLER+=" --max-iterations 2"
SAMPLER+=" --mq-config $ex2config"
@@ -27,6 +29,7 @@ PROCESSOR1="fairmq-ex-1-n-1-processor"
PROCESSOR1+=" --id processor1"
PROCESSOR1+=" --transport $transport"
PROCESSOR1+=" --verbosity veryhigh"
PROCESSOR1+=" --session $SESSION"
PROCESSOR1+=" --control static --color false"
PROCESSOR1+=" --mq-config $ex2config"
PROCESSOR1+=" --config-key processor"
@@ -37,6 +40,7 @@ PROCESSOR2="fairmq-ex-1-n-1-processor"
PROCESSOR2+=" --id processor2"
PROCESSOR2+=" --transport $transport"
PROCESSOR2+=" --verbosity veryhigh"
PROCESSOR2+=" --session $SESSION"
PROCESSOR2+=" --control static --color false"
PROCESSOR2+=" --mq-config $ex2config"
PROCESSOR2+=" --config-key processor"
@@ -47,6 +51,7 @@ SINK="fairmq-ex-1-n-1-sink"
SINK+=" --id sink1"
SINK+=" --transport $transport"
SINK+=" --verbosity veryhigh"
SINK+=" --session $SESSION"
SINK+=" --control static --color false"
SINK+=" --max-iterations 2"
SINK+=" --mq-config $ex2config"

View File

@@ -8,6 +8,8 @@ if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
SESSION="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
# setup a trap to kill everything if the test fails/timeouts
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK1_PID; kill -TERM $SINK2_PID; wait $SAMPLER_PID; wait $SINK1_PID; wait $SINK2_PID;' TERM
@@ -15,6 +17,7 @@ SAMPLER="fairmq-ex-copypush-sampler"
SAMPLER+=" --id sampler1"
SAMPLER+=" --transport $transport"
SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --session $SESSION"
SAMPLER+=" --control static --color false"
SAMPLER+=" --max-iterations 1"
SAMPLER+=" --channel-config name=data,type=push,method=bind,rateLogging=0,address=tcp://*:5555,address=tcp://*:5556"
@@ -25,6 +28,7 @@ SINK1="fairmq-ex-copypush-sink"
SINK1+=" --id sink1"
SINK1+=" --transport $transport"
SINK1+=" --verbosity veryhigh"
SINK1+=" --session $SESSION"
SINK1+=" --control static --color false"
SINK1+=" --max-iterations 1"
SINK1+=" --channel-config name=data,type=pull,method=connect,rateLogging=0,address=tcp://localhost:5555"
@@ -35,6 +39,7 @@ SINK2="fairmq-ex-copypush-sink"
SINK2+=" --id sink2"
SINK2+=" --transport $transport"
SINK2+=" --verbosity veryhigh"
SINK2+=" --session $SESSION"
SINK2+=" --control static --color false"
SINK2+=" --max-iterations 1"
SINK2+=" --channel-config name=data,type=pull,method=connect,rateLogging=0,address=tcp://localhost:5556"

View File

@@ -8,6 +8,8 @@ if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
SESSION="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
# setup a trap to kill everything if the test fails/timeouts
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID;' TERM
@@ -15,6 +17,7 @@ SAMPLER="fairmq-ex-multipart-sampler"
SAMPLER+=" --id sampler1"
SAMPLER+=" --transport $transport"
SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --session $SESSION"
SAMPLER+=" --max-iterations 1"
SAMPLER+=" --control static --color false"
SAMPLER+=" --channel-config name=data,type=push,method=connect,rateLogging=0,address=tcp://127.0.0.1:5555"
@@ -25,6 +28,7 @@ SINK="fairmq-ex-multipart-sink"
SINK+=" --id sink1"
SINK+=" --transport $transport"
SINK+=" --verbosity veryhigh"
SINK+=" --session $SESSION"
SINK+=" --control static --color false"
SINK+=" --channel-config name=data,type=pull,method=bind,rateLogging=0,address=tcp://127.0.0.1:5555"
@CMAKE_CURRENT_BINARY_DIR@/$SINK &

View File

@@ -8,6 +8,8 @@ if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
SESSION="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
# setup a trap to kill everything if the test fails/timeouts
trap 'kill -TERM $CLIENT_PID; kill -TERM $SERVER_PID; wait $CLIENT_PID; wait $SERVER_PID;' TERM
@@ -15,6 +17,7 @@ CLIENT="fairmq-ex-req-rep-client"
CLIENT+=" --id client"
CLIENT+=" --transport $transport"
CLIENT+=" --verbosity veryhigh"
CLIENT+=" --session $SESSION"
CLIENT+=" --control static --color false"
CLIENT+=" --max-iterations 1"
CLIENT+=" --channel-config name=data,type=req,method=connect,rateLogging=0,address=tcp://127.0.0.1:5005"
@@ -25,6 +28,7 @@ SERVER="fairmq-ex-req-rep-server"
SERVER+=" --id server"
SERVER+=" --transport $transport"
SERVER+=" --verbosity veryhigh"
SERVER+=" --session $SESSION"
SERVER+=" --control static --color false"
SERVER+=" --max-iterations 1"
SERVER+=" --channel-config name=data,type=rep,method=bind,rateLogging=0,address=tcp://127.0.0.1:5005"

View File

@@ -28,7 +28,6 @@ endif()
##################
# subdirectories #
##################
# add_subdirectory(shmem/prototype)
if(BUILD_OFI_TRANSPORT)
add_subdirectory(ofi)
endif()

View File

@@ -111,12 +111,10 @@ auto DeviceRunner::RunWithExceptionHandlers() -> int
try {
return Run();
} catch (std::exception& e) {
LOG(error) << "Unhandled exception reached the top of main: " << e.what()
<< ", application will now exit";
LOG(error) << "Uncaught exception reached the top of DeviceRunner: " << e.what();
return 1;
} catch (...) {
LOG(error) << "Non-exception instance being thrown. Please make sure you use "
"std::runtime_exception() instead. Application will now exit.";
LOG(error) << "Uncaught exception reached the top of DeviceRunner.";
return 1;
}
}

View File

@@ -104,19 +104,22 @@ FairMQChannel::FairMQChannel(const FairMQChannel& chan)
FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan)
{
fSocket = nullptr;
fType = chan.fType;
fMethod = chan.fMethod;
fAddress = chan.fAddress;
fTransportType = chan.fTransportType;
fSndBufSize = chan.fSndBufSize;
fRcvBufSize = chan.fRcvBufSize;
fSndKernelSize = chan.fSndKernelSize;
fRcvKernelSize = chan.fRcvKernelSize;
fRateLogging = chan.fRateLogging;
fSocket = nullptr;
fName = chan.fName;
fIsValid = false;
fTransportType = chan.fTransportType;
fTransportFactory = nullptr;
fMultipart = chan.fMultipart;
fModified = chan.fModified;
fReset = false;
return *this;
}

View File

@@ -24,6 +24,7 @@
#include <functional>
#include <sstream>
#include <iomanip>
#include <future>
#include <algorithm> // std::max
using namespace std;
@@ -491,7 +492,7 @@ void FairMQDevice::RunWrapper()
LOG(info) << "DEVICE: Running...";
// start the rate logger thread
thread rateLogger(&FairMQDevice::LogSocketRates, this);
future<void> rateLogger = async(launch::async, &FairMQDevice::LogSocketRates, this);
// notify transports to resume transfers
{
@@ -549,7 +550,7 @@ void FairMQDevice::RunWrapper()
PostRun();
rateLogger.join();
rateLogger.get();
}
void FairMQDevice::HandleSingleChannelInput()
@@ -816,8 +817,6 @@ void FairMQDevice::LogSocketRates()
chrono::time_point<chrono::high_resolution_clock> t0;
chrono::time_point<chrono::high_resolution_clock> t1;
unsigned long long msSinceLastLog;
vector<FairMQSocket*> filteredSockets;
vector<string> filteredChannelNames;
vector<int> logIntervals;
@@ -879,7 +878,7 @@ void FairMQDevice::LogSocketRates()
{
t1 = chrono::high_resolution_clock::now();
msSinceLastLog = chrono::duration_cast<chrono::milliseconds>(t1 - t0).count();
unsigned long long msSinceLastLog = chrono::duration_cast<chrono::milliseconds>(t1 - t0).count();
i = 0;

View File

@@ -462,39 +462,30 @@ class FairMQDevice : public FairMQStateMachine
std::string fId; ///< Device ID
/// Additional user initialization (can be overloaded in child classes). Prefer to use InitTask().
/// Executed in a worker thread
virtual void Init();
/// Task initialization (can be overloaded in child classes)
/// Executed in a worker thread
virtual void InitTask();
/// Runs the device (to be overloaded in child classes)
/// Executed in a worker thread
virtual void Run();
/// Called in the RUNNING state once before executing the Run()/ConditionalRun() method
/// Executed in a worker thread
virtual void PreRun();
/// Called during RUNNING state repeatedly until it returns false or device state changes
/// Executed in a worker thread
virtual bool ConditionalRun();
/// Called in the RUNNING state once after executing the Run()/ConditionalRun() method
/// Executed in a worker thread
virtual void PostRun();
/// Handles the PAUSE state
/// Executed in a worker thread
virtual void Pause();
/// Resets the user task (to be overloaded in child classes)
/// Executed in a worker thread
virtual void ResetTask();
/// Resets the device (can be overloaded in child classes)
/// Executed in a worker thread
virtual void Reset();
private:

View File

@@ -18,8 +18,8 @@ class FairMQPoller
virtual void Poll(const int timeout) = 0;
virtual bool CheckInput(const int index) = 0;
virtual bool CheckOutput(const int index) = 0;
virtual bool CheckInput(const std::string channelKey, const int index) = 0;
virtual bool CheckOutput(const std::string channelKey, const int index) = 0;
virtual bool CheckInput(const std::string& channelKey, const int index) = 0;
virtual bool CheckOutput(const std::string& channelKey, const int index) = 0;
virtual ~FairMQPoller() {};
};

View File

@@ -644,16 +644,27 @@ bool FairMQStateMachine::CheckCurrentState(int state) const
{
return state == static_pointer_cast<FairMQFSM>(fFsm)->fState;
}
bool FairMQStateMachine::CheckCurrentState(string state) const
bool FairMQStateMachine::CheckCurrentState(const string& state) const
{
return state == GetCurrentStateName();
}
void FairMQStateMachine::ProcessWork()
try
{
static_pointer_cast<FairMQFSM>(fFsm)->ProcessWork();
} catch(...) {
{
lock_guard<mutex> lock(static_pointer_cast<FairMQFSM>(fFsm)->fWorkMutex);
static_pointer_cast<FairMQFSM>(fFsm)->fWorkActive = false;
static_pointer_cast<FairMQFSM>(fFsm)->fWorkAvailable = false;
static_pointer_cast<FairMQFSM>(fFsm)->fWorkDoneCondition.notify_one();
}
ChangeState(ERROR_FOUND);
throw;
}
int FairMQStateMachine::GetEventNumber(const string& event)
{
return eventNumbers.at(event);

View File

@@ -82,7 +82,7 @@ class FairMQStateMachine
static std::string GetStateName(const State);
int GetCurrentState() const;
bool CheckCurrentState(int state) const;
bool CheckCurrentState(std::string state) const;
bool CheckCurrentState(const std::string& state) const;
// actions to be overwritten by derived classes
virtual void InitWrapper() {}

View File

@@ -21,6 +21,7 @@
#include <unordered_map>
#include <mutex>
#include <condition_variable>
#include <stdexcept>
namespace fair
{

View File

@@ -88,7 +88,6 @@ void FairMQBenchmarkSampler::Run()
}
}
if (fMsgRate > 0)
{
rateLimiter.maybe_sleep();

View File

@@ -68,10 +68,9 @@ FairMQPollerNN::FairMQPollerNN(const unordered_map<string, vector<FairMQChannel>
, fNumItems(0)
, fOffsetMap()
{
int offset = 0;
try
{
int offset = 0;
// calculate offsets and the total size of the poll item set
for (string channel : channelList)
{
@@ -184,7 +183,7 @@ bool FairMQPollerNN::CheckOutput(const int index)
return false;
}
bool FairMQPollerNN::CheckInput(const string channelKey, const int index)
bool FairMQPollerNN::CheckInput(const string& channelKey, const int index)
{
try
{
@@ -203,7 +202,7 @@ bool FairMQPollerNN::CheckInput(const string channelKey, const int index)
}
}
bool FairMQPollerNN::CheckOutput(const string channelKey, const int index)
bool FairMQPollerNN::CheckOutput(const string& channelKey, const int index)
{
try
{

View File

@@ -44,8 +44,8 @@ class FairMQPollerNN : public FairMQPoller
virtual void Poll(const int timeout);
virtual bool CheckInput(const int index);
virtual bool CheckOutput(const int index);
virtual bool CheckInput(const std::string channelKey, const int index);
virtual bool CheckOutput(const std::string channelKey, const int index);
virtual bool CheckInput(const std::string& channelKey, const int index);
virtual bool CheckOutput(const std::string& channelKey, const int index);
virtual ~FairMQPollerNN();

View File

@@ -197,7 +197,6 @@ int FairMQSocketNN::SendImpl(FairMQMessagePtr& msg, const int flags, const int t
int FairMQSocketNN::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout)
{
int nbytes = -1;
int elapsed = 0;
FairMQMessageNN* msgPtr = static_cast<FairMQMessageNN*>(msg.get());
@@ -205,7 +204,7 @@ int FairMQSocketNN::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const in
while (true)
{
void* ptr = nullptr;
nbytes = nn_recv(fSocket, &ptr, NN_MSG, flags);
int nbytes = nn_recv(fSocket, &ptr, NN_MSG, flags);
if (nbytes >= 0)
{
fBytesRx += nbytes;
@@ -279,11 +278,9 @@ int64_t FairMQSocketNN::SendImpl(vector<FairMQMessagePtr>& msgVec, const int fla
}
}
int64_t nbytes = -1;
while (true)
{
nbytes = nn_send(fSocket, sbuf.data(), sbuf.size(), flags);
int64_t nbytes = nn_send(fSocket, sbuf.data(), sbuf.size(), flags);
if (nbytes >= 0)
{
fBytesTx += nbytes;

View File

@@ -59,9 +59,8 @@ Poller::Poller(const vector<const FairMQChannel*>& channels)
Poller::Poller(const unordered_map<string, vector<FairMQChannel>>& channelsMap, const vector<string>& channelList)
{
int offset = 0;
try {
int offset = 0;
// calculate offsets and the total size of the poll item set
for (string channel : channelList) {
fOffsetMap[channel] = offset;

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2017-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@@ -11,6 +11,7 @@
#include <termios.h> // for the interactive mode
#include <poll.h> // for the interactive mode
#include <csignal> // catching system signals
#include <cstdlib>
#include <functional>
#include <atomic>
@@ -18,11 +19,18 @@ using namespace std;
namespace
{
std::atomic<sig_atomic_t> gSignalStatus(0);
std::atomic<sig_atomic_t> gLastSignal(0);
std::atomic<int> gSignalCount(0);
extern "C" auto signal_handler(int signal) -> void
{
gSignalStatus = signal;
++gSignalCount;
gLastSignal = signal;
if (gSignalCount > 1)
{
std::abort();
}
}
}
@@ -33,18 +41,27 @@ namespace mq
namespace plugins
{
Control::Control(const string name, const Plugin::Version version, const string maintainer, const string homepage, PluginServices* pluginServices)
Control::Control(const string& name, const Plugin::Version version, const string& maintainer, const string& homepage, PluginServices* pluginServices)
: Plugin(name, version, maintainer, homepage, pluginServices)
, fControllerThread()
, fSignalHandlerThread()
, fShutdownThread()
, fEvents()
, fEventsMutex()
, fShutdownMutex()
, fControllerMutex()
, fNewEvent()
, fDeviceTerminationRequested(false)
, fHasShutdown(false)
, fDeviceShutdownRequested(false)
, fDeviceHasShutdown(false)
, fPluginShutdownRequested(false)
{
SubscribeToDeviceStateChange([&](DeviceState newState)
{
{
lock_guard<mutex> lock{fEventsMutex};
fEvents.push(newState);
}
fNewEvent.notify_one();
});
try
{
TakeDeviceControl();
@@ -96,124 +113,140 @@ auto ControlPluginProgramOptions() -> Plugin::ProgOptions
return pluginOptions;
}
auto Control::InteractiveMode() -> void
struct terminal_config
{
try
terminal_config()
{
RunStartupSequence();
char input; // hold the user console input
pollfd cinfd[1];
cinfd[0].fd = fileno(stdin);
cinfd[0].events = POLLIN;
struct termios t;
termios t;
tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure
t.c_lflag &= ~ICANON; // disable canonical input
t.c_lflag &= ~ECHO; // do not echo input chars
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
}
PrintInteractiveHelp();
bool keepRunning = true;
while (keepRunning)
{
if (poll(cinfd, 1, 500))
{
if (fDeviceTerminationRequested)
{
break;
}
cin >> input;
switch (input)
{
case 'i':
LOG(info) << "\n\n --> [i] init device\n";
ChangeDeviceState(DeviceStateTransition::InitDevice);
break;
case 'j':
LOG(info) << "\n\n --> [j] init task\n";
ChangeDeviceState(DeviceStateTransition::InitTask);
break;
case 'p':
LOG(info) << "\n\n --> [p] pause\n";
ChangeDeviceState(DeviceStateTransition::Pause);
break;
case 'r':
LOG(info) << "\n\n --> [r] run\n";
ChangeDeviceState(DeviceStateTransition::Run);
break;
case 's':
LOG(info) << "\n\n --> [s] stop\n";
ChangeDeviceState(DeviceStateTransition::Stop);
break;
case 't':
LOG(info) << "\n\n --> [t] reset task\n";
ChangeDeviceState(DeviceStateTransition::ResetTask);
break;
case 'd':
LOG(info) << "\n\n --> [d] reset device\n";
ChangeDeviceState(DeviceStateTransition::ResetDevice);
break;
case 'k':
LOG(info) << "\n\n --> [k] increase log severity\n";
CycleLogConsoleSeverityUp();
break;
case 'l':
LOG(info) << "\n\n --> [l] decrease log severity\n";
CycleLogConsoleSeverityDown();
break;
case 'n':
LOG(info) << "\n\n --> [n] increase log verbosity\n";
CycleLogVerbosityUp();
break;
case 'm':
LOG(info) << "\n\n --> [m] decrease log verbosity\n";
CycleLogVerbosityDown();
break;
case 'h':
LOG(info) << "\n\n --> [h] help\n";
PrintInteractiveHelp();
break;
// case 'x':
// LOG(info) << "\n\n --> [x] ERROR\n";
// ChangeDeviceState(DeviceStateTransition::ERROR_FOUND);
// break;
case 'q':
LOG(info) << "\n\n --> [q] end\n";
keepRunning = false;
break;
default:
LOG(info) << "Invalid input: [" << input << "]";
PrintInteractiveHelp();
break;
}
}
if (fDeviceTerminationRequested)
{
keepRunning = false;
}
}
~terminal_config()
{
termios t;
tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure
t.c_lflag |= ICANON; // re-enable canonical input
t.c_lflag |= ECHO; // echo input chars
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
}
};
if (!fDeviceTerminationRequested)
auto Control::InteractiveMode() -> void
try
{
RunStartupSequence();
char input; // hold the user console input
pollfd cinfd[1];
cinfd[0].fd = fileno(stdin);
cinfd[0].events = POLLIN;
cinfd[0].revents = 0;
terminal_config tconfig;
PrintInteractiveHelp();
bool keepRunning = true;
while (keepRunning)
{
if (poll(cinfd, 1, 500))
{
RunShutdownSequence();
if (fDeviceShutdownRequested)
{
break;
}
cin >> input;
switch (input)
{
case 'i':
LOG(info) << "\n\n --> [i] init device\n";
ChangeDeviceState(DeviceStateTransition::InitDevice);
break;
case 'j':
LOG(info) << "\n\n --> [j] init task\n";
ChangeDeviceState(DeviceStateTransition::InitTask);
break;
case 'p':
LOG(info) << "\n\n --> [p] pause\n";
ChangeDeviceState(DeviceStateTransition::Pause);
break;
case 'r':
LOG(info) << "\n\n --> [r] run\n";
ChangeDeviceState(DeviceStateTransition::Run);
break;
case 's':
LOG(info) << "\n\n --> [s] stop\n";
ChangeDeviceState(DeviceStateTransition::Stop);
break;
case 't':
LOG(info) << "\n\n --> [t] reset task\n";
ChangeDeviceState(DeviceStateTransition::ResetTask);
break;
case 'd':
LOG(info) << "\n\n --> [d] reset device\n";
ChangeDeviceState(DeviceStateTransition::ResetDevice);
break;
case 'k':
LOG(info) << "\n\n --> [k] increase log severity\n";
CycleLogConsoleSeverityUp();
break;
case 'l':
LOG(info) << "\n\n --> [l] decrease log severity\n";
CycleLogConsoleSeverityDown();
break;
case 'n':
LOG(info) << "\n\n --> [n] increase log verbosity\n";
CycleLogVerbosityUp();
break;
case 'm':
LOG(info) << "\n\n --> [m] decrease log verbosity\n";
CycleLogVerbosityDown();
break;
case 'h':
LOG(info) << "\n\n --> [h] help\n";
PrintInteractiveHelp();
break;
// case 'x':
// LOG(info) << "\n\n --> [x] ERROR\n";
// ChangeDeviceState(DeviceStateTransition::ERROR_FOUND);
// break;
case 'q':
LOG(info) << "\n\n --> [q] end\n";
keepRunning = false;
break;
default:
LOG(info) << "Invalid input: [" << input << "]";
PrintInteractiveHelp();
break;
}
}
if (GetCurrentDeviceState() == DeviceState::Error)
{
throw DeviceErrorState("Controlled device transitioned to error state.");
}
if (fDeviceShutdownRequested)
{
break;
}
}
catch (PluginServices::DeviceControlError& e)
{
// If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else.
LOG(debug) << e.what();
}
RunShutdownSequence();
}
catch (PluginServices::DeviceControlError& e)
{
// If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else.
LOG(debug) << e.what();
}
catch (DeviceErrorState&)
{
}
auto Control::PrintInteractiveHelp() -> void
@@ -234,137 +267,124 @@ auto Control::WaitForNextState() -> DeviceState
}
auto result = fEvents.front();
if (result == DeviceState::Error)
{
throw DeviceErrorState("Controlled device transitioned to error state.");
}
fEvents.pop();
return result;
}
auto Control::StaticMode() -> void
try
{
try
{
RunStartupSequence();
RunStartupSequence();
{
// Wait for next state, which is DeviceState::Ready,
// or for device shutdown request (Ctrl-C)
unique_lock<mutex> lock{fEventsMutex};
while (fEvents.empty() && !fDeviceShutdownRequested)
{
// Wait for next state, which is DeviceState::Ready,
// or for device termination request
unique_lock<mutex> lock{fEventsMutex};
while (fEvents.empty() && !fDeviceTerminationRequested)
{
fNewEvent.wait(lock);
}
fNewEvent.wait_for(lock, chrono::milliseconds(50));
}
if (!fDeviceTerminationRequested)
if (fEvents.front() == DeviceState::Error)
{
RunShutdownSequence();
throw DeviceErrorState("Controlled device transitioned to error state.");
}
}
catch (PluginServices::DeviceControlError& e)
{
// If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else.
LOG(debug) << e.what();
}
RunShutdownSequence();
}
catch (PluginServices::DeviceControlError& e)
{
// If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else.
LOG(debug) << e.what();
}
catch (DeviceErrorState&)
{
}
auto Control::SignalHandler() -> void
{
while (true)
while (gSignalCount == 0 && !fPluginShutdownRequested)
{
if (gSignalStatus != 0 && !fHasShutdown)
{
LOG(info) << "Received device shutdown request (signal " << gSignalStatus << ").";
LOG(info) << "Waiting for graceful device shutdown. Hit Ctrl-C again to abort immediately.";
if (!fDeviceTerminationRequested)
{
fDeviceTerminationRequested = true;
gSignalStatus = 0;
fShutdownThread = thread(&Control::HandleShutdownSignal, this);
}
else
{
LOG(warn) << "Received 2nd device shutdown request (signal " << gSignalStatus << ").";
LOG(warn) << "Aborting immediately!";
abort();
}
}
else if (fHasShutdown)
{
break;
}
this_thread::sleep_for(chrono::milliseconds(100));
}
}
auto Control::HandleShutdownSignal() -> void
{
StealDeviceControl();
UnsubscribeFromDeviceStateChange(); // In case, static or interactive mode have subscribed already
SubscribeToDeviceStateChange([&](DeviceState newState)
if (!fPluginShutdownRequested)
{
{
lock_guard<mutex> lock{fEventsMutex};
fEvents.push(newState);
}
fNewEvent.notify_one();
});
LOG(info) << "Received device shutdown request (signal " << gLastSignal << ").";
LOG(info) << "Waiting for graceful device shutdown. Hit Ctrl-C again to abort immediately.";
RunShutdownSequence();
// Signal and wait for controller thread, if we are controller
fDeviceShutdownRequested = true;
{
unique_lock<mutex> lock(fControllerMutex);
if (fControllerThread.joinable()) fControllerThread.join();
}
if (!fDeviceHasShutdown)
{
// Take over control and attempt graceful shutdown
StealDeviceControl();
try
{
RunShutdownSequence();
}
catch (PluginServices::DeviceControlError& e)
{
LOG(info) << "Graceful device shutdown failed: " << e.what() << " If hanging, hit Ctrl-C again to abort immediately.";
}
catch (...)
{
LOG(info) << "Graceful device shutdown failed. If hanging, hit Ctrl-C again to abort immediately.";
}
}
}
}
auto Control::RunShutdownSequence() -> void
{
lock_guard<mutex> lock(fShutdownMutex);
if (!fHasShutdown)
auto nextState = GetCurrentDeviceState();
EmptyEventQueue();
while (nextState != DeviceState::Exiting && nextState != DeviceState::Error)
{
auto nextState = GetCurrentDeviceState();
EmptyEventQueue();
while (nextState != DeviceState::Exiting)
switch (nextState)
{
switch (nextState)
{
case DeviceState::Idle:
ChangeDeviceState(DeviceStateTransition::End);
break;
case DeviceState::DeviceReady:
ChangeDeviceState(DeviceStateTransition::ResetDevice);
break;
case DeviceState::Ready:
ChangeDeviceState(DeviceStateTransition::ResetTask);
break;
case DeviceState::Running:
ChangeDeviceState(DeviceStateTransition::Stop);
break;
case DeviceState::Paused:
ChangeDeviceState(DeviceStateTransition::Resume);
break;
default:
// ignore other states
break;
}
nextState = WaitForNextState();
case DeviceState::Idle:
ChangeDeviceState(DeviceStateTransition::End);
break;
case DeviceState::DeviceReady:
ChangeDeviceState(DeviceStateTransition::ResetDevice);
break;
case DeviceState::Ready:
ChangeDeviceState(DeviceStateTransition::ResetTask);
break;
case DeviceState::Running:
ChangeDeviceState(DeviceStateTransition::Stop);
break;
case DeviceState::Paused:
ChangeDeviceState(DeviceStateTransition::Resume);
break;
default:
LOG(debug) << "Controller ignoring event: " << nextState;
break;
}
fHasShutdown = true;
UnsubscribeFromDeviceStateChange();
ReleaseDeviceControl();
nextState = WaitForNextState();
}
fDeviceHasShutdown = true;
ReleaseDeviceControl();
}
auto Control::RunStartupSequence() -> void
{
SubscribeToDeviceStateChange([&](DeviceState newState)
{
{
lock_guard<mutex> lock{fEventsMutex};
fEvents.push(newState);
}
fNewEvent.notify_one();
});
ChangeDeviceState(DeviceStateTransition::InitDevice);
while (WaitForNextState() != DeviceState::DeviceReady) {}
ChangeDeviceState(DeviceStateTransition::InitTask);
@@ -381,9 +401,16 @@ auto Control::EmptyEventQueue() -> void
Control::~Control()
{
if (fControllerThread.joinable()) fControllerThread.join();
// Notify threads to exit
fPluginShutdownRequested = true;
{
unique_lock<mutex> lock(fControllerMutex);
if (fControllerThread.joinable()) fControllerThread.join();
}
if (fSignalHandlerThread.joinable()) fSignalHandlerThread.join();
if (fShutdownThread.joinable()) fShutdownThread.join();
UnsubscribeFromDeviceStateChange();
}
} /* namespace plugins */

View File

@@ -18,6 +18,7 @@
#include <queue>
#include <thread>
#include <atomic>
#include <stdexcept>
namespace fair
{
@@ -29,30 +30,31 @@ namespace plugins
class Control : public Plugin
{
public:
Control(const std::string name, const Plugin::Version version, const std::string maintainer, const std::string homepage, PluginServices* pluginServices);
Control(const std::string& name, const Plugin::Version version, const std::string& maintainer, const std::string& homepage, PluginServices* pluginServices);
~Control();
private:
auto InteractiveMode() -> void;
auto PrintInteractiveHelp() -> void;
static auto PrintInteractiveHelp() -> void;
auto StaticMode() -> void;
auto WaitForNextState() -> DeviceState;
auto SignalHandler() -> void;
auto HandleShutdownSignal() -> void;
auto RunShutdownSequence() -> void;
auto RunStartupSequence() -> void;
auto EmptyEventQueue() -> void;
std::thread fControllerThread;
std::thread fSignalHandlerThread;
std::thread fShutdownThread;
std::queue<DeviceState> fEvents;
std::mutex fEventsMutex;
std::mutex fShutdownMutex;
std::mutex fControllerMutex;
std::condition_variable fNewEvent;
std::atomic<bool> fDeviceTerminationRequested;
std::atomic<bool> fHasShutdown;
std::atomic<bool> fDeviceShutdownRequested;
std::atomic<bool> fDeviceHasShutdown;
std::atomic<bool> fPluginShutdownRequested;
struct DeviceErrorState : std::runtime_error { using std::runtime_error::runtime_error; };
}; /* class Control */
auto ControlPluginProgramOptions() -> Plugin::ProgOptions;

View File

@@ -31,7 +31,7 @@ namespace mq
namespace plugins
{
DDS::DDS(const string name, const Plugin::Version version, const string maintainer, const string homepage, PluginServices* pluginServices)
DDS::DDS(const string& name, const Plugin::Version version, const string& maintainer, const string& homepage, PluginServices* pluginServices)
: Plugin(name, version, maintainer, homepage, pluginServices)
, fService()
, fDDSCustomCmd(fService)

View File

@@ -61,7 +61,7 @@ struct IofN
class DDS : public Plugin
{
public:
DDS(const std::string name, const Plugin::Version version, const std::string maintainer, const std::string homepage, PluginServices* pluginServices);
DDS(const std::string& name, const Plugin::Version version, const std::string& maintainer, const std::string& homepage, PluginServices* pluginServices);
~DDS();

View File

@@ -33,7 +33,7 @@ int main(int argc, char* argv[])
{
try {
string sessionID;
char command;
char command = ' ';
string topologyPath;
bpo::options_description options("fairmq-dds-command-ui options");

View File

@@ -56,12 +56,12 @@ int main(int argc, char* argv[])
}
catch (std::exception& e)
{
LOG(error) << "Unhandled exception reached the top of main: " << e.what() << ", application will now exit";
LOG(error) << "Uncaught exception reached the top of main: " << e.what();
return 1;
}
catch (...)
{
LOG(error) << "Non-exception instance being thrown. Please make sure you use std::runtime_exception() instead. Application will now exit.";
LOG(error) << "Uncaught exception reached the top of main.";
return 1;
}
}

View File

@@ -9,8 +9,13 @@
#define FAIR_MQ_SHMEM_COMMON_H_
#include <atomic>
#include <string>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/functional/hash.hpp>
#include <unistd.h>
#include <sys/types.h>
namespace fair
{
@@ -73,6 +78,16 @@ struct RegionBlock
size_t fHint;
};
// find id for unique shmem name:
// a hash of user id + session id, truncated to 8 characters (to accommodate for name size limit on some systems (MacOS)).
inline std::string buildShmIdFromSessionIdAndUserId(const std::string& sessionId)
{
boost::hash<std::string> stringHash;
std::string shmId(std::to_string(stringHash(std::string((std::to_string(geteuid()) + sessionId)))));
shmId.resize(8, '_');
return shmId;
}
} // namespace shmem
} // namespace mq
} // namespace fair

View File

@@ -68,10 +68,9 @@ FairMQPollerSHM::FairMQPollerSHM(const unordered_map<string, vector<FairMQChanne
, fNumItems(0)
, fOffsetMap()
{
int offset = 0;
try
{
int offset = 0;
// calculate offsets and the total size of the poll item set
for (string channel : channelList)
{
@@ -189,7 +188,7 @@ bool FairMQPollerSHM::CheckOutput(const int index)
return false;
}
bool FairMQPollerSHM::CheckInput(const string channelKey, const int index)
bool FairMQPollerSHM::CheckInput(const string& channelKey, const int index)
{
try
{
@@ -208,7 +207,7 @@ bool FairMQPollerSHM::CheckInput(const string channelKey, const int index)
}
}
bool FairMQPollerSHM::CheckOutput(const string channelKey, const int index)
bool FairMQPollerSHM::CheckOutput(const string& channelKey, const int index)
{
try
{

View File

@@ -37,8 +37,8 @@ class FairMQPollerSHM : public FairMQPoller
void Poll(const int timeout) override;
bool CheckInput(const int index) override;
bool CheckOutput(const int index) override;
bool CheckInput(const std::string channelKey, const int index) override;
bool CheckOutput(const std::string channelKey, const int index) override;
bool CheckInput(const std::string& channelKey, const int index) override;
bool CheckOutput(const std::string& channelKey, const int index) override;
~FairMQPollerSHM() override;

View File

@@ -121,12 +121,11 @@ int64_t FairMQSocketSHM::TryReceive(vector<unique_ptr<FairMQMessage>>& msgVec) {
int FairMQSocketSHM::SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout)
{
int nbytes = -1;
int elapsed = 0;
while (true && !fInterrupted)
{
nbytes = zmq_msg_send(static_cast<FairMQMessageSHM*>(msg.get())->GetMessage(), fSocket, flags);
int nbytes = zmq_msg_send(static_cast<FairMQMessageSHM*>(msg.get())->GetMessage(), fSocket, flags);
if (nbytes == 0)
{
return nbytes;
@@ -177,13 +176,12 @@ int FairMQSocketSHM::SendImpl(FairMQMessagePtr& msg, const int flags, const int
int FairMQSocketSHM::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout)
{
int nbytes = -1;
int elapsed = 0;
zmq_msg_t* msgPtr = static_cast<FairMQMessageSHM*>(msg.get())->GetMessage();
while (true)
{
nbytes = zmq_msg_recv(msgPtr, fSocket, flags);
int nbytes = zmq_msg_recv(msgPtr, fSocket, flags);
if (nbytes == 0)
{
++fMessagesRx;
@@ -249,7 +247,6 @@ int FairMQSocketSHM::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const i
int64_t FairMQSocketSHM::SendImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int timeout)
{
const unsigned int vecSize = msgVec.size();
int64_t totalSize = 0;
int elapsed = 0;
if (vecSize == 1) {
@@ -263,7 +260,7 @@ int64_t FairMQSocketSHM::SendImpl(vector<FairMQMessagePtr>& msgVec, const int fl
// prepare the message with shm metas
MetaHeader* metas = static_cast<MetaHeader*>(zmq_msg_data(&zmqMsg));
for (auto &msg : msgVec)
for (auto& msg : msgVec)
{
zmq_msg_t* metaMsg = static_cast<FairMQMessageSHM*>(msg.get())->GetMessage();
memcpy(metas++, zmq_msg_data(metaMsg), sizeof(MetaHeader));
@@ -271,9 +268,8 @@ int64_t FairMQSocketSHM::SendImpl(vector<FairMQMessagePtr>& msgVec, const int fl
while (!fInterrupted)
{
int nbytes = -1;
nbytes = zmq_msg_send(&zmqMsg, fSocket, flags);
int64_t totalSize = 0;
int nbytes = zmq_msg_send(&zmqMsg, fSocket, flags);
if (nbytes == 0)
{
zmq_msg_close(&zmqMsg);
@@ -283,7 +279,7 @@ int64_t FairMQSocketSHM::SendImpl(vector<FairMQMessagePtr>& msgVec, const int fl
{
assert(nbytes == (vecSize * sizeof(MetaHeader))); // all or nothing
for (auto &msg : msgVec)
for (auto& msg : msgVec)
{
FairMQMessageSHM* shmMsg = static_cast<FairMQMessageSHM*>(msg.get());
shmMsg->fQueued = true;
@@ -338,7 +334,6 @@ int64_t FairMQSocketSHM::SendImpl(vector<FairMQMessagePtr>& msgVec, const int fl
int64_t FairMQSocketSHM::ReceiveImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int timeout)
{
int64_t totalSize = 0;
int elapsed = 0;
zmq_msg_t zmqMsg;
@@ -346,6 +341,7 @@ int64_t FairMQSocketSHM::ReceiveImpl(vector<FairMQMessagePtr>& msgVec, const int
while (!fInterrupted)
{
int64_t totalSize = 0;
int nbytes = zmq_msg_recv(&zmqMsg, fSocket, flags);
if (nbytes == 0)
{

View File

@@ -37,7 +37,7 @@ fair::mq::Transport FairMQTransportFactorySHM::fTransportType = fair::mq::Transp
FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const FairMQProgOptions* config)
: FairMQTransportFactory(id)
, fDeviceId(id)
, fSessionName("default")
, fShmId()
, fContext(nullptr)
, fHeartbeatThread()
, fSendHeartbeats(true)
@@ -58,12 +58,13 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai
}
int numIoThreads = 1;
string sessionName = "default";
size_t segmentSize = 2000000000;
bool autolaunchMonitor = false;
if (config)
{
numIoThreads = config->GetValue<int>("io-threads");
fSessionName = config->GetValue<string>("session");
sessionName = config->GetValue<string>("session");
segmentSize = config->GetValue<size_t>("shm-segment-size");
autolaunchMonitor = config->GetValue<bool>("shm-monitor");
}
@@ -72,11 +73,11 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai
LOG(debug) << "FairMQProgOptions not available! Using defaults.";
}
fSessionName.resize(8, '_'); // shorten the session name, to accommodate for name size limit on some systems (MacOS)
fShmId = buildShmIdFromSessionIdAndUserId(sessionName);
try
{
fShMutex = fair::mq::tools::make_unique<bipc::named_mutex>(bipc::open_or_create, string("fmq_" + fSessionName + "_mtx").c_str());
fShMutex = fair::mq::tools::make_unique<bipc::named_mutex>(bipc::open_or_create, string("fmq_" + fShmId + "_mtx").c_str());
if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0)
{
@@ -89,8 +90,8 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
}
fManager = fair::mq::tools::make_unique<Manager>(fSessionName, segmentSize);
LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fSessionName << "_main" << "' of " << segmentSize << " bytes. Available are " << fManager->Segment().get_free_memory() << " bytes.";
fManager = fair::mq::tools::make_unique<Manager>(fShmId, segmentSize);
LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << segmentSize << " bytes. Available are " << fManager->Segment().get_free_memory() << " bytes.";
{
bipc::scoped_lock<bipc::named_mutex> lock(*fShMutex);
@@ -145,8 +146,6 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai
void FairMQTransportFactorySHM::StartMonitor()
{
int numTries = 0;
auto env = boost::this_process::environment();
vector<bfs::path> ownPath = boost::this_process::path();
@@ -160,8 +159,8 @@ void FairMQTransportFactorySHM::StartMonitor()
if (!p.empty())
{
boost::process::spawn(p, "-x", "-s", fSessionName, "-d", "-t", "2000", env);
boost::process::spawn(p, "-x", "--shmid", fShmId, "-d", "-t", "2000", env);
int numTries = 0;
do
{
MonitorStatus* monitorStatus = fManager->ManagementSegment().find<MonitorStatus>(bipc::unique_instance).first;
@@ -190,7 +189,7 @@ void FairMQTransportFactorySHM::StartMonitor()
void FairMQTransportFactorySHM::SendHeartbeats()
{
string controlQueueName("fmq_" + fSessionName + "_cq");
string controlQueueName("fmq_" + fShmId + "_cq");
while (fSendHeartbeats)
{
try
@@ -312,7 +311,7 @@ FairMQTransportFactorySHM::~FairMQTransportFactorySHM()
if (lastRemoved)
{
boost::interprocess::named_mutex::remove(string("fmq_" + fSessionName + "_mtx").c_str());
bipc::named_mutex::remove(string("fmq_" + fShmId + "_mtx").c_str());
}
}

View File

@@ -60,7 +60,7 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory
static fair::mq::Transport fTransportType;
std::string fDeviceId;
std::string fSessionName;
std::string fShmId;
void* fContext;
std::thread fHeartbeatThread;
std::atomic<bool> fSendHeartbeats;

View File

@@ -105,7 +105,7 @@ void Manager::RemoveSegment()
{
if (bipc::shared_memory_object::remove(fSegmentName.c_str()))
{
LOG(debug) << "successfully removed " << fSegmentName << " segment after the device has stopped.";
LOG(debug) << "successfully removed '" << fSegmentName << "' segment after the device has stopped.";
}
else
{

View File

@@ -51,17 +51,17 @@ void signalHandler(int signal)
gSignalStatus = signal;
}
Monitor::Monitor(const string& sessionName, bool selfDestruct, bool interactive, unsigned int timeoutInMS, bool runAsDaemon, bool cleanOnExit)
Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, unsigned int timeoutInMS, bool runAsDaemon, bool cleanOnExit)
: fSelfDestruct(selfDestruct)
, fInteractive(interactive)
, fSeenOnce(false)
, fIsDaemon(runAsDaemon)
, fCleanOnExit(cleanOnExit)
, fTimeoutInMS(timeoutInMS)
, fSessionName(sessionName)
, fSegmentName("fmq_" + fSessionName + "_main")
, fManagementSegmentName("fmq_" + fSessionName + "_mng")
, fControlQueueName("fmq_" + fSessionName + "_cq")
, fShmId(shmId)
, fSegmentName("fmq_" + fShmId + "_main")
, fManagementSegmentName("fmq_" + fShmId + "_mng")
, fControlQueueName("fmq_" + fShmId + "_cq")
, fTerminating(false)
, fHeartbeatTriggered(false)
, fLastHeartbeat(chrono::high_resolution_clock::now())
@@ -201,7 +201,7 @@ void Monitor::Interactive()
break;
case 'x':
cout << "\n[x] --> closing shared memory:" << endl;
Cleanup(fSessionName);
Cleanup(fShmId);
break;
case 'h':
cout << "\n[h] --> help:" << endl << endl;
@@ -245,11 +245,11 @@ void Monitor::Interactive()
void Monitor::CheckSegment()
{
static uint64_t counter = 0;
char c = '#';
if (fInteractive)
{
static uint64_t counter = 0;
int mod = counter++ % 5;
switch (mod)
{
@@ -293,7 +293,7 @@ void Monitor::CheckSegment()
if (fHeartbeatTriggered && duration > fTimeoutInMS)
{
cout << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning..." << endl;
Cleanup(fSessionName);
Cleanup(fShmId);
fHeartbeatTriggered = false;
if (fSelfDestruct)
{
@@ -340,7 +340,7 @@ void Monitor::CheckSegment()
if (fIsDaemon && duration > fTimeoutInMS * 2)
{
Cleanup(fSessionName);
Cleanup(fShmId);
fHeartbeatTriggered = false;
if (fSelfDestruct)
{
@@ -360,9 +360,9 @@ void Monitor::CheckSegment()
}
}
void Monitor::Cleanup(const string& sessionName)
void Monitor::Cleanup(const string& shmId)
{
string managementSegmentName("fmq_" + sessionName + "_mng");
string managementSegmentName("fmq_" + shmId + "_mng");
try
{
bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str());
@@ -373,8 +373,8 @@ void Monitor::Cleanup(const string& sessionName)
unsigned int regionCount = rc->fCount;
for (unsigned int i = 1; i <= regionCount; ++i)
{
RemoveObject("fmq_" + sessionName + "_rg_" + to_string(i));
RemoveQueue(string("fmq_" + sessionName + "_rgq_" + to_string(i)));
RemoveObject("fmq_" + shmId + "_rg_" + to_string(i));
RemoveQueue(string("fmq_" + shmId + "_rgq_" + to_string(i)));
}
}
else
@@ -389,9 +389,8 @@ void Monitor::Cleanup(const string& sessionName)
cout << "Did not find '" << managementSegmentName << "' shared memory segment. No regions to cleanup." << endl;
}
RemoveObject("fmq_" + sessionName + "_main");
boost::interprocess::named_mutex::remove(string("fmq_" + sessionName + "_mtx").c_str());
RemoveObject("fmq_" + shmId + "_main");
RemoveMutex("fmq_" + shmId + "_mtx");
cout << endl;
}
@@ -420,6 +419,18 @@ void Monitor::RemoveQueue(const string& name)
}
}
void Monitor::RemoveMutex(const string& name)
{
if (bipc::named_mutex::remove(name.c_str()))
{
cout << "Successfully removed \"" << name << "\"." << endl;
}
else
{
cout << "Did not remove \"" << name << "\". Already removed?" << endl;
}
}
void Monitor::PrintQueues()
{
cout << '\n';
@@ -427,7 +438,7 @@ void Monitor::PrintQueues()
try
{
bipc::managed_shared_memory segment(bipc::open_only, fSegmentName.c_str());
StringVector* queues = segment.find<StringVector>(string("fmq_" + fSessionName + "_qs").c_str()).first;
StringVector* queues = segment.find<StringVector>(string("fmq_" + fShmId + "_qs").c_str()).first;
if (queues)
{
cout << "found " << queues->size() << " queue(s):" << endl;
@@ -500,7 +511,7 @@ Monitor::~Monitor()
}
if (fCleanOnExit)
{
Cleanup(fSessionName);
Cleanup(fShmId);
}
}

View File

@@ -39,6 +39,7 @@ class Monitor
static void Cleanup(const std::string& sessionName);
static void RemoveObject(const std::string&);
static void RemoveQueue(const std::string&);
static void RemoveMutex(const std::string&);
private:
void PrintHeader();
@@ -55,7 +56,7 @@ class Monitor
bool fIsDaemon;
bool fCleanOnExit;
unsigned int fTimeoutInMS;
std::string fSessionName;
std::string fShmId;
std::string fSegmentName;
std::string fManagementSegmentName;
std::string fControlQueueName;
@@ -71,4 +72,4 @@ class Monitor
} // namespace mq
} // namespace fair
#endif /* FAIR_MQ_SHMEM_MONITOR_H_ */
#endif /* FAIR_MQ_SHMEM_MONITOR_H_ */

View File

@@ -12,8 +12,9 @@ The shared memory monitor tool, supplied with the shared memory transport can be
With default arguments the monitor will run indefinitely with no output, and clean up shared memory segment if it is open and no heartbeats from devices arrive within a timeout period. It can be further customized with following parameters:
`--session <arg>`: customize the name of the shared memory segment via the session name (default is "default").
`--session <arg>`: for which session to run the monitor (default is "default"). The actual ressource names will be built out of session id, user id (hashed and truncated).
`--cleanup`: start monitor, perform cleanup of the memory and quit.
`--shmid <arg>`: if provided, this shmem id will be used instead of the one generated from session id. Use this if you know the name of the shared memory ressource, but do not have the used session id.
`--self-destruct`: run until the memory segment is closed (either naturally via cleanup performed by devices or in case of a crash (no heartbeats within timeout)).
`--interactive`: run interactively, with detailed segment details and user input for various shmem operations.
`--timeout <arg>`: specifiy the timeout for the heartbeats from shmem transports in milliseconds (default 5000).
@@ -27,9 +28,9 @@ The Monitor class can also be used independently from the supplied executable (b
FairMQ Shared Memory currently uses following names to register shared memory on the system:
`fmq_<sessionName>_main` - main segment name, used for user data (session name can be overridden via `--session`).
`fmq_<sessionName>_mng` - management segment name, used for storing management data.
`fmq_<sessionName>_cq` - message queue for communicating between shm transport and shm monitor (exists independent of above segments).
`fmq_<sessionName>_mtx` - boost::interprocess::named_mutex for management purposes (exists independent of above segments).
`fmq_<sessionName>_rg_<index>` - names of unmanaged regions.
`fmq_<sessionName>_rgq_<index>` - names of queues for the unmanaged regions.
`fmq_<shmId>_main` - main segment name, used for user data (the shmId is generated out of session id and user id).
`fmq_<shmId>_mng` - management segment name, used for storing management data.
`fmq_<shmId>_cq` - message queue for communicating between shm transport and shm monitor (exists independent of above segments).
`fmq_<shmId>_mtx` - boost::interprocess::named_mutex for management purposes (exists independent of above segments).
`fmq_<shmId>_rg_<index>` - names of unmanaged regions.
`fmq_<shmId>_rgq_<index>` - names of queues for the unmanaged regions.

View File

@@ -1,76 +0,0 @@
################################################################################
# Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
configure_file(${CMAKE_SOURCE_DIR}/fairmq/shmem/prototype/shm-prototype.json
${CMAKE_BINARY_DIR}/bin/config/shm-prototype.json)
configure_file(${CMAKE_SOURCE_DIR}/fairmq/shmem/prototype/startShmPrototype.sh.in
${CMAKE_BINARY_DIR}/bin/prototype/shmem/startShmPrototype.sh)
Set(INCLUDE_DIRECTORIES
${CMAKE_SOURCE_DIR}/fairmq
${CMAKE_SOURCE_DIR}/fairmq/zeromq
${CMAKE_SOURCE_DIR}/fairmq/nanomsg
${CMAKE_SOURCE_DIR}/fairmq/devices
${CMAKE_SOURCE_DIR}/fairmq/tools
${CMAKE_SOURCE_DIR}/fairmq/options
${CMAKE_SOURCE_DIR}/fairmq/shmem/prototype
${CMAKE_CURRENT_BINARY_DIR}
)
Set(SYSTEM_INCLUDE_DIRECTORIES
${Boost_INCLUDE_DIR}
${ZeroMQ_INCLUDE_DIR}
)
Include_Directories(${INCLUDE_DIRECTORIES})
Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES})
Set(LINK_DIRECTORIES
${Boost_LIBRARY_DIRS}
)
Link_Directories(${LINK_DIRECTORIES})
Set(SRCS
"FairMQShmPrototypeSampler.cxx"
"FairMQShmPrototypeSink.cxx"
)
Set(DEPENDENCIES
${DEPENDENCIES}
${Boost_INTERPROCESS_LIBRARY}
FairMQ
)
Set(LIBRARY_NAME FairMQShmPrototype)
GENERATE_LIBRARY()
Set(Exe_Names
shm-prototype-sampler
shm-prototype-sink
)
Set(Exe_Source
runShmPrototypeSampler.cxx
runShmPrototypeSink.cxx
)
list(LENGTH Exe_Names _length)
math(EXPR _length ${_length}-1)
set(EXECUTABLE_OUTPUT_PATH "${EXECUTABLE_OUTPUT_PATH}/prototype/shmem")
ForEach(_file RANGE 0 ${_length})
list(GET Exe_Names ${_file} _name)
list(GET Exe_Source ${_file} _src)
set(EXE_NAME ${_name})
set(SRCS ${_src})
set(DEPENDENCIES FairMQShmPrototype)
GENERATE_EXECUTABLE()
EndForEach(_file RANGE 0 ${_length})

View File

@@ -1,227 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQShmPrototypeSampler.cpp
*
* @since 2016-04-08
* @author A. Rybalchenko
*/
#include <string>
#include <thread>
#include <chrono>
#include <iomanip>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/smart_ptr/shared_ptr.hpp>
#include "FairMQShmPrototypeSampler.h"
#include "FairMQProgOptions.h"
#include "FairMQLogger.h"
#include "ShmChunk.h"
using namespace std;
using namespace boost::interprocess;
FairMQShmPrototypeSampler::FairMQShmPrototypeSampler()
: fMsgSize(10000)
, fMsgCounter(0)
, fMsgRate(1)
, fBytesOut(0)
, fMsgOut(0)
, fBytesOutNew(0)
, fMsgOutNew(0)
{
if (shared_memory_object::remove("FairMQSharedMemoryPrototype"))
{
LOG(info) << "Successfully removed shared memory upon device start.";
}
else
{
LOG(info) << "Did not remove shared memory upon device start.";
}
}
FairMQShmPrototypeSampler::~FairMQShmPrototypeSampler()
{
if (shared_memory_object::remove("FairMQSharedMemoryPrototype"))
{
LOG(info) << "Successfully removed shared memory after the device has stopped.";
}
else
{
LOG(info) << "Did not remove shared memory after the device stopped. Still in use?";
}
}
void FairMQShmPrototypeSampler::Init()
{
fMsgSize = fConfig->GetValue<int>("msg-size");
fMsgRate = fConfig->GetValue<int>("msg-rate");
SegmentManager::Instance().InitializeSegment("open_or_create", "FairMQSharedMemoryPrototype", 2000000000);
LOG(info) << "Created/Opened shared memory segment of 2,000,000,000 bytes. Available are "
<< SegmentManager::Instance().Segment()->get_free_memory() << " bytes.";
}
void FairMQShmPrototypeSampler::Run()
{
// count sent messages (also used in creating ShmChunk container ID)
static uint64_t numSentMsgs = 0;
LOG(info) << "Starting the benchmark with message size of " << fMsgSize;
// start rate logger and acknowledgement listener in separate threads
thread rateLogger(&FairMQShmPrototypeSampler::Log, this, 1000);
// thread resetMsgCounter(&FairMQShmPrototypeSampler::ResetMsgCounter, this);
// int charnum = 97;
while (CheckCurrentState(RUNNING))
{
void* ptr = nullptr;
bipc::managed_shared_memory::handle_t handle;
while (!ptr)
{
try
{
ptr = SegmentManager::Instance().Segment()->allocate(fMsgSize);
}
catch (bipc::bad_alloc& ba)
{
this_thread::sleep_for(chrono::milliseconds(50));
if (CheckCurrentState(RUNNING))
{
continue;
}
else
{
break;
}
}
}
// // ShmChunk container ID
// string chunkID = "c" + to_string(numSentMsgs);
// // shared pointer ID
// string ownerID = "o" + to_string(numSentMsgs);
// ShPtrOwner* owner = nullptr;
// try
// {
// owner = SegmentManager::Instance().Segment()->construct<ShPtrOwner>(ownerID.c_str())(
// make_managed_shared_ptr(SegmentManager::Instance().Segment()->construct<ShmChunk>(chunkID.c_str())(fMsgSize),
// *(SegmentManager::Instance().Segment())));
// }
// catch (bipc::bad_alloc& ba)
// {
// LOG(warn) << "Shared memory full...";
// this_thread::sleep_for(chrono::milliseconds(100));
// continue;
// }
// void* ptr = owner->fPtr->GetData();
// write something to memory, otherwise only (incomplete) allocation will be measured
// memset(ptr, 0, fMsgSize);
// static_cast<char*>(ptr)[3] = charnum++;
// if (charnum == 123)
// {
// charnum = 97;
// }
// LOG(debug) << "chunk handle: " << owner->fPtr->GetHandle();
// LOG(debug) << "chunk size: " << owner->fPtr->GetSize();
// LOG(debug) << "owner (" << ownerID << ") use count: " << owner->fPtr.use_count();
// char* cptr = static_cast<char*>(ptr);
// LOG(debug) << "check: " << cptr[3];
// FairMQMessagePtr msg(NewSimpleMessage(ownerID));
if (ptr)
{
handle = SegmentManager::Instance().Segment()->get_handle_from_address(ptr);
FairMQMessagePtr msg(NewMessage(sizeof(ExMetaHeader)));
ExMetaHeader* metaPtr = new(msg->GetData()) ExMetaHeader();
metaPtr->fSize = fMsgSize;
metaPtr->fHandle = handle;
// LOG(info) << metaPtr->fSize;
// LOG(info) << metaPtr->fHandle;
// LOG(warn) << ptr;
if (Send(msg, "meta", 0) > 0)
{
fBytesOutNew += fMsgSize;
++fMsgOutNew;
++numSentMsgs;
}
else
{
SegmentManager::Instance().Segment()->deallocate(ptr);
// SegmentManager::Instance().Segment()->destroy_ptr(owner);
}
}
// --fMsgCounter;
// while (fMsgCounter == 0)
// {
// this_thread::sleep_for(chrono::milliseconds(1));
// }
}
LOG(info) << "Sent " << numSentMsgs << " messages, leaving RUNNING state.";
rateLogger.join();
// resetMsgCounter.join();
}
void FairMQShmPrototypeSampler::Log(const int intervalInMs)
{
chrono::time_point<chrono::high_resolution_clock> t0 = chrono::high_resolution_clock::now();
chrono::time_point<chrono::high_resolution_clock> t1;
unsigned long long msSinceLastLog;
double mbPerSecOut = 0;
double msgPerSecOut = 0;
while (CheckCurrentState(RUNNING))
{
t1 = chrono::high_resolution_clock::now();
msSinceLastLog = chrono::duration_cast<chrono::milliseconds>(t1 - t0).count();
mbPerSecOut = (static_cast<double>(fBytesOutNew - fBytesOut) / (1024. * 1024.)) / static_cast<double>(msSinceLastLog) * 1000.;
fBytesOut = fBytesOutNew;
msgPerSecOut = static_cast<double>(fMsgOutNew - fMsgOut) / static_cast<double>(msSinceLastLog) * 1000.;
fMsgOut = fMsgOutNew;
LOG(debug) << fixed
<< setprecision(0) << "out: " << msgPerSecOut << " msg ("
<< setprecision(2) << mbPerSecOut << " MB)\t("
<< SegmentManager::Instance().Segment()->get_free_memory() / (1024. * 1024.) << " MB free)";
t0 = t1;
this_thread::sleep_for(chrono::milliseconds(intervalInMs));
}
}
void FairMQShmPrototypeSampler::ResetMsgCounter()
{
while (CheckCurrentState(RUNNING))
{
fMsgCounter = fMsgRate / 100;
this_thread::sleep_for(chrono::milliseconds(10));
}
}

View File

@@ -1,45 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQShmPrototypeSampler.h
*
* @since 2016-04-08
* @author A. Rybalchenko
*/
#ifndef FAIRMQSHMPROTOTYPESAMPLER_H_
#define FAIRMQSHMPROTOTYPESAMPLER_H_
#include <atomic>
#include "FairMQDevice.h"
class FairMQShmPrototypeSampler : public FairMQDevice
{
public:
FairMQShmPrototypeSampler();
virtual ~FairMQShmPrototypeSampler();
void Log(const int intervalInMs);
void ResetMsgCounter();
protected:
unsigned int fMsgSize;
unsigned int fMsgCounter;
unsigned int fMsgRate;
unsigned long long fBytesOut;
unsigned long long fMsgOut;
std::atomic<unsigned long long> fBytesOutNew;
std::atomic<unsigned long long> fMsgOutNew;
virtual void Init();
virtual void Run();
};
#endif /* FAIRMQSHMPROTOTYPESAMPLER_H_ */

View File

@@ -1,145 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQShmPrototypeSink.cxx
*
* @since 2016-04-08
* @author A. Rybalchenko
*/
#include <string>
#include <thread>
#include <chrono>
#include <iomanip>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/smart_ptr/shared_ptr.hpp>
#include "FairMQShmPrototypeSink.h"
#include "FairMQProgOptions.h"
#include "FairMQLogger.h"
#include "ShmChunk.h"
using namespace std;
using namespace boost::interprocess;
FairMQShmPrototypeSink::FairMQShmPrototypeSink()
: fBytesIn(0)
, fMsgIn(0)
, fBytesInNew(0)
, fMsgInNew(0)
{
}
FairMQShmPrototypeSink::~FairMQShmPrototypeSink()
{
}
void FairMQShmPrototypeSink::Init()
{
SegmentManager::Instance().InitializeSegment("open_or_create", "FairMQSharedMemoryPrototype", 2000000000);
LOG(info) << "Created/Opened shared memory segment of 2,000,000,000 bytes. Available are "
<< SegmentManager::Instance().Segment()->get_free_memory() << " bytes.";
}
void FairMQShmPrototypeSink::Run()
{
static uint64_t numReceivedMsgs = 0;
thread rateLogger(&FairMQShmPrototypeSink::Log, this, 1000);
while (CheckCurrentState(RUNNING))
{
FairMQMessagePtr msg(NewMessage());
if (Receive(msg, "meta") > 0)
{
ExMetaHeader* hdr = static_cast<ExMetaHeader*>(msg->GetData());
size_t size = hdr->fSize;
bipc::managed_shared_memory::handle_t handle = hdr->fHandle;
void* ptr = SegmentManager::Instance().Segment()->get_address_from_handle(handle);
// LOG(info) << size;
// LOG(info) << handle;
// LOG(warn) << ptr;
fBytesInNew += size;
++fMsgInNew;
SegmentManager::Instance().Segment()->deallocate(ptr);
// get the shared pointer ID from the received message
// string ownerID(static_cast<char*>(msg->GetData()), msg->GetSize());
// find the shared pointer in shared memory with its ID
// ShPtrOwner* owner = SegmentManager::Instance().Segment()->find<ShPtrOwner>(ownerID.c_str()).first;
// LOG(debug) << "owner (" << ownerID << ") use count: " << owner->fPtr.use_count();
// if (owner)
// {
// // void* ptr = owner->fPtr->GetData();
// // LOG(debug) << "chunk handle: " << owner->fPtr->GetHandle();
// // LOG(debug) << "chunk size: " << owner->fPtr->GetSize();
// fBytesInNew += owner->fPtr->GetSize();
// ++fMsgInNew;
// // char* cptr = static_cast<char*>(ptr);
// // LOG(debug) << "check: " << cptr[3];
// SegmentManager::Instance().Segment()->deallocate(ptr);
// // SegmentManager::Instance().Segment()->destroy_ptr(owner);
// }
// else
// {
// LOG(warn) << "Shared pointer is zero.";
// }
++numReceivedMsgs;
}
}
LOG(info) << "Received " << numReceivedMsgs << " messages, leaving RUNNING state.";
rateLogger.join();
}
void FairMQShmPrototypeSink::Log(const int intervalInMs)
{
chrono::time_point<chrono::high_resolution_clock> t0 = chrono::high_resolution_clock::now();
chrono::time_point<chrono::high_resolution_clock> t1;
unsigned long long msSinceLastLog;
double mbPerSecIn = 0;
double msgPerSecIn = 0;
while (CheckCurrentState(RUNNING))
{
t1 = chrono::high_resolution_clock::now();
msSinceLastLog = chrono::duration_cast<chrono::milliseconds>(t1 - t0).count();
mbPerSecIn = (static_cast<double>(fBytesInNew - fBytesIn) / (1024. * 1024.)) / static_cast<double>(msSinceLastLog) * 1000.;
fBytesIn = fBytesInNew;
msgPerSecIn = static_cast<double>(fMsgInNew - fMsgIn) / static_cast<double>(msSinceLastLog) * 1000.;
fMsgIn = fMsgInNew;
LOG(debug) << fixed
<< setprecision(0) << "in: " << msgPerSecIn << " msg ("
<< setprecision(2) << mbPerSecIn << " MB)\t("
<< SegmentManager::Instance().Segment()->get_free_memory() / (1024. * 1024.) << " MB free)";
t0 = t1;
this_thread::sleep_for(chrono::milliseconds(intervalInMs));
}
}

View File

@@ -1,40 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQShmPrototypeSink.h
*
* @since 2016-04-08
* @author A. Rybalchenko
*/
#ifndef FAIRMQSHMPROTOTYPESINK_H_
#define FAIRMQSHMPROTOTYPESINK_H_
#include <atomic>
#include "FairMQDevice.h"
class FairMQShmPrototypeSink : public FairMQDevice
{
public:
FairMQShmPrototypeSink();
virtual ~FairMQShmPrototypeSink();
void Log(const int intervalInMs);
protected:
unsigned long long fBytesIn;
unsigned long long fMsgIn;
std::atomic<unsigned long long> fBytesInNew;
std::atomic<unsigned long long> fMsgInNew;
virtual void Init();
virtual void Run();
};
#endif /* FAIRMQSHMPROTOTYPESINK_H_ */

View File

@@ -1,182 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* ShmChunk.h
*
* @since 2016-04-08
* @author A. Rybalchenko
*/
#ifndef SHMCHUNK_H_
#define SHMCHUNK_H_
#include <thread>
#include <chrono>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/smart_ptr/shared_ptr.hpp>
#include "FairMQLogger.h"
namespace bipc = boost::interprocess;
class SegmentManager
{
public:
static SegmentManager& Instance()
{
static SegmentManager man;
return man;
}
void InitializeSegment(const std::string& op, const std::string& name, const size_t size = 0)
{
if (!fSegment)
{
try
{
if (op == "open_or_create")
{
fSegment = new bipc::managed_shared_memory(bipc::open_or_create, name.c_str(), size);
}
else if (op == "create_only")
{
fSegment = new bipc::managed_shared_memory(bipc::create_only, name.c_str(), size);
}
else if (op == "open_only")
{
int numTries = 0;
bool success = false;
do
{
try
{
fSegment = new bipc::managed_shared_memory(bipc::open_only, name.c_str());
success = true;
}
catch (bipc::interprocess_exception& ie)
{
if (++numTries == 5)
{
LOG(error) << "Could not open shared memory after " << numTries << " attempts, exiting!";
exit(EXIT_FAILURE);
}
else
{
LOG(debug) << "Could not open shared memory segment on try " << numTries << ". Retrying in 1 second...";
LOG(debug) << ie.what();
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
}
while (!success);
}
else
{
LOG(error) << "Unknown operation when initializing shared memory segment: " << op;
}
}
catch (std::exception& e)
{
LOG(error) << "Exception during shared memory segment initialization: " << e.what() << ", application will now exit";
exit(EXIT_FAILURE);
}
}
else
{
LOG(info) << "Segment already initialized";
}
}
bipc::managed_shared_memory* Segment() const
{
if (fSegment)
{
return fSegment;
}
else
{
LOG(error) << "Segment not initialized";
exit(EXIT_FAILURE);
}
}
private:
SegmentManager()
: fSegment(nullptr)
{}
bipc::managed_shared_memory* fSegment;
};
struct alignas(16) ExMetaHeader
{
uint64_t fSize;
bipc::managed_shared_memory::handle_t fHandle;
};
// class ShmChunk
// {
// public:
// ShmChunk()
// : fHandle()
// , fSize(0)
// {
// }
// ShmChunk(const size_t size)
// : fHandle()
// , fSize(size)
// {
// void* ptr = SegmentManager::Instance().Segment()->allocate(size);
// fHandle = SegmentManager::Instance().Segment()->get_handle_from_address(ptr);
// }
// ~ShmChunk()
// {
// SegmentManager::Instance().Segment()->deallocate(SegmentManager::Instance().Segment()->get_address_from_handle(fHandle));
// }
// bipc::managed_shared_memory::handle_t GetHandle() const
// {
// return fHandle;
// }
// void* GetData() const
// {
// return SegmentManager::Instance().Segment()->get_address_from_handle(fHandle);
// }
// size_t GetSize() const
// {
// return fSize;
// }
// private:
// bipc::managed_shared_memory::handle_t fHandle;
// size_t fSize;
// };
// typedef bipc::managed_shared_ptr<ShmChunk, bipc::managed_shared_memory>::type ShPtrType;
// struct ShPtrOwner
// {
// ShPtrOwner(const ShPtrType& other)
// : fPtr(other)
// {}
// ShPtrOwner(const ShPtrOwner& other)
// : fPtr(other.fPtr)
// {}
// ShPtrType fPtr;
// };
#endif /* SHMCHUNK_H_ */

View File

@@ -1,24 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "FairMQShmPrototypeSampler.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
("msg-rate", bpo::value<int>()->default_value(0), "Msg rate limit in maximum number of messages per second");
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{
return new FairMQShmPrototypeSampler();
}

View File

@@ -1,21 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "FairMQShmPrototypeSink.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& /*options*/)
{
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{
return new FairMQShmPrototypeSink();
}

View File

@@ -1,34 +0,0 @@
{
"fairMQOptions": {
"devices": [
{
"id": "sampler1",
"channels": [
{
"name": "meta",
"type": "push",
"method": "bind",
"address": "tcp://127.0.0.1:5555",
"sndBufSize": 10,
"rcvBufSize": 10,
"rateLogging": 0
}
]
},
{
"id": "sink1",
"channels": [
{
"name": "meta",
"type": "pull",
"method": "connect",
"address": "tcp://127.0.0.1:5555",
"sndBufSize": 10,
"rcvBufSize": 10,
"rateLogging": 0
}
]
}
]
}
}

View File

@@ -1,49 +0,0 @@
#!/bin/bash
msgSize="1000000"
transport="zeromq"
if [[ $1 =~ ^[0-9]+$ ]]; then
msgSize=$1
fi
echo "Starting shared memory example with message size of $msgSize bytes."
echo ""
echo "Usage: startShmPrototype [message size=1000000]"
SAMPLER="shm-prototype-sampler"
SAMPLER+=" --id sampler1"
SAMPLER+=" --transport $transport"
# SAMPLER+=" --severity TRACE"
SAMPLER+=" --msg-size $msgSize"
# SAMPLER+=" --msg-rate 1000"
SAMPLER+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/shm-prototype.json"
xterm -geometry 80x32+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/prototype/shmem/$SAMPLER &
SINK1="shm-prototype-sink"
SINK1+=" --id sink1"
SINK1+=" --transport $transport"
# SINK1+=" --severity TRACE"
SINK1+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/shm-prototype.json"
xterm -geometry 80x32+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/prototype/shmem/$SINK1 &
# SINK2="shm-prototype-sink"
# SINK2+=" --id sink2"
# SINK2+=" --transport $transport"
# # SINK2+=" --severity TRACE"
# SINK2+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/shm-prototype.json"
# xterm -geometry 80x32+500+500 -hold -e @CMAKE_BINARY_DIR@/bin/prototype/shmem/$SINK2 &
# SINK3="shm-prototype-sink"
# SINK3+=" --id sink3"
# SINK3+=" --transport $transport"
# # SINK3+=" --severity TRACE"
# SINK3+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/shm-prototype.json"
# xterm -geometry 80x32+1000+0 -hold -e @CMAKE_BINARY_DIR@/bin/prototype/shmem/$SINK3 &
# SINK4="shm-prototype-sink"
# SINK4+=" --id sink4"
# SINK4+=" --transport $transport"
# # SINK4+=" --severity TRACE"
# SINK4+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/shm-prototype.json"
# xterm -geometry 80x32+1000+500 -hold -e @CMAKE_BINARY_DIR@/bin/prototype/shmem/$SINK4 &

View File

@@ -6,6 +6,7 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/shmem/Monitor.h>
#include <fairmq/shmem/Common.h>
#include <boost/program_options.hpp>
@@ -20,6 +21,7 @@
using namespace std;
using namespace boost::program_options;
using namespace fair::mq::shmem;
static void daemonize()
{
@@ -68,16 +70,18 @@ int main(int argc, char** argv)
try
{
string sessionName;
string shmId;
bool cleanup = false;
bool selfDestruct = false;
bool interactive = false;
unsigned int timeoutInMS;
unsigned int timeoutInMS = 5000;
bool runAsDaemon = false;
bool cleanOnExit = false;
options_description desc("Options");
desc.add_options()
("session,s", value<string>(&sessionName)->default_value("default"), "Name of the session which to monitor")
("session,s", value<string>(&sessionName)->default_value("default"), "session id which to monitor")
("shmid", value<string>(&shmId)->default_value(""), "Shmem Id to monitor (if not provided, it is generated out of session id and user id)")
("cleanup,c", value<bool>(&cleanup)->implicit_value(true), "Perform cleanup and quit")
("self-destruct,x", value<bool>(&selfDestruct)->implicit_value(true), "Quit after first closing of the memory")
("interactive,i", value<bool>(&interactive)->implicit_value(true), "Interactive run")
@@ -97,24 +101,27 @@ int main(int argc, char** argv)
notify(vm);
sessionName.resize(8, '_'); // shorten the session name, to accommodate for name size limit on some systems (MacOS)
if (runAsDaemon)
{
daemonize();
}
if (shmId == "")
{
shmId = buildShmIdFromSessionIdAndUserId(sessionName);
}
if (cleanup)
{
cout << "Cleaning up \"" << sessionName << "\"..." << endl;
fair::mq::shmem::Monitor::Cleanup(sessionName);
fair::mq::shmem::Monitor::RemoveQueue("fmq_" + sessionName + "_cq");
cout << "Cleaning up \"" << shmId << "\"..." << endl;
Monitor::Cleanup(shmId);
Monitor::RemoveQueue("fmq_" + shmId + "_cq");
return 0;
}
cout << "Starting shared memory monitor for session: \"" << sessionName << "\"..." << endl;
cout << "Starting shared memory monitor for session: \"" << sessionName << "\" (shmId: " << shmId << ")..." << endl;
fair::mq::shmem::Monitor monitor{sessionName, selfDestruct, interactive, timeoutInMS, runAsDaemon, cleanOnExit};
Monitor monitor{shmId, selfDestruct, interactive, timeoutInMS, runAsDaemon, cleanOnExit};
monitor.CatchSignals();
monitor.Run();

View File

@@ -12,6 +12,7 @@
#include <iostream>
#include <sstream>
#include <thread>
using namespace std;
@@ -30,23 +31,33 @@ namespace tools
* @param[in] log_prefix How to prefix each captured output line with
* @return Captured stdout output and exit code
*/
execute_result execute(string cmd, string prefix)
execute_result execute(const string& cmd, const string& prefix, const string& input)
{
execute_result result;
stringstream out;
// print full line thread-safe
stringstream printCmd;
printCmd << prefix << cmd << "\n";
printCmd << prefix << " " << cmd << "\n";
cout << printCmd.str() << flush;
out << prefix << cmd << endl;
// Execute command and capture stdout, add prefix line by line
boost::process::ipstream stdout;
boost::process::child c(cmd, boost::process::std_out > stdout);
boost::process::ipstream c_stdout;
boost::process::opstream c_stdin;
boost::process::child c(
cmd, boost::process::std_out > c_stdout, boost::process::std_in < c_stdin);
// Optionally, write to stdin of the child
if (input != "") {
this_thread::sleep_for(chrono::milliseconds(100));
c_stdin << input;
c_stdin.flush();
}
string line;
while (getline(stdout, line))
while (getline(c_stdout, line))
{
// print full line thread-safe
stringstream printLine;

View File

@@ -32,10 +32,13 @@ struct execute_result
* and exit code.
*
* @param[in] cmd Command to execute
* @param[in] log_prefix How to prefix each captured output line with
* @param[in] prefix How to prefix each captured output line with
* @param[in] input Data which is sent to stdin of the child process
* @return Captured stdout output and exit code
*/
execute_result execute(std::string cmd, std::string prefix = "");
execute_result execute(const std::string& cmd,
const std::string& prefix = "",
const std::string& input = "");
} /* namespace tools */
} /* namespace mq */

View File

@@ -69,10 +69,9 @@ FairMQPollerZMQ::FairMQPollerZMQ(const unordered_map<string, vector<FairMQChanne
, fNumItems(0)
, fOffsetMap()
{
int offset = 0;
try
{
int offset = 0;
// calculate offsets and the total size of the poll item set
for (string channel : channelList)
{
@@ -190,7 +189,7 @@ bool FairMQPollerZMQ::CheckOutput(const int index)
return false;
}
bool FairMQPollerZMQ::CheckInput(const string channelKey, const int index)
bool FairMQPollerZMQ::CheckInput(const string& channelKey, const int index)
{
try
{
@@ -209,7 +208,7 @@ bool FairMQPollerZMQ::CheckInput(const string channelKey, const int index)
}
}
bool FairMQPollerZMQ::CheckOutput(const string channelKey, const int index)
bool FairMQPollerZMQ::CheckOutput(const string& channelKey, const int index)
{
try
{

View File

@@ -45,8 +45,8 @@ class FairMQPollerZMQ : public FairMQPoller
virtual void Poll(const int timeout);
virtual bool CheckInput(const int index);
virtual bool CheckOutput(const int index);
virtual bool CheckInput(const std::string channelKey, const int index);
virtual bool CheckOutput(const std::string channelKey, const int index);
virtual bool CheckInput(const std::string& channelKey, const int index);
virtual bool CheckOutput(const std::string& channelKey, const int index);
virtual ~FairMQPollerZMQ();

View File

@@ -116,14 +116,13 @@ int64_t FairMQSocketZMQ::TryReceive(vector<unique_ptr<FairMQMessage>>& msgVec) {
int FairMQSocketZMQ::SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout)
{
int nbytes = -1;
int elapsed = 0;
static_cast<FairMQMessageZMQ*>(msg.get())->ApplyUsedSize();
while (true)
{
nbytes = zmq_msg_send(static_cast<FairMQMessageZMQ*>(msg.get())->GetMessage(), fSocket, flags);
int nbytes = zmq_msg_send(static_cast<FairMQMessageZMQ*>(msg.get())->GetMessage(), fSocket, flags);
if (nbytes >= 0)
{
fBytesTx += nbytes;
@@ -212,25 +211,21 @@ int FairMQSocketZMQ::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const i
int64_t FairMQSocketZMQ::SendImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int timeout)
{
const unsigned int vecSize = msgVec.size();
int elapsed = 0;
// Sending vector typicaly handles more then one part
if (vecSize > 1)
{
int64_t totalSize = 0;
int nbytes = -1;
bool repeat = false;
int elapsed = 0;
while (true)
{
totalSize = 0;
repeat = false;
int64_t totalSize = 0;
bool repeat = false;
for (unsigned int i = 0; i < vecSize; ++i)
{
static_cast<FairMQMessageZMQ*>(msgVec[i].get())->ApplyUsedSize();
nbytes = zmq_msg_send(static_cast<FairMQMessageZMQ*>(msgVec[i].get())->GetMessage(),
int nbytes = zmq_msg_send(static_cast<FairMQMessageZMQ*>(msgVec[i].get())->GetMessage(),
fSocket,
(i < vecSize - 1) ? ZMQ_SNDMORE|flags : flags);
if (nbytes >= 0)
@@ -294,16 +289,13 @@ int64_t FairMQSocketZMQ::SendImpl(vector<FairMQMessagePtr>& msgVec, const int fl
int64_t FairMQSocketZMQ::ReceiveImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int timeout)
{
int64_t totalSize = 0;
int64_t more = 0;
bool repeat = false;
int elapsed = 0;
while (true)
{
totalSize = 0;
more = 0;
repeat = false;
int64_t totalSize = 0;
int64_t more = 0;
bool repeat = false;
do
{

View File

@@ -27,6 +27,7 @@ add_testhelper(runTestDevice
helper/devices/TestSub.cxx
helper/devices/TestTransferTimeout.cxx
helper/devices/TestWaitFor.cxx
helper/devices/TestExceptions.cxx
LINKS FairMQ
)
@@ -95,13 +96,14 @@ add_testsuite(FairMQ.Device
device/_device_version.cxx
device/_device_config.cxx
device/_waitfor.cxx
device/_exceptions.cxx
LINKS FairMQ
DEPENDS testhelper_runTestDevice
INCLUDES ${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/device
${CMAKE_CURRENT_BINARY_DIR}
TIMEOUT 5
TIMEOUT 15
RUN_SERIAL ON
)

125
test/device/_exceptions.cxx Normal file
View File

@@ -0,0 +1,125 @@
/********************************************************************************
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runner.h"
#include <gtest/gtest.h>
#include <boost/process.hpp>
#include <fairmq/Tools.h>
#include <string>
#include <thread>
#include <iostream>
namespace
{
using namespace std;
using namespace fair::mq::test;
using namespace fair::mq::tools;
void RunExceptionIn(const std::string& state, const std::string& input = "")
{
size_t session{fair::mq::tools::UuidHash()};
execute_result result{"", 100};
thread device_thread([&]() {
stringstream cmd;
cmd << runTestDevice
<< " --id exceptions_" << state << "_"
<< " --control " << ((input == "") ? "static" : "interactive")
<< " --session " << session
<< " --color false";
result = execute(cmd.str(), "[EXCEPTION IN " + state + "]", input);
});
device_thread.join();
ASSERT_NE(std::string::npos, result.console_out.find("exception in " + state + "()"));
exit(result.exit_code);
}
TEST(Exceptions, InInit_______static)
{
EXPECT_EXIT(RunExceptionIn("Init"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, InInitTask___static)
{
EXPECT_EXIT(RunExceptionIn("InitTask"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, InPreRun_____static)
{
EXPECT_EXIT(RunExceptionIn("PreRun"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, InRun________static)
{
EXPECT_EXIT(RunExceptionIn("Run"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, InPostRun____static)
{
EXPECT_EXIT(RunExceptionIn("PostRun"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, InResetTask__static)
{
EXPECT_EXIT(RunExceptionIn("ResetTask"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, InReset______static)
{
EXPECT_EXIT(RunExceptionIn("Reset"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, InInit_______interactive)
{
EXPECT_EXIT(RunExceptionIn("Init", "q"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, InInitTask___interactive)
{
EXPECT_EXIT(RunExceptionIn("InitTask", "q"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, InPreRun_____interactive)
{
EXPECT_EXIT(RunExceptionIn("PreRun", "q"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, InRun________interactive)
{
EXPECT_EXIT(RunExceptionIn("Run", "q"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, InPostRun____interactive)
{
EXPECT_EXIT(RunExceptionIn("PostRun", "q"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, InResetTask__interactive)
{
EXPECT_EXIT(RunExceptionIn("ResetTask", "q"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, InReset______interactive)
{
EXPECT_EXIT(RunExceptionIn("Reset", "q"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, InInit_______interactive_invalid)
{
EXPECT_EXIT(RunExceptionIn("Init", "_"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, InInitTask___interactive_invalid)
{
EXPECT_EXIT(RunExceptionIn("InitTask", "_"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, InPreRun_____interactive_invalid)
{
EXPECT_EXIT(RunExceptionIn("PreRun", "_"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, InRun________interactive_invalid)
{
EXPECT_EXIT(RunExceptionIn("Run", "_"), ::testing::ExitedWithCode(1), "");
}
TEST(Exceptions, InPostRun____interactive_invalid)
{
EXPECT_EXIT(RunExceptionIn("PostRun", "_"), ::testing::ExitedWithCode(1), "");
}
} // namespace

View File

@@ -0,0 +1,84 @@
/********************************************************************************
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <FairMQDevice.h>
#include <FairMQLogger.h>
#include <iostream>
#include <stdexcept>
namespace fair
{
namespace mq
{
namespace test
{
class TestExceptions : public FairMQDevice
{
public:
auto Init() -> void override
{
std::string state("Init");
if (std::string::npos != GetId().find("_" + state + "_")) {
throw std::runtime_error("exception in " + state + "()");
}
}
auto InitTask() -> void override
{
std::string state("InitTask");
if (std::string::npos != GetId().find("_" + state + "_")) {
throw std::runtime_error("exception in " + state + "()");
}
}
auto PreRun() -> void override
{
std::string state("PreRun");
if (std::string::npos != GetId().find("_" + state + "_")) {
throw std::runtime_error("exception in " + state + "()");
}
}
auto Run() -> void override
{
std::string state("Run");
if (std::string::npos != GetId().find("_" + state + "_")) {
throw std::runtime_error("exception in " + state + "()");
}
}
auto PostRun() -> void override
{
std::string state("PostRun");
if (std::string::npos != GetId().find("_" + state + "_")) {
throw std::runtime_error("exception in " + state + "()");
}
}
auto ResetTask() -> void override
{
std::string state("ResetTask");
if (std::string::npos != GetId().find("_" + state + "_")) {
throw std::runtime_error("exception in " + state + "()");
}
}
auto Reset() -> void override
{
std::string state("Reset");
if (std::string::npos != GetId().find("_" + state + "_")) {
throw std::runtime_error("exception in " + state + "()");
}
}
};
} // namespace test
} // namespace mq
} // namespace fair

View File

@@ -18,6 +18,7 @@
#include "devices/TestSub.cxx"
#include "devices/TestTransferTimeout.cxx"
#include "devices/TestWaitFor.cxx"
#include "devices/TestExceptions.cxx"
#include <runFairMQDevice.h>
@@ -87,6 +88,10 @@ auto getDevice(const FairMQProgOptions& config) -> FairMQDevicePtr
{
return new TestWaitFor;
}
else if (0 == id.find("exceptions_"))
{
return new TestExceptions;
}
else
{
cerr << "Don't know id '" << id << "'" << endl;