mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 17:41:45 +00:00
Compare commits
13 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
49d8a1b4dd | ||
|
f14f507584 | ||
|
8dd0b25c06 | ||
|
7edf436919 | ||
|
0e5978b160 | ||
|
71b1866d7b | ||
|
6699711e17 | ||
|
120760da0a | ||
|
d03a504ccd | ||
|
cf004f69b2 | ||
|
cfa18ccfce | ||
|
e332e20dbd | ||
|
3ab10ced7a |
@@ -8,5 +8,6 @@ Lebedev, Andrey
|
||||
Mrnjavac, Teo <teo.m@cern.ch>
|
||||
Neskovic, Gvozden
|
||||
Richter, Matthias
|
||||
Tacke, Christian
|
||||
Uhlig, Florian
|
||||
Wenzel, Sandro
|
||||
|
@@ -6,28 +6,35 @@
|
||||
# copied verbatim in the file "LICENSE" #
|
||||
################################################################################
|
||||
|
||||
|
||||
# The "lib/${CMAKE_LIBRARY_ARCHITECTURE}/pmix" part in all
|
||||
# the PATH_SUFFIXES is here to be able to find Debian's
|
||||
# libpmix-dev package. It installs everything below
|
||||
# /usr/lib/${CMAKE_LIBRARY_ARCHITECTURE}/pmix
|
||||
|
||||
|
||||
find_path(PMIx_INCLUDE_DIR
|
||||
NAMES pmix.h
|
||||
HINTS ${PMIX_ROOT} $ENV{PMIX_ROOT}
|
||||
PATH_SUFFIXES include
|
||||
PATH_SUFFIXES include lib/${CMAKE_LIBRARY_ARCHITECTURE}/pmix/include
|
||||
)
|
||||
|
||||
find_path(PMIx_LIBRARY_DIR
|
||||
NAMES libpmix.dylib libpmix.so
|
||||
HINTS ${PMIX_ROOT} $ENV{PMIX_ROOT}
|
||||
PATH_SUFFIXES lib lib64
|
||||
PATH_SUFFIXES lib lib64 lib/${CMAKE_LIBRARY_ARCHITECTURE}/pmix/lib
|
||||
)
|
||||
|
||||
find_library(PMIx_LIBRARY_SHARED
|
||||
NAMES libpmix.dylib libpmix.so
|
||||
HINTS ${PMIX_ROOT} $ENV{PMIX_ROOT}
|
||||
PATH_SUFFIXES lib lib64
|
||||
PATH_SUFFIXES lib lib64 lib/${CMAKE_LIBRARY_ARCHITECTURE}/pmix/lib
|
||||
)
|
||||
|
||||
find_file(PMIx_VERSION_FILE
|
||||
NAMES pmix_version.h
|
||||
HINTS ${PMIX_ROOT} $ENV{PMIX_ROOT}
|
||||
PATH_SUFFIXES include
|
||||
PATH_SUFFIXES include lib/${CMAKE_LIBRARY_ARCHITECTURE}/pmix/include
|
||||
)
|
||||
|
||||
file(READ "${PMIx_VERSION_FILE}" __version_raw)
|
||||
|
@@ -56,7 +56,9 @@ bool DeviceRunner::HandleGeneralOptions(const fair::mq::ProgOptions& config, boo
|
||||
fair::Logger::SetConsoleSeverity("nolog");
|
||||
} else {
|
||||
fair::Logger::SetConsoleColor(color);
|
||||
fair::Logger::SetConsoleSeverity(severity);
|
||||
if (severity != "") {
|
||||
fair::Logger::SetConsoleSeverity(severity);
|
||||
}
|
||||
}
|
||||
|
||||
if (printLogo) {
|
||||
@@ -154,8 +156,8 @@ auto DeviceRunner::Run() -> int
|
||||
|
||||
// Handle --version
|
||||
if (fConfig.Count("version")) {
|
||||
cout << "FairMQ version: " << FAIRMQ_GIT_VERSION << endl;
|
||||
cout << "User device version: " << fDevice->GetVersion() << endl;
|
||||
LOGV(info, verylow) << "FairMQ version: " << FAIRMQ_GIT_VERSION;
|
||||
LOGV(info, verylow) << "User device version: " << fDevice->GetVersion();
|
||||
fDevice->ChangeState(fair::mq::Transition::End);
|
||||
return 0;
|
||||
}
|
||||
|
@@ -81,31 +81,30 @@ FairMQChannel::FairMQChannel(const string& name, const string& type, const strin
|
||||
, fPortRangeMin(DefaultPortRangeMin)
|
||||
, fPortRangeMax(DefaultPortRangeMax)
|
||||
, fAutoBind(DefaultAutoBind)
|
||||
, fIsValid(false)
|
||||
, fValid(false)
|
||||
, fMultipart(false)
|
||||
, fModified(true)
|
||||
, fReset(false)
|
||||
, fMtx()
|
||||
{}
|
||||
{
|
||||
// LOG(warn) << "Constructing channel '" << fName << "'";
|
||||
}
|
||||
|
||||
FairMQChannel::FairMQChannel(const string& name, int index, const fair::mq::Properties& properties)
|
||||
: FairMQChannel(tools::ToString(name, "[", index, "]"), "unspecified", "unspecified", "unspecified", nullptr)
|
||||
{
|
||||
string prefix(tools::ToString("chans.", name, ".", index, "."));
|
||||
|
||||
fType = GetPropertyOrDefault(properties, string(prefix + "type"), fType);
|
||||
fMethod = GetPropertyOrDefault(properties, string(prefix + "method"), fMethod);
|
||||
fAddress = GetPropertyOrDefault(properties, string(prefix + "address"), fAddress);
|
||||
fTransportType = TransportTypes.at(GetPropertyOrDefault(properties, string(prefix + "transport"), TransportNames.at(fTransportType)));
|
||||
fSndBufSize = GetPropertyOrDefault(properties, string(prefix + "sndBufSize"), fSndBufSize);
|
||||
fRcvBufSize = GetPropertyOrDefault(properties, string(prefix + "rcvBufSize"), fRcvBufSize);
|
||||
fSndKernelSize = GetPropertyOrDefault(properties, string(prefix + "sndKernelSize"), fSndKernelSize);
|
||||
fRcvKernelSize = GetPropertyOrDefault(properties, string(prefix + "rcvKernelSize"), fRcvKernelSize);
|
||||
fLinger = GetPropertyOrDefault(properties, string(prefix + "linger"), fLinger);
|
||||
fRateLogging = GetPropertyOrDefault(properties, string(prefix + "rateLogging"), fRateLogging);
|
||||
fPortRangeMin = GetPropertyOrDefault(properties, string(prefix + "portRangeMin"), fPortRangeMin);
|
||||
fPortRangeMax = GetPropertyOrDefault(properties, string(prefix + "portRangeMax"), fPortRangeMax);
|
||||
fAutoBind = GetPropertyOrDefault(properties, string(prefix + "autoBind"), fAutoBind);
|
||||
fType = GetPropertyOrDefault(properties, string(prefix + "type"), std::string(DefaultType));
|
||||
fMethod = GetPropertyOrDefault(properties, string(prefix + "method"), std::string(DefaultMethod));
|
||||
fAddress = GetPropertyOrDefault(properties, string(prefix + "address"), std::string(DefaultAddress));
|
||||
fTransportType = TransportType(GetPropertyOrDefault(properties, string(prefix + "transport"), std::string(DefaultTransportName)));
|
||||
fSndBufSize = GetPropertyOrDefault(properties, string(prefix + "sndBufSize"), DefaultSndBufSize);
|
||||
fRcvBufSize = GetPropertyOrDefault(properties, string(prefix + "rcvBufSize"), DefaultRcvBufSize);
|
||||
fSndKernelSize = GetPropertyOrDefault(properties, string(prefix + "sndKernelSize"), DefaultSndKernelSize);
|
||||
fRcvKernelSize = GetPropertyOrDefault(properties, string(prefix + "rcvKernelSize"), DefaultRcvKernelSize);
|
||||
fLinger = GetPropertyOrDefault(properties, string(prefix + "linger"), DefaultLinger);
|
||||
fRateLogging = GetPropertyOrDefault(properties, string(prefix + "rateLogging"), DefaultRateLogging);
|
||||
fPortRangeMin = GetPropertyOrDefault(properties, string(prefix + "portRangeMin"), DefaultPortRangeMin);
|
||||
fPortRangeMax = GetPropertyOrDefault(properties, string(prefix + "portRangeMax"), DefaultPortRangeMax);
|
||||
fAutoBind = GetPropertyOrDefault(properties, string(prefix + "autoBind"), DefaultAutoBind);
|
||||
}
|
||||
|
||||
FairMQChannel::FairMQChannel(const FairMQChannel& chan)
|
||||
@@ -129,10 +128,8 @@ FairMQChannel::FairMQChannel(const FairMQChannel& chan, const string& newName)
|
||||
, fPortRangeMin(chan.fPortRangeMin)
|
||||
, fPortRangeMax(chan.fPortRangeMax)
|
||||
, fAutoBind(chan.fAutoBind)
|
||||
, fIsValid(false)
|
||||
, fValid(false)
|
||||
, fMultipart(chan.fMultipart)
|
||||
, fModified(chan.fModified)
|
||||
, fReset(false)
|
||||
{}
|
||||
|
||||
FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan)
|
||||
@@ -141,372 +138,34 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan)
|
||||
return *this;
|
||||
}
|
||||
|
||||
{
|
||||
// TODO: replace this with std::scoped_lock (c++17)
|
||||
lock(fMtx, chan.fMtx);
|
||||
lock_guard<mutex> lock1(fMtx, adopt_lock);
|
||||
lock_guard<mutex> lock2(chan.fMtx, adopt_lock);
|
||||
|
||||
fTransportFactory = nullptr;
|
||||
fTransportType = chan.fTransportType;
|
||||
fSocket = nullptr;
|
||||
fName = chan.fName;
|
||||
fType = chan.fType;
|
||||
fMethod = chan.fMethod;
|
||||
fAddress = chan.fAddress;
|
||||
fSndBufSize = chan.fSndBufSize;
|
||||
fRcvBufSize = chan.fRcvBufSize;
|
||||
fSndKernelSize = chan.fSndKernelSize;
|
||||
fRcvKernelSize = chan.fRcvKernelSize;
|
||||
fLinger = chan.fLinger;
|
||||
fRateLogging = chan.fRateLogging;
|
||||
fPortRangeMin = chan.fPortRangeMin;
|
||||
fPortRangeMax = chan.fPortRangeMax;
|
||||
fAutoBind = chan.fAutoBind;
|
||||
fIsValid = false;
|
||||
fMultipart = chan.fMultipart;
|
||||
fModified = chan.fModified;
|
||||
fReset = false;
|
||||
}
|
||||
fTransportFactory = nullptr;
|
||||
fTransportType = chan.fTransportType;
|
||||
fSocket = nullptr;
|
||||
fName = chan.fName;
|
||||
fType = chan.fType;
|
||||
fMethod = chan.fMethod;
|
||||
fAddress = chan.fAddress;
|
||||
fSndBufSize = chan.fSndBufSize;
|
||||
fRcvBufSize = chan.fRcvBufSize;
|
||||
fSndKernelSize = chan.fSndKernelSize;
|
||||
fRcvKernelSize = chan.fRcvKernelSize;
|
||||
fLinger = chan.fLinger;
|
||||
fRateLogging = chan.fRateLogging;
|
||||
fPortRangeMin = chan.fPortRangeMin;
|
||||
fPortRangeMax = chan.fPortRangeMax;
|
||||
fAutoBind = chan.fAutoBind;
|
||||
fValid = false;
|
||||
fMultipart = chan.fMultipart;
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
FairMQSocket & FairMQChannel::GetSocket() const
|
||||
{
|
||||
assert(fSocket);
|
||||
return *fSocket;
|
||||
}
|
||||
|
||||
string FairMQChannel::GetName() const
|
||||
{
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
return fName;
|
||||
}
|
||||
|
||||
string FairMQChannel::GetPrefix() const
|
||||
{
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
string prefix = fName;
|
||||
prefix = prefix.erase(fName.rfind('['));
|
||||
return prefix;
|
||||
}
|
||||
|
||||
string FairMQChannel::GetIndex() const
|
||||
{
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
string indexStr = fName;
|
||||
indexStr.erase(indexStr.rfind(']'));
|
||||
indexStr.erase(0, indexStr.rfind('[') + 1);
|
||||
return indexStr;
|
||||
}
|
||||
|
||||
string FairMQChannel::GetType() const
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
return fType;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::GetType: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
string FairMQChannel::GetMethod() const
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
return fMethod;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::GetMethod: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
string FairMQChannel::GetAddress() const
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
return fAddress;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::GetAddress: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
string FairMQChannel::GetTransportName() const
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
return TransportNames.at(fTransportType);
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::GetTransportName: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
Transport FairMQChannel::GetTransportType() const
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
return fTransportType;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::GetTransportType: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
|
||||
int FairMQChannel::GetSndBufSize() const
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
return fSndBufSize;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::GetSndBufSize: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
int FairMQChannel::GetRcvBufSize() const
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
return fRcvBufSize;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::GetRcvBufSize: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
int FairMQChannel::GetSndKernelSize() const
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
return fSndKernelSize;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::GetSndKernelSize: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
int FairMQChannel::GetRcvKernelSize() const
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
return fRcvKernelSize;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::GetRcvKernelSize: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
int FairMQChannel::GetLinger() const
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
return fLinger;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::GetLinger: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
int FairMQChannel::GetRateLogging() const
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
return fRateLogging;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::GetRateLogging: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
int FairMQChannel::GetPortRangeMin() const
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
return fPortRangeMin;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::GetPortRangeMin: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
int FairMQChannel::GetPortRangeMax() const
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
return fPortRangeMax;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::GetPortRangeMax: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
bool FairMQChannel::GetAutoBind() const
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
return fAutoBind;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::GetAutoBind: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
void FairMQChannel::UpdateType(const string& type)
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
fIsValid = false;
|
||||
fType = type;
|
||||
fModified = true;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::UpdateType: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
void FairMQChannel::UpdateMethod(const string& method)
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
fIsValid = false;
|
||||
fMethod = method;
|
||||
fModified = true;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::UpdateMethod: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
void FairMQChannel::UpdateAddress(const string& address)
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
fIsValid = false;
|
||||
fAddress = address;
|
||||
fModified = true;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::UpdateAddress: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
void FairMQChannel::UpdateTransport(const string& transport)
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
fIsValid = false;
|
||||
fTransportType = TransportTypes.at(transport);
|
||||
fModified = true;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::UpdateTransport: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
void FairMQChannel::UpdateSndBufSize(const int sndBufSize)
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
fIsValid = false;
|
||||
fSndBufSize = sndBufSize;
|
||||
fModified = true;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::UpdateSndBufSize: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
void FairMQChannel::UpdateRcvBufSize(const int rcvBufSize)
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
fIsValid = false;
|
||||
fRcvBufSize = rcvBufSize;
|
||||
fModified = true;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::UpdateRcvBufSize: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
void FairMQChannel::UpdateSndKernelSize(const int sndKernelSize)
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
fIsValid = false;
|
||||
fSndKernelSize = sndKernelSize;
|
||||
fModified = true;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::UpdateSndKernelSize: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
void FairMQChannel::UpdateRcvKernelSize(const int rcvKernelSize)
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
fIsValid = false;
|
||||
fRcvKernelSize = rcvKernelSize;
|
||||
fModified = true;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::UpdateRcvKernelSize: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
void FairMQChannel::UpdateLinger(const int duration)
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
fIsValid = false;
|
||||
fLinger = duration;
|
||||
fModified = true;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::UpdateLinger: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
void FairMQChannel::UpdateRateLogging(const int rateLogging)
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
fIsValid = false;
|
||||
fRateLogging = rateLogging;
|
||||
fModified = true;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::UpdateRateLogging: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
void FairMQChannel::UpdatePortRangeMin(const int minPort)
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
fIsValid = false;
|
||||
fPortRangeMin = minPort;
|
||||
fModified = true;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::UpdatePortRangeMin: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
void FairMQChannel::UpdatePortRangeMax(const int maxPort)
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
fIsValid = false;
|
||||
fPortRangeMax = maxPort;
|
||||
fModified = true;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::UpdatePortRangeMax: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
void FairMQChannel::UpdateAutoBind(const bool autobind)
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
fIsValid = false;
|
||||
fAutoBind = autobind;
|
||||
fModified = true;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::UpdateAutoBind: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
auto FairMQChannel::SetModified(const bool modified) -> void
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
fModified = modified;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::SetModified: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
void FairMQChannel::UpdateName(const string& name)
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
fIsValid = false;
|
||||
fName = name;
|
||||
fModified = true;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::UpdateName: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
bool FairMQChannel::IsValid() const
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
return fIsValid;
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Exception caught in FairMQChannel::IsValid: " << e.what();
|
||||
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
|
||||
}
|
||||
|
||||
bool FairMQChannel::Validate()
|
||||
try {
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
stringstream ss;
|
||||
ss << "Validating channel '" << fName << "'... ";
|
||||
|
||||
if (fIsValid) {
|
||||
if (fValid) {
|
||||
ss << "ALREADY VALID";
|
||||
LOG(debug) << ss.str();
|
||||
return true;
|
||||
@@ -539,7 +198,7 @@ try {
|
||||
} else {
|
||||
vector<string> endpoints;
|
||||
boost::algorithm::split(endpoints, fAddress, boost::algorithm::is_any_of(";"));
|
||||
for (const auto endpoint : endpoints) {
|
||||
for (const auto& endpoint : endpoints) {
|
||||
string address;
|
||||
if (endpoint[0] == '@' || endpoint[0] == '+' || endpoint[0] == '>') {
|
||||
address = endpoint.substr(1);
|
||||
@@ -641,7 +300,7 @@ try {
|
||||
throw ChannelConfigurationError(tools::ToString("invalid socket rate logging interval (cannot be negative): '", fRateLogging, "'"));
|
||||
}
|
||||
|
||||
fIsValid = true;
|
||||
fValid = true;
|
||||
ss << "VALID";
|
||||
LOG(debug) << ss.str();
|
||||
return true;
|
||||
@@ -652,8 +311,6 @@ try {
|
||||
|
||||
void FairMQChannel::Init()
|
||||
{
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
|
||||
fSocket = fTransportFactory->CreateSocket(fType, fName);
|
||||
|
||||
// set linger duration (how long socket should wait for outstanding transfers before shutdown)
|
||||
@@ -674,14 +331,11 @@ void FairMQChannel::Init()
|
||||
|
||||
bool FairMQChannel::ConnectEndpoint(const string& endpoint)
|
||||
{
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
return fSocket->Connect(endpoint);
|
||||
}
|
||||
|
||||
bool FairMQChannel::BindEndpoint(string& endpoint)
|
||||
{
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
|
||||
// try to bind to the configured port. If it fails, try random one (if AutoBind is on).
|
||||
if (fSocket->Bind(endpoint)) {
|
||||
return true;
|
||||
@@ -722,10 +376,3 @@ bool FairMQChannel::BindEndpoint(string& endpoint)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void FairMQChannel::ResetChannel()
|
||||
{
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
fIsValid = false;
|
||||
// TODO: implement channel resetting
|
||||
}
|
||||
|
@@ -25,6 +25,12 @@
|
||||
#include <utility> // std::move
|
||||
#include <cstdint> // int64_t
|
||||
|
||||
/**
|
||||
* @class FairMQChannel FairMQChannel.h <FairMQChannel.h>
|
||||
* @brief Wrapper class for FairMQSocket and related methods
|
||||
*
|
||||
* The class is not thread-safe.
|
||||
*/
|
||||
class FairMQChannel
|
||||
{
|
||||
friend class FairMQDevice;
|
||||
@@ -75,14 +81,11 @@ class FairMQChannel
|
||||
// FairMQChannel& operator=(FairMQChannel&&) = delete;
|
||||
|
||||
/// Destructor
|
||||
virtual ~FairMQChannel()
|
||||
{
|
||||
// LOG(debug) << "Destroying channel " << fName;
|
||||
}
|
||||
virtual ~FairMQChannel() { /* LOG(warn) << "Destroying channel '" << fName << "'"; */ }
|
||||
|
||||
struct ChannelConfigurationError : std::runtime_error { using std::runtime_error::runtime_error; };
|
||||
|
||||
FairMQSocket& GetSocket() const;
|
||||
FairMQSocket& GetSocket() const { assert(fSocket); return *fSocket; }
|
||||
|
||||
bool Bind(const std::string& address)
|
||||
{
|
||||
@@ -100,139 +103,142 @@ class FairMQChannel
|
||||
|
||||
/// Get channel name
|
||||
/// @return Returns full channel name (e.g. "data[0]")
|
||||
std::string GetChannelName() const __attribute__((deprecated("Use GetName()"))) { return GetName(); }
|
||||
std::string GetName() const ;
|
||||
std::string GetName() const { return fName; }
|
||||
|
||||
/// Get channel prefix
|
||||
/// @return Returns channel prefix (e.g. "data" in "data[0]")
|
||||
std::string GetChannelPrefix() const __attribute__((deprecated("Use GetPrefix()"))) { return GetPrefix(); }
|
||||
std::string GetPrefix() const;
|
||||
std::string GetPrefix() const
|
||||
{
|
||||
std::string prefix = fName;
|
||||
prefix = prefix.erase(fName.rfind('['));
|
||||
return prefix;
|
||||
}
|
||||
|
||||
/// Get channel index
|
||||
/// @return Returns channel index (e.g. 0 in "data[0]")
|
||||
std::string GetChannelIndex() const __attribute__((deprecated("Use GetIndex()"))) { return GetIndex(); }
|
||||
std::string GetIndex() const;
|
||||
std::string GetIndex() const
|
||||
{
|
||||
std::string indexStr = fName;
|
||||
indexStr.erase(indexStr.rfind(']'));
|
||||
indexStr.erase(0, indexStr.rfind('[') + 1);
|
||||
return indexStr;
|
||||
}
|
||||
|
||||
/// Get socket type
|
||||
/// @return Returns socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/)
|
||||
std::string GetType() const;
|
||||
std::string GetType() const { return fType; }
|
||||
|
||||
/// Get socket method
|
||||
/// @return Returns socket method (bind/connect)
|
||||
std::string GetMethod() const;
|
||||
std::string GetMethod() const { return fMethod; }
|
||||
|
||||
/// Get socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc")
|
||||
/// @return Returns socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc")
|
||||
std::string GetAddress() const;
|
||||
std::string GetAddress() const { return fAddress; }
|
||||
|
||||
/// Get channel transport name ("default", "zeromq" or "shmem")
|
||||
/// @return Returns channel transport name (e.g. "default", "zeromq" or "shmem")
|
||||
std::string GetTransportName() const;
|
||||
std::string GetTransportName() const { return fair::mq::TransportName(fTransportType); }
|
||||
|
||||
/// Get channel transport type
|
||||
/// @return Returns channel transport type
|
||||
fair::mq::Transport GetTransportType() const;
|
||||
fair::mq::Transport GetTransportType() const { return fTransportType; }
|
||||
|
||||
/// Get socket send buffer size (in number of messages)
|
||||
/// @return Returns socket send buffer size (in number of messages)
|
||||
int GetSndBufSize() const;
|
||||
int GetSndBufSize() const { return fSndBufSize; }
|
||||
|
||||
/// Get socket receive buffer size (in number of messages)
|
||||
/// @return Returns socket receive buffer size (in number of messages)
|
||||
int GetRcvBufSize() const;
|
||||
int GetRcvBufSize() const { return fRcvBufSize; }
|
||||
|
||||
/// Get socket kernel transmit send buffer size (in bytes)
|
||||
/// @return Returns socket kernel transmit send buffer size (in bytes)
|
||||
int GetSndKernelSize() const;
|
||||
int GetSndKernelSize() const { return fSndKernelSize; }
|
||||
|
||||
/// Get socket kernel transmit receive buffer size (in bytes)
|
||||
/// @return Returns socket kernel transmit receive buffer size (in bytes)
|
||||
int GetRcvKernelSize() const;
|
||||
int GetRcvKernelSize() const { return fRcvKernelSize; }
|
||||
|
||||
/// Get linger duration (in milliseconds)
|
||||
/// @return Returns linger duration (in milliseconds)
|
||||
int GetLinger() const;
|
||||
int GetLinger() const { return fLinger; }
|
||||
|
||||
/// Get socket rate logging interval (in seconds)
|
||||
/// @return Returns socket rate logging interval (in seconds)
|
||||
int GetRateLogging() const;
|
||||
int GetRateLogging() const { return fRateLogging; }
|
||||
|
||||
/// Get start of the port range for automatic binding
|
||||
/// @return start of the port range
|
||||
int GetPortRangeMin() const;
|
||||
int GetPortRangeMin() const { return fPortRangeMin; }
|
||||
|
||||
/// Get end of the port range for automatic binding
|
||||
/// @return end of the port range
|
||||
int GetPortRangeMax() const;
|
||||
int GetPortRangeMax() const { return fPortRangeMax; }
|
||||
|
||||
/// Set automatic binding (pick random port if bind fails)
|
||||
/// @return true/false, true if automatic binding is enabled
|
||||
bool GetAutoBind() const;
|
||||
|
||||
/// Set socket type
|
||||
/// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/)
|
||||
void UpdateType(const std::string& type);
|
||||
|
||||
/// Set socket method
|
||||
/// @param method Socket method (bind/connect)
|
||||
void UpdateMethod(const std::string& method);
|
||||
|
||||
/// Set socket address
|
||||
/// @param Socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc")
|
||||
void UpdateAddress(const std::string& address);
|
||||
|
||||
/// Set channel transport
|
||||
/// @param transport transport string ("default", "zeromq" or "shmem")
|
||||
void UpdateTransport(const std::string& transport);
|
||||
|
||||
/// Set socket send buffer size
|
||||
/// @param sndBufSize Socket send buffer size (in number of messages)
|
||||
void UpdateSndBufSize(const int sndBufSize);
|
||||
|
||||
/// Set socket receive buffer size
|
||||
/// @param rcvBufSize Socket receive buffer size (in number of messages)
|
||||
void UpdateRcvBufSize(const int rcvBufSize);
|
||||
|
||||
/// Set socket kernel transmit send buffer size (in bytes)
|
||||
/// @param sndKernelSize Socket send buffer size (in bytes)
|
||||
void UpdateSndKernelSize(const int sndKernelSize);
|
||||
|
||||
/// Set socket kernel transmit receive buffer size (in bytes)
|
||||
/// @param rcvKernelSize Socket receive buffer size (in bytes)
|
||||
void UpdateRcvKernelSize(const int rcvKernelSize);
|
||||
|
||||
/// Set linger duration (in milliseconds)
|
||||
/// @param duration linger duration (in milliseconds)
|
||||
void UpdateLinger(const int duration);
|
||||
|
||||
/// Set socket rate logging interval (in seconds)
|
||||
/// @param rateLogging Socket rate logging interval (in seconds)
|
||||
void UpdateRateLogging(const int rateLogging);
|
||||
|
||||
/// Set start of the port range for automatic binding
|
||||
/// @param minPort start of the port range
|
||||
void UpdatePortRangeMin(const int minPort);
|
||||
|
||||
/// Set end of the port range for automatic binding
|
||||
/// @param maxPort end of the port range
|
||||
void UpdatePortRangeMax(const int maxPort);
|
||||
|
||||
/// Set automatic binding (pick random port if bind fails)
|
||||
/// @param autobind true/false, true to enable automatic binding
|
||||
void UpdateAutoBind(const bool autobind);
|
||||
bool GetAutoBind() const { return fAutoBind; }
|
||||
|
||||
/// Set channel name
|
||||
/// @param name Arbitrary channel name
|
||||
void UpdateChannelName(const std::string& name) __attribute__((deprecated("Use UpdateName()"))) { UpdateName(name); }
|
||||
void UpdateName(const std::string& name);
|
||||
void UpdateName(const std::string& name) { fName = name; Invalidate(); }
|
||||
|
||||
/// Set socket type
|
||||
/// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/)
|
||||
void UpdateType(const std::string& type) { fType = type; Invalidate(); }
|
||||
|
||||
/// Set socket method
|
||||
/// @param method Socket method (bind/connect)
|
||||
void UpdateMethod(const std::string& method) { fMethod = method; Invalidate(); }
|
||||
|
||||
/// Set socket address
|
||||
/// @param Socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc")
|
||||
void UpdateAddress(const std::string& address) { fAddress = address; Invalidate(); }
|
||||
|
||||
/// Set channel transport
|
||||
/// @param transport transport string ("default", "zeromq" or "shmem")
|
||||
void UpdateTransport(const std::string& transport) { fTransportType = fair::mq::TransportType(transport); Invalidate(); }
|
||||
|
||||
/// Set socket send buffer size
|
||||
/// @param sndBufSize Socket send buffer size (in number of messages)
|
||||
void UpdateSndBufSize(const int sndBufSize) { fSndBufSize = sndBufSize; Invalidate(); }
|
||||
|
||||
/// Set socket receive buffer size
|
||||
/// @param rcvBufSize Socket receive buffer size (in number of messages)
|
||||
void UpdateRcvBufSize(const int rcvBufSize) { fRcvBufSize = rcvBufSize; Invalidate(); }
|
||||
|
||||
/// Set socket kernel transmit send buffer size (in bytes)
|
||||
/// @param sndKernelSize Socket send buffer size (in bytes)
|
||||
void UpdateSndKernelSize(const int sndKernelSize) { fSndKernelSize = sndKernelSize; Invalidate(); }
|
||||
|
||||
/// Set socket kernel transmit receive buffer size (in bytes)
|
||||
/// @param rcvKernelSize Socket receive buffer size (in bytes)
|
||||
void UpdateRcvKernelSize(const int rcvKernelSize) { fRcvKernelSize = rcvKernelSize; Invalidate(); }
|
||||
|
||||
/// Set linger duration (in milliseconds)
|
||||
/// @param duration linger duration (in milliseconds)
|
||||
void UpdateLinger(const int duration) { fLinger = duration; Invalidate(); }
|
||||
|
||||
/// Set socket rate logging interval (in seconds)
|
||||
/// @param rateLogging Socket rate logging interval (in seconds)
|
||||
void UpdateRateLogging(const int rateLogging) { fRateLogging = rateLogging; Invalidate(); }
|
||||
|
||||
/// Set start of the port range for automatic binding
|
||||
/// @param minPort start of the port range
|
||||
void UpdatePortRangeMin(const int minPort) { fPortRangeMin = minPort; Invalidate(); }
|
||||
|
||||
/// Set end of the port range for automatic binding
|
||||
/// @param maxPort end of the port range
|
||||
void UpdatePortRangeMax(const int maxPort) { fPortRangeMax = maxPort; Invalidate(); }
|
||||
|
||||
/// Set automatic binding (pick random port if bind fails)
|
||||
/// @param autobind true/false, true to enable automatic binding
|
||||
void UpdateAutoBind(const bool autobind) { fAutoBind = autobind; Invalidate(); }
|
||||
|
||||
/// Checks if the configured channel settings are valid (checks the validity parameter, without running full validation (as oposed to ValidateChannel()))
|
||||
/// @return true if channel settings are valid, false otherwise.
|
||||
bool IsValid() const;
|
||||
|
||||
/// Validates channel configuration
|
||||
/// @return true if channel settings are valid, false otherwise.
|
||||
bool ValidateChannel() __attribute__((deprecated("Use Validate()"))) { return Validate(); }
|
||||
bool IsValid() const { return fValid; }
|
||||
|
||||
/// Validates channel configuration
|
||||
/// @return true if channel settings are valid, false otherwise.
|
||||
@@ -244,8 +250,8 @@ class FairMQChannel
|
||||
|
||||
bool BindEndpoint(std::string& endpoint);
|
||||
|
||||
/// Resets the channel (requires validation to be used again).
|
||||
void ResetChannel();
|
||||
/// invalidates the channel (requires validation to be used again).
|
||||
void Invalidate() { fValid = false; }
|
||||
|
||||
/// Sends a message to the socket queue.
|
||||
/// @param msg Constant reference of unique_ptr to a FairMQMessage
|
||||
@@ -310,10 +316,7 @@ class FairMQChannel
|
||||
unsigned long GetMessagesTx() const { return fSocket->GetMessagesTx(); }
|
||||
unsigned long GetMessagesRx() const { return fSocket->GetMessagesRx(); }
|
||||
|
||||
auto Transport() -> FairMQTransportFactory*
|
||||
{
|
||||
return fTransportFactory.get();
|
||||
};
|
||||
auto Transport() -> FairMQTransportFactory* { return fTransportFactory.get(); };
|
||||
|
||||
template<typename... Args>
|
||||
FairMQMessagePtr NewMessage(Args&&... args)
|
||||
@@ -374,13 +377,9 @@ class FairMQChannel
|
||||
int fPortRangeMax;
|
||||
bool fAutoBind;
|
||||
|
||||
bool fIsValid;
|
||||
bool fValid;
|
||||
|
||||
bool fMultipart;
|
||||
bool fModified;
|
||||
bool fReset;
|
||||
|
||||
mutable std::mutex fMtx;
|
||||
|
||||
void CheckSendCompatibility(FairMQMessagePtr& msg)
|
||||
{
|
||||
@@ -437,8 +436,6 @@ class FairMQChannel
|
||||
fTransportFactory = factory;
|
||||
fTransportType = factory->GetType();
|
||||
}
|
||||
|
||||
auto SetModified(const bool modified) -> void;
|
||||
};
|
||||
|
||||
#endif /* FAIRMQCHANNEL_H_ */
|
||||
|
@@ -390,7 +390,6 @@ void FairMQDevice::AttachChannels(vector<FairMQChannel*>& chans)
|
||||
if ((*itr)->Validate()) {
|
||||
(*itr)->Init();
|
||||
if (AttachChannel(**itr)) {
|
||||
(*itr)->SetModified(false);
|
||||
// remove the channel from the uninitialized container
|
||||
itr = chans.erase(itr);
|
||||
} else {
|
||||
|
@@ -26,7 +26,6 @@
|
||||
#include <algorithm> // find
|
||||
#include <string>
|
||||
#include <chrono>
|
||||
#include <iostream>
|
||||
#include <unordered_map>
|
||||
#include <functional>
|
||||
#include <stdexcept>
|
||||
@@ -363,10 +362,10 @@ class FairMQDevice
|
||||
void PrintRegisteredChannels()
|
||||
{
|
||||
if (fChannelRegistry.size() < 1) {
|
||||
std::cout << "no channels registered." << std::endl;
|
||||
LOGV(info, verylow) << "no channels registered.";
|
||||
} else {
|
||||
for (const auto& c : fChannelRegistry) {
|
||||
std::cout << c.first << ":" << c.second.first << ":" << c.second.second << std::endl;
|
||||
LOGV(info, verylow) << c.first << ":" << c.second.first << ":" << c.second.second;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -79,7 +79,7 @@ ProgOptions::ProgOptions()
|
||||
fAllOptions.add_options()
|
||||
("help,h", "Print help")
|
||||
("version,v", "Print version")
|
||||
("severity", po::value<string>()->default_value("debug"), "Log severity level (console): trace, debug, info, state, warn, error, fatal, nolog")
|
||||
("severity", po::value<string>()->default_value(""), "Log severity level (console): trace, debug, info, state, warn, error, fatal, nolog.")
|
||||
("file-severity", po::value<string>()->default_value("debug"), "Log severity level (file): trace, debug, info, state, warn, error, fatal, nolog")
|
||||
("verbosity", po::value<string>()->default_value("medium"), "Log verbosity level: veryhigh, high, medium, low")
|
||||
("color", po::value<bool >()->default_value(true), "Log color (true/false)")
|
||||
|
@@ -76,9 +76,11 @@ unordered_map<type_index, function<pair<string, string>(const Property&)>> Prope
|
||||
{ type_index(typeid(const char*)), [](const Property& p) { return pair<string, string>{ string(any_cast<const char*>(p)), "string" }; } },
|
||||
{ type_index(typeid(string)), [](const Property& p) { return pair<string, string>{ any_cast<string>(p), "string" }; } },
|
||||
{ type_index(typeid(int)), [](const Property& p) { return getString<int>(p, "int"); } },
|
||||
{ type_index(typeid(short)), [](const Property& p) { return getString<short>(p, "short"); } },
|
||||
{ type_index(typeid(long)), [](const Property& p) { return getString<long>(p, "long"); } },
|
||||
{ type_index(typeid(long long)), [](const Property& p) { return getString<long long>(p, "long long"); } },
|
||||
{ type_index(typeid(unsigned)), [](const Property& p) { return getString<unsigned>(p, "unsigned"); } },
|
||||
{ type_index(typeid(unsigned short)), [](const Property& p) { return getString<unsigned short>(p, "unsigned short"); } },
|
||||
{ type_index(typeid(unsigned long)), [](const Property& p) { return getString<unsigned long>(p, "unsigned long"); } },
|
||||
{ type_index(typeid(unsigned long long)), [](const Property& p) { return getString<unsigned long long>(p, "unsigned long long"); } },
|
||||
{ type_index(typeid(float)), [](const Property& p) { return getStringPair<float>(p, "float"); } },
|
||||
@@ -92,9 +94,11 @@ unordered_map<type_index, function<pair<string, string>(const Property&)>> Prope
|
||||
{ type_index(typeid(vector<unsigned char>)), [](const Property& p) { return getStringPair<vector<unsigned char>>(p, "vector<unsigned char>"); } },
|
||||
{ type_index(typeid(vector<string>)), [](const Property& p) { return getStringPair<vector<string>>(p, "vector<string>"); } },
|
||||
{ type_index(typeid(vector<int>)), [](const Property& p) { return getStringPair<vector<int>>(p, "vector<int>"); } },
|
||||
{ type_index(typeid(vector<short>)), [](const Property& p) { return getStringPair<vector<short>>(p, "vector<short>"); } },
|
||||
{ type_index(typeid(vector<long>)), [](const Property& p) { return getStringPair<vector<long>>(p, "vector<long>"); } },
|
||||
{ type_index(typeid(vector<long long>)), [](const Property& p) { return getStringPair<vector<long long>>(p, "vector<long long>"); } },
|
||||
{ type_index(typeid(vector<unsigned>)), [](const Property& p) { return getStringPair<vector<unsigned>>(p, "vector<unsigned>"); } },
|
||||
{ type_index(typeid(vector<unsigned short>)), [](const Property& p) { return getStringPair<vector<unsigned short>>(p, "vector<unsigned short>"); } },
|
||||
{ type_index(typeid(vector<unsigned long>)), [](const Property& p) { return getStringPair<vector<unsigned long>>(p, "vector<unsigned long>"); } },
|
||||
{ type_index(typeid(vector<unsigned long long>)), [](const Property& p) { return getStringPair<vector<unsigned long long>>(p, "vector<unsigned long long>"); } },
|
||||
{ type_index(typeid(vector<float>)), [](const Property& p) { return getStringPair<vector<float>>(p, "vector<float>"); } },
|
||||
@@ -110,9 +114,11 @@ unordered_map<type_index, void(*)(const EventManager&, const string&, const Prop
|
||||
{ type_index(typeid(const char*)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, string>(k, string(any_cast<const char*>(p))); } },
|
||||
{ type_index(typeid(string)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, string>(k, any_cast<string>(p)); } },
|
||||
{ type_index(typeid(int)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, int>(k, any_cast<int>(p)); } },
|
||||
{ type_index(typeid(short)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, short>(k, any_cast<short>(p)); } },
|
||||
{ type_index(typeid(long)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, long>(k, any_cast<long>(p)); } },
|
||||
{ type_index(typeid(long long)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, long long>(k, any_cast<long long>(p)); } },
|
||||
{ type_index(typeid(unsigned)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, unsigned>(k, any_cast<unsigned>(p)); } },
|
||||
{ type_index(typeid(unsigned short)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, unsigned short>(k, any_cast<unsigned short>(p)); } },
|
||||
{ type_index(typeid(unsigned long)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, unsigned long>(k, any_cast<unsigned long>(p)); } },
|
||||
{ type_index(typeid(unsigned long long)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, unsigned long long>(k, any_cast<unsigned long long>(p)); } },
|
||||
{ type_index(typeid(float)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, float>(k, any_cast<float>(p)); } },
|
||||
@@ -126,9 +132,11 @@ unordered_map<type_index, void(*)(const EventManager&, const string&, const Prop
|
||||
{ type_index(typeid(vector<unsigned char>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<unsigned char>>(k, any_cast<vector<unsigned char>>(p)); } },
|
||||
{ type_index(typeid(vector<string>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<string>>(k, any_cast<vector<string>>(p)); } },
|
||||
{ type_index(typeid(vector<int>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<int>>(k, any_cast<vector<int>>(p)); } },
|
||||
{ type_index(typeid(vector<short>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<short>>(k, any_cast<vector<short>>(p)); } },
|
||||
{ type_index(typeid(vector<long>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<long>>(k, any_cast<vector<long>>(p)); } },
|
||||
{ type_index(typeid(vector<long long>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<long long>>(k, any_cast<vector<long long>>(p)); } },
|
||||
{ type_index(typeid(vector<unsigned>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<unsigned>>(k, any_cast<vector<unsigned>>(p)); } },
|
||||
{ type_index(typeid(vector<unsigned short>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<unsigned short>>(k, any_cast<vector<unsigned short>>(p)); } },
|
||||
{ type_index(typeid(vector<unsigned long>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<unsigned long>>(k, any_cast<vector<unsigned long>>(p)); } },
|
||||
{ type_index(typeid(vector<unsigned long long>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<unsigned long long>>(k, any_cast<vector<unsigned long long>>(p)); } },
|
||||
{ type_index(typeid(vector<float>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<float>>(k, any_cast<vector<float>>(p)); } },
|
||||
|
@@ -10,8 +10,10 @@
|
||||
#define FAIR_MQ_TRANSPORTS_H
|
||||
|
||||
#include <fairmq/tools/CppSTL.h>
|
||||
#include <fairmq/tools/Strings.h>
|
||||
|
||||
#include <memory>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
|
||||
@@ -28,6 +30,8 @@ enum class Transport
|
||||
OFI
|
||||
};
|
||||
|
||||
struct TransportError : std::runtime_error { using std::runtime_error::runtime_error; };
|
||||
|
||||
} /* namespace mq */
|
||||
} /* namespace fair */
|
||||
|
||||
@@ -58,6 +62,18 @@ static std::unordered_map<Transport, std::string> TransportNames {
|
||||
{ Transport::OFI, "ofi" }
|
||||
};
|
||||
|
||||
inline std::string TransportName(Transport transport)
|
||||
{
|
||||
return TransportNames[transport];
|
||||
}
|
||||
|
||||
inline Transport TransportType(const std::string& transport)
|
||||
try {
|
||||
return TransportTypes.at(transport);
|
||||
} catch (std::out_of_range&) {
|
||||
throw TransportError(tools::ToString("Unknown transport provided: ", transport));
|
||||
}
|
||||
|
||||
} /* namespace mq */
|
||||
} /* namespace fair */
|
||||
|
||||
|
@@ -8,14 +8,16 @@
|
||||
|
||||
#include "Control.h"
|
||||
|
||||
#include <termios.h> // for the interactive mode
|
||||
#include <poll.h> // for the interactive mode
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <csignal> // catching system signals
|
||||
#include <cstdlib>
|
||||
#include <functional>
|
||||
#include <atomic>
|
||||
#include <iostream>
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
|
||||
#include <poll.h> // for the interactive mode
|
||||
#include <termios.h> // for the interactive mode
|
||||
|
||||
using namespace std;
|
||||
|
||||
|
@@ -65,8 +65,6 @@ struct DDSSession::Impl
|
||||
, fId(to_string(fSession->create()))
|
||||
, fStopOnDestruction(false)
|
||||
{
|
||||
setenv("DDS_SESSION_ID", fId.c_str(), 1);
|
||||
|
||||
fDDSService.subscribeOnError([](const dds::intercom_api::EErrorCode errorCode, const std::string& msg) {
|
||||
std::cerr << "DDS error, error code: " << errorCode << ", error message: " << msg << std::endl;
|
||||
});
|
||||
@@ -81,10 +79,6 @@ struct DDSSession::Impl
|
||||
, fStopOnDestruction(false)
|
||||
{
|
||||
fSession->attach(fId);
|
||||
auto envId(std::getenv("DDS_SESSION_ID"));
|
||||
if (envId != nullptr && std::string(envId) != fId) {
|
||||
setenv("DDS_SESSION_ID", fId.c_str(), 1);
|
||||
}
|
||||
|
||||
fDDSService.subscribeOnError([](const dds::intercom_api::EErrorCode errorCode, const std::string& msg) {
|
||||
std::cerr << "DDS error, error code: " << errorCode << ", error message: " << msg << std::endl;
|
||||
@@ -99,11 +93,6 @@ struct DDSSession::Impl
|
||||
, fId(to_string(fSession->getSessionID()))
|
||||
, fStopOnDestruction(false)
|
||||
{
|
||||
auto envId(std::getenv("DDS_SESSION_ID"));
|
||||
if (envId != nullptr && std::string(envId) != fId) {
|
||||
setenv("DDS_SESSION_ID", fId.c_str(), 1);
|
||||
}
|
||||
|
||||
// Sanity check
|
||||
if (!fSession->IsRunning()) {
|
||||
throw std::runtime_error("Given CSession must be running");
|
||||
@@ -123,10 +112,6 @@ struct DDSSession::Impl
|
||||
Impl(Impl&&) = delete;
|
||||
Impl& operator=(Impl&&) = delete;
|
||||
|
||||
struct Tag {};
|
||||
friend auto operator<<(std::ostream& os, Tag) -> std::ostream& { return os << "DDSSession"; }
|
||||
tools::InstanceLimiter<Tag, 1> fCount;
|
||||
|
||||
DDSEnvironment fEnv;
|
||||
DDSRMSPlugin fRMSPlugin;
|
||||
Path fRMSConfig;
|
||||
|
@@ -234,6 +234,8 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
, fDDSTopo(std::move(topo))
|
||||
, fStateData()
|
||||
, fStateIndex()
|
||||
, fMtx(std::make_unique<std::mutex>())
|
||||
, fStateChangeUnsubscriptionCV(std::make_unique<std::condition_variable>())
|
||||
, fHeartbeatsTimer(asio::system_executor())
|
||||
, fHeartbeatInterval(600000)
|
||||
{
|
||||
@@ -263,7 +265,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
{
|
||||
UnsubscribeFromStateChanges();
|
||||
|
||||
std::lock_guard<std::mutex> lk(fMtx);
|
||||
std::lock_guard<std::mutex> lk(*fMtx);
|
||||
fDDSSession.UnsubscribeFromCommands();
|
||||
try {
|
||||
for (auto& op : fChangeStateOps) {
|
||||
@@ -306,8 +308,8 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
fDDSSession.SendCommand(cmd::Cmds(cmd::make<cmd::UnsubscribeFromStateChange>()).Serialize());
|
||||
|
||||
// wait for all tasks to confirm unsubscription
|
||||
std::unique_lock<std::mutex> lk(fMtx);
|
||||
fStateChangeUnsubscriptionCV.wait(lk, [&](){
|
||||
std::unique_lock<std::mutex> lk(*fMtx);
|
||||
fStateChangeUnsubscriptionCV->wait(lk, [&](){
|
||||
unsigned int count = std::count_if(fStateIndex.cbegin(), fStateIndex.cend(), [=](const auto& s) {
|
||||
return fStateData.at(s.second).subscribed_to_state_changes == false;
|
||||
});
|
||||
@@ -358,7 +360,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
DDSTask::Id taskId(cmd.GetTaskId());
|
||||
|
||||
try {
|
||||
std::lock_guard<std::mutex> lk(fMtx);
|
||||
std::lock_guard<std::mutex> lk(*fMtx);
|
||||
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
||||
task.subscribed_to_state_changes = true;
|
||||
} catch (const std::exception& e) {
|
||||
@@ -375,11 +377,11 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
DDSTask::Id taskId(cmd.GetTaskId());
|
||||
|
||||
try {
|
||||
std::unique_lock<std::mutex> lk(fMtx);
|
||||
std::unique_lock<std::mutex> lk(*fMtx);
|
||||
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
||||
task.subscribed_to_state_changes = false;
|
||||
lk.unlock();
|
||||
fStateChangeUnsubscriptionCV.notify_one();
|
||||
fStateChangeUnsubscriptionCV->notify_one();
|
||||
} catch (const std::exception& e) {
|
||||
FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeUnsubscription const&): " << e.what();
|
||||
}
|
||||
@@ -397,7 +399,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
DDSTask::Id taskId(cmd.GetTaskId());
|
||||
|
||||
try {
|
||||
std::lock_guard<std::mutex> lk(fMtx);
|
||||
std::lock_guard<std::mutex> lk(*fMtx);
|
||||
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
||||
task.lastState = cmd.GetLastState();
|
||||
task.state = cmd.GetCurrentState();
|
||||
@@ -422,7 +424,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
{
|
||||
if (cmd.GetResult() != cmd::Result::Ok) {
|
||||
DDSTask::Id taskId(cmd.GetTaskId());
|
||||
std::lock_guard<std::mutex> lk(fMtx);
|
||||
std::lock_guard<std::mutex> lk(*fMtx);
|
||||
for (auto& op : fChangeStateOps) {
|
||||
if (!op.second.IsCompleted() && op.second.ContainsTask(taskId)) {
|
||||
if (fStateData.at(fStateIndex.at(taskId)).state != op.second.GetTargetState()) {
|
||||
@@ -438,7 +440,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
|
||||
auto HandleCmd(cmd::Properties const& cmd) -> void
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(fMtx);
|
||||
std::unique_lock<std::mutex> lk(*fMtx);
|
||||
try {
|
||||
auto& op(fGetPropertiesOps.at(cmd.GetRequestId()));
|
||||
lk.unlock();
|
||||
@@ -452,7 +454,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
|
||||
auto HandleCmd(cmd::PropertiesSet const& cmd) -> void
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(fMtx);
|
||||
std::unique_lock<std::mutex> lk(*fMtx);
|
||||
try {
|
||||
auto& op(fSetPropertiesOps.at(cmd.GetRequestId()));
|
||||
lk.unlock();
|
||||
@@ -659,7 +661,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
return asio::async_initiate<CompletionToken, ChangeStateCompletionSignature>([&](auto handler) {
|
||||
typename ChangeStateOp::Id const id(tools::UuidHash());
|
||||
|
||||
std::lock_guard<std::mutex> lk(fMtx);
|
||||
std::lock_guard<std::mutex> lk(*fMtx);
|
||||
|
||||
for (auto it = begin(fChangeStateOps); it != end(fChangeStateOps);) {
|
||||
if (it->second.IsCompleted()) {
|
||||
@@ -677,7 +679,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
fDDSTopo.GetTasks(path),
|
||||
fStateData,
|
||||
timeout,
|
||||
fMtx,
|
||||
*fMtx,
|
||||
AsioBase<Executor, Allocator>::GetExecutor(),
|
||||
AsioBase<Executor, Allocator>::GetAllocator(),
|
||||
std::move(handler)));
|
||||
@@ -762,7 +764,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
/// @return map of id : DeviceStatus
|
||||
auto GetCurrentState() const -> TopologyState
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(fMtx);
|
||||
std::lock_guard<std::mutex> lk(*fMtx);
|
||||
return fStateData;
|
||||
}
|
||||
|
||||
@@ -890,7 +892,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
return asio::async_initiate<CompletionToken, WaitForStateCompletionSignature>([&](auto handler) {
|
||||
typename GetPropertiesOp::Id const id(tools::UuidHash());
|
||||
|
||||
std::lock_guard<std::mutex> lk(fMtx);
|
||||
std::lock_guard<std::mutex> lk(*fMtx);
|
||||
|
||||
for (auto it = begin(fWaitForStateOps); it != end(fWaitForStateOps);) {
|
||||
if (it->second.IsCompleted()) {
|
||||
@@ -908,7 +910,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
targetCurrentState,
|
||||
fDDSTopo.GetTasks(path),
|
||||
timeout,
|
||||
fMtx,
|
||||
*fMtx,
|
||||
AsioBase<Executor, Allocator>::GetExecutor(),
|
||||
AsioBase<Executor, Allocator>::GetAllocator(),
|
||||
std::move(handler)));
|
||||
@@ -1071,7 +1073,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
[&](auto handler) {
|
||||
typename GetPropertiesOp::Id const id(tools::UuidHash());
|
||||
|
||||
std::lock_guard<std::mutex> lk(fMtx);
|
||||
std::lock_guard<std::mutex> lk(*fMtx);
|
||||
|
||||
for (auto it = begin(fGetPropertiesOps); it != end(fGetPropertiesOps);) {
|
||||
if (it->second.IsCompleted()) {
|
||||
@@ -1087,7 +1089,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
std::forward_as_tuple(id,
|
||||
fDDSTopo.GetTasks(path).size(),
|
||||
timeout,
|
||||
fMtx,
|
||||
*fMtx,
|
||||
AsioBase<Executor, Allocator>::GetExecutor(),
|
||||
AsioBase<Executor, Allocator>::GetAllocator(),
|
||||
std::move(handler)));
|
||||
@@ -1227,7 +1229,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
[&](auto handler) {
|
||||
typename SetPropertiesOp::Id const id(tools::UuidHash());
|
||||
|
||||
std::lock_guard<std::mutex> lk(fMtx);
|
||||
std::lock_guard<std::mutex> lk(*fMtx);
|
||||
|
||||
for (auto it = begin(fGetPropertiesOps); it != end(fGetPropertiesOps);) {
|
||||
if (it->second.IsCompleted()) {
|
||||
@@ -1243,7 +1245,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
std::forward_as_tuple(id,
|
||||
fDDSTopo.GetTasks(path).size(),
|
||||
timeout,
|
||||
fMtx,
|
||||
*fMtx,
|
||||
AsioBase<Executor, Allocator>::GetExecutor(),
|
||||
AsioBase<Executor, Allocator>::GetAllocator(),
|
||||
std::move(handler)));
|
||||
@@ -1296,9 +1298,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
TopologyState fStateData;
|
||||
TopologyStateIndex fStateIndex;
|
||||
|
||||
mutable std::mutex fMtx;
|
||||
mutable std::unique_ptr<std::mutex> fMtx;
|
||||
|
||||
std::condition_variable fStateChangeUnsubscriptionCV;
|
||||
std::unique_ptr<std::condition_variable> fStateChangeUnsubscriptionCV;
|
||||
asio::steady_timer fHeartbeatsTimer;
|
||||
Duration fHeartbeatInterval;
|
||||
|
||||
|
@@ -58,6 +58,12 @@ class Socket final : public fair::mq::Socket
|
||||
, fTimeout(100)
|
||||
{
|
||||
assert(context);
|
||||
|
||||
if (type == "sub" || type == "pub") {
|
||||
LOG(error) << "PUB/SUB socket type is not supported for shared memory transport";
|
||||
throw SocketError("PUB/SUB socket type is not supported for shared memory transport");
|
||||
}
|
||||
|
||||
fSocket = zmq_socket(context, GetConstant(type));
|
||||
|
||||
if (fSocket == nullptr) {
|
||||
@@ -91,11 +97,6 @@ class Socket final : public fair::mq::Socket
|
||||
// LOG(error) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno);
|
||||
// }
|
||||
// }
|
||||
|
||||
if (type == "sub" || type == "pub") {
|
||||
LOG(error) << "PUB/SUB socket type is not supported for shared memory transport";
|
||||
throw SocketError("PUB/SUB socket type is not supported for shared memory transport");
|
||||
}
|
||||
LOG(debug) << "Created socket " << GetId();
|
||||
}
|
||||
|
||||
|
@@ -11,7 +11,7 @@
|
||||
|
||||
#include <cassert>
|
||||
#include <string>
|
||||
#include <iostream>
|
||||
// #include <iostream>
|
||||
#include <iomanip>
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
|
@@ -88,7 +88,7 @@ TEST(Channel, Validation)
|
||||
channel2.UpdateName("Kanal");
|
||||
ASSERT_EQ(channel2.GetName(), "Kanal");
|
||||
|
||||
channel2.ResetChannel();
|
||||
channel2.Invalidate();
|
||||
ASSERT_EQ(channel2.IsValid(), false);
|
||||
ASSERT_EQ(channel2.Validate(), true);
|
||||
}
|
||||
|
@@ -28,7 +28,7 @@ class ErrorState : public FairMQDevice
|
||||
{
|
||||
std::string state("Init");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "going to change to Error state from " << state << "()";
|
||||
LOG(info) << "going to change to Error state from " << state << "()";
|
||||
ChangeState(fair::mq::Transition::ErrorFound);
|
||||
}
|
||||
}
|
||||
@@ -37,7 +37,7 @@ class ErrorState : public FairMQDevice
|
||||
{
|
||||
std::string state("Bind");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "going to change to Error state from " << state << "()";
|
||||
LOG(info) << "going to change to Error state from " << state << "()";
|
||||
ChangeState(fair::mq::Transition::ErrorFound);
|
||||
}
|
||||
}
|
||||
@@ -46,7 +46,7 @@ class ErrorState : public FairMQDevice
|
||||
{
|
||||
std::string state("Connect");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "going to change to Error state from " << state << "()";
|
||||
LOG(info) << "going to change to Error state from " << state << "()";
|
||||
ChangeState(fair::mq::Transition::ErrorFound);
|
||||
}
|
||||
}
|
||||
@@ -55,7 +55,7 @@ class ErrorState : public FairMQDevice
|
||||
{
|
||||
std::string state("InitTask");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "going to change to Error state from " << state << "()";
|
||||
LOG(info) << "going to change to Error state from " << state << "()";
|
||||
ChangeState(fair::mq::Transition::ErrorFound);
|
||||
}
|
||||
}
|
||||
@@ -64,7 +64,7 @@ class ErrorState : public FairMQDevice
|
||||
{
|
||||
std::string state("PreRun");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "going to change to Error state from " << state << "()";
|
||||
LOG(info) << "going to change to Error state from " << state << "()";
|
||||
ChangeState(fair::mq::Transition::ErrorFound);
|
||||
}
|
||||
}
|
||||
@@ -73,7 +73,7 @@ class ErrorState : public FairMQDevice
|
||||
{
|
||||
std::string state("Run");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "going to change to Error state from " << state << "()";
|
||||
LOG(info) << "going to change to Error state from " << state << "()";
|
||||
ChangeState(fair::mq::Transition::ErrorFound);
|
||||
}
|
||||
}
|
||||
@@ -82,7 +82,7 @@ class ErrorState : public FairMQDevice
|
||||
{
|
||||
std::string state("PostRun");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "going to change to Error state from " << state << "()";
|
||||
LOG(info) << "going to change to Error state from " << state << "()";
|
||||
ChangeState(fair::mq::Transition::ErrorFound);
|
||||
}
|
||||
}
|
||||
@@ -91,7 +91,7 @@ class ErrorState : public FairMQDevice
|
||||
{
|
||||
std::string state("ResetTask");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "going to change to Error state from " << state << "()";
|
||||
LOG(info) << "going to change to Error state from " << state << "()";
|
||||
ChangeState(fair::mq::Transition::ErrorFound);
|
||||
}
|
||||
}
|
||||
@@ -100,7 +100,7 @@ class ErrorState : public FairMQDevice
|
||||
{
|
||||
std::string state("Reset");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "going to change to Error state from " << state << "()";
|
||||
LOG(info) << "going to change to Error state from " << state << "()";
|
||||
ChangeState(fair::mq::Transition::ErrorFound);
|
||||
}
|
||||
}
|
||||
|
@@ -29,7 +29,7 @@ class Signals : public FairMQDevice
|
||||
{
|
||||
std::string state("Init");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "raising SIGINT from " << state << "()";
|
||||
LOG(info) << "raising SIGINT from " << state << "()";
|
||||
raise(SIGINT);
|
||||
}
|
||||
}
|
||||
@@ -37,7 +37,7 @@ class Signals : public FairMQDevice
|
||||
{
|
||||
std::string state("Bind");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "raising SIGINT from " << state << "()";
|
||||
LOG(info) << "raising SIGINT from " << state << "()";
|
||||
raise(SIGINT);
|
||||
}
|
||||
}
|
||||
@@ -45,7 +45,7 @@ class Signals : public FairMQDevice
|
||||
{
|
||||
std::string state("Connect");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "raising SIGINT from " << state << "()";
|
||||
LOG(info) << "raising SIGINT from " << state << "()";
|
||||
raise(SIGINT);
|
||||
}
|
||||
}
|
||||
@@ -54,7 +54,7 @@ class Signals : public FairMQDevice
|
||||
{
|
||||
std::string state("InitTask");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "raising SIGINT from " << state << "()";
|
||||
LOG(info) << "raising SIGINT from " << state << "()";
|
||||
raise(SIGINT);
|
||||
}
|
||||
}
|
||||
@@ -63,7 +63,7 @@ class Signals : public FairMQDevice
|
||||
{
|
||||
std::string state("PreRun");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "raising SIGINT from " << state << "()";
|
||||
LOG(info) << "raising SIGINT from " << state << "()";
|
||||
raise(SIGINT);
|
||||
}
|
||||
}
|
||||
@@ -72,7 +72,7 @@ class Signals : public FairMQDevice
|
||||
{
|
||||
std::string state("Run");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "raising SIGINT from " << state << "()";
|
||||
LOG(info) << "raising SIGINT from " << state << "()";
|
||||
raise(SIGINT);
|
||||
}
|
||||
}
|
||||
@@ -81,7 +81,7 @@ class Signals : public FairMQDevice
|
||||
{
|
||||
std::string state("PostRun");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "raising SIGINT from " << state << "()";
|
||||
LOG(info) << "raising SIGINT from " << state << "()";
|
||||
raise(SIGINT);
|
||||
}
|
||||
}
|
||||
@@ -90,7 +90,7 @@ class Signals : public FairMQDevice
|
||||
{
|
||||
std::string state("ResetTask");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "raising SIGINT from " << state << "()";
|
||||
LOG(info) << "raising SIGINT from " << state << "()";
|
||||
raise(SIGINT);
|
||||
}
|
||||
}
|
||||
@@ -99,7 +99,7 @@ class Signals : public FairMQDevice
|
||||
{
|
||||
std::string state("Reset");
|
||||
if (std::string::npos != GetId().find("_" + state + "_")) {
|
||||
LOG(debug) << "raising SIGINT from " << state << "()";
|
||||
LOG(info) << "raising SIGINT from " << state << "()";
|
||||
raise(SIGINT);
|
||||
}
|
||||
}
|
||||
|
@@ -40,8 +40,9 @@ TEST(ProgOptions, SetAndGet)
|
||||
set_and_get<string>(o, "_string", "teststring");
|
||||
set_and_get<int>(o, "_int", 1);
|
||||
set_and_get<size_t>(o, "_size_t", 11);
|
||||
set_and_get<uint32_t>(o, "_uint32_t", 12);
|
||||
set_and_get<uint64_t>(o, "_uint64_t", 123);
|
||||
set_and_get<uint16_t>(o, "_uint16_t", 12);
|
||||
set_and_get<uint32_t>(o, "_uint32_t", 123);
|
||||
set_and_get<uint64_t>(o, "_uint64_t", 1234);
|
||||
set_and_get<long>(o, "_long", 1234);
|
||||
set_and_get<long long>(o, "_long long", 12345);
|
||||
set_and_get<unsigned>(o, "_unsigned", 3);
|
||||
@@ -59,8 +60,9 @@ TEST(ProgOptions, SetAndGet)
|
||||
set_and_get<vector<string>>(o, "_vector<string>", { "aa", "bb", "cc" });
|
||||
set_and_get<vector<int>>(o, "_vector<int>", { 1, 2, 3 });
|
||||
set_and_get<vector<size_t>>(o, "_vector<size_t>", { 1, 2, 3 });
|
||||
set_and_get<vector<uint32_t>>(o, "_vector<uint32_t>", { 12, 13, 14 });
|
||||
set_and_get<vector<uint64_t>>(o, "_vector<uint64_t>", { 123, 124, 125 });
|
||||
set_and_get<vector<uint16_t>>(o, "_vector<uint16_t>", { 12, 13, 14 });
|
||||
set_and_get<vector<uint32_t>>(o, "_vector<uint32_t>", { 123, 124, 125 });
|
||||
set_and_get<vector<uint64_t>>(o, "_vector<uint64_t>", { 1234, 1235, 1236 });
|
||||
set_and_get<vector<long>>(o, "_vector<long>", { 1234, 1235, 1236 });
|
||||
set_and_get<vector<long long>>(o, "_vector<long long>", { 12345, 12346, 12347 });
|
||||
set_and_get<vector<unsigned>>(o, "_vector<unsigned>", { 3, 4, 5 });
|
||||
@@ -122,8 +124,9 @@ TEST(ProgOptions, SubscribeAndSet)
|
||||
subscribe_and_set<string>(o, "_string", "teststring");
|
||||
subscribe_and_set<int>(o, "_int", 1);
|
||||
subscribe_and_set<size_t>(o, "_size_t", 11);
|
||||
subscribe_and_set<uint32_t>(o, "_uint32_t", 12);
|
||||
subscribe_and_set<uint64_t>(o, "_uint64_t", 123);
|
||||
subscribe_and_set<uint16_t>(o, "_uint16_t", 12);
|
||||
subscribe_and_set<uint32_t>(o, "_uint32_t", 123);
|
||||
subscribe_and_set<uint64_t>(o, "_uint64_t", 1234);
|
||||
subscribe_and_set<long>(o, "_long", 1234);
|
||||
subscribe_and_set<long long>(o, "_long long", 12345);
|
||||
subscribe_and_set<unsigned>(o, "_unsigned", 3);
|
||||
@@ -141,6 +144,7 @@ TEST(ProgOptions, SubscribeAndSet)
|
||||
subscribe_and_set<vector<string>>(o, "_vector<string>", { "aa", "bb", "cc" });
|
||||
subscribe_and_set<vector<int>>(o, "_vector<int>", { 1, 2, 3 });
|
||||
subscribe_and_set<vector<size_t>>(o, "_vector<size_t>", { 1, 2, 3 });
|
||||
subscribe_and_set<vector<uint16_t>>(o, "_vector<uint16_t>", { 12, 13, 14 });
|
||||
subscribe_and_set<vector<uint32_t>>(o, "_vector<uint32_t>", { 12, 13, 14 });
|
||||
subscribe_and_set<vector<uint64_t>>(o, "_vector<uint64_t>", { 123, 124, 125 });
|
||||
subscribe_and_set<vector<long>>(o, "_vector<long>", { 1234, 1235, 1236 });
|
||||
@@ -163,6 +167,7 @@ TEST(PropertyHelper, ConvertPropertyToString)
|
||||
EXPECT_EQ(PropertyHelper::ConvertPropertyToString(Property(static_cast<string>("teststring"))), "teststring");
|
||||
EXPECT_EQ(PropertyHelper::ConvertPropertyToString(Property(static_cast<int>(1))), "1");
|
||||
EXPECT_EQ(PropertyHelper::ConvertPropertyToString(Property(static_cast<size_t>(11))), "11");
|
||||
EXPECT_EQ(PropertyHelper::ConvertPropertyToString(Property(static_cast<uint16_t>(12))), "12");
|
||||
EXPECT_EQ(PropertyHelper::ConvertPropertyToString(Property(static_cast<uint32_t>(12))), "12");
|
||||
EXPECT_EQ(PropertyHelper::ConvertPropertyToString(Property(static_cast<uint64_t>(123))), "123");
|
||||
EXPECT_EQ(PropertyHelper::ConvertPropertyToString(Property(static_cast<long>(1234))), "1234");
|
||||
@@ -181,6 +186,7 @@ TEST(PropertyHelper, ConvertPropertyToString)
|
||||
EXPECT_EQ(PropertyHelper::ConvertPropertyToString(Property(vector<string>({ "aa", "bb", "cc" }))), "aa, bb, cc");
|
||||
EXPECT_EQ(PropertyHelper::ConvertPropertyToString(Property(vector<int>({ 1, 2, 3 }))), "1, 2, 3");
|
||||
EXPECT_EQ(PropertyHelper::ConvertPropertyToString(Property(vector<size_t>({ 1, 2, 3 }))), "1, 2, 3");
|
||||
EXPECT_EQ(PropertyHelper::ConvertPropertyToString(Property(vector<uint16_t>({ 12, 13, 14 }))), "12, 13, 14");
|
||||
EXPECT_EQ(PropertyHelper::ConvertPropertyToString(Property(vector<uint32_t>({ 12, 13, 14 }))), "12, 13, 14");
|
||||
EXPECT_EQ(PropertyHelper::ConvertPropertyToString(Property(vector<uint64_t>({ 123, 124, 125 }))), "123, 124, 125");
|
||||
EXPECT_EQ(PropertyHelper::ConvertPropertyToString(Property(vector<long>({ 1234, 1235, 1236 }))), "1234, 1235, 1236");
|
||||
|
@@ -11,13 +11,18 @@
|
||||
|
||||
#include "TestEnvironment.h"
|
||||
|
||||
#include <asio/io_context.hpp>
|
||||
#include <chrono>
|
||||
#include <cstdlib>
|
||||
#include <fairlogger/Logger.h>
|
||||
#include <fairmq/SDK.h>
|
||||
#include <fairmq/tools/Strings.h>
|
||||
|
||||
#include <fairlogger/Logger.h>
|
||||
|
||||
#include <asio/io_context.hpp>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <algorithm> // for_each
|
||||
#include <array>
|
||||
#include <chrono>
|
||||
#include <cstdlib>
|
||||
#include <thread>
|
||||
|
||||
namespace fair {
|
||||
@@ -82,8 +87,7 @@ struct TopologyFixture : ::testing::Test
|
||||
}
|
||||
}
|
||||
|
||||
auto TearDown() -> void override {
|
||||
}
|
||||
auto TearDown() -> void override {}
|
||||
|
||||
LoggerConfig mLoggerConfig;
|
||||
std::string mDDSTopoFile;
|
||||
@@ -93,14 +97,70 @@ struct TopologyFixture : ::testing::Test
|
||||
asio::io_context mIoContext;
|
||||
};
|
||||
|
||||
struct AsyncOpFixture : ::testing::Test
|
||||
struct MultipleTopologiesFixture : ::testing::Test
|
||||
{
|
||||
auto SetUp() -> void override {
|
||||
MultipleTopologiesFixture()
|
||||
: mDDSTopoFile(tools::ToString(SDK_TESTSUITE_SOURCE_DIR, "/test_topo.xml"))
|
||||
, mDDSEnv(CMAKE_CURRENT_BINARY_DIR)
|
||||
, mDDSSessions{ sdk::DDSSession(mDDSEnv),
|
||||
sdk::DDSSession(mDDSEnv) }
|
||||
, mDDSTopologies{ sdk::DDSTopology(sdk::DDSTopology::Path(mDDSTopoFile), mDDSEnv),
|
||||
sdk::DDSTopology(sdk::DDSTopology::Path(mDDSTopoFile), mDDSEnv) }
|
||||
{
|
||||
std::for_each(mDDSSessions.begin(), mDDSSessions.end(), [](sdk::DDSSession& s) {
|
||||
s.StopOnDestruction();
|
||||
});
|
||||
}
|
||||
|
||||
auto TearDown() -> void override {
|
||||
auto SetUp() -> void override
|
||||
{
|
||||
LOG(info) << mDDSEnv;
|
||||
for (int i = 0; i < mNumSessions; ++i) {
|
||||
LOG(info) << "##### SESSION " << i << " #####";
|
||||
LOG(info) << mDDSSessions[i];
|
||||
LOG(info) << mDDSTopologies[i];
|
||||
auto n(mDDSTopologies[i].GetNumRequiredAgents());
|
||||
mDDSSessions[i].SubmitAgents(n);
|
||||
mDDSSessions[i].ActivateTopology(mDDSTopologies[i]);
|
||||
|
||||
std::vector<sdk::DDSAgent> agents = mDDSSessions[i].RequestAgentInfo();
|
||||
LOG(info) << "##### AgentInfo:";
|
||||
LOG(info) << "size: " << agents.size();
|
||||
for (const auto& a : agents) {
|
||||
LOG(info) << a;
|
||||
}
|
||||
|
||||
std::vector<sdk::DDSTask> tasks = mDDSSessions[i].RequestTaskInfo();
|
||||
LOG(info) << "##### TaskInfo:";
|
||||
LOG(info) << "size: " << tasks.size();
|
||||
for (const auto& t : tasks) {
|
||||
LOG(info) << t;
|
||||
}
|
||||
|
||||
std::vector<sdk::DDSCollection> collections = mDDSTopologies[i].GetCollections();
|
||||
LOG(info) << "##### CollectionInfo:";
|
||||
LOG(info) << "size: " << collections.size();
|
||||
for (const auto& c : collections) {
|
||||
LOG(info) << c;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
auto TearDown() -> void override {}
|
||||
|
||||
static constexpr int mNumSessions = 2;
|
||||
LoggerConfig mLoggerConfig;
|
||||
std::string mDDSTopoFile;
|
||||
sdk::DDSEnvironment mDDSEnv;
|
||||
std::array<sdk::DDSSession, mNumSessions> mDDSSessions;
|
||||
std::array<sdk::DDSTopology, mNumSessions> mDDSTopologies;
|
||||
};
|
||||
|
||||
struct AsyncOpFixture : ::testing::Test
|
||||
{
|
||||
auto SetUp() -> void override {}
|
||||
auto TearDown() -> void override {}
|
||||
|
||||
LoggerConfig mLoggerConfig;
|
||||
asio::io_context mIoContext;
|
||||
};
|
||||
|
@@ -13,9 +13,30 @@
|
||||
#include <fairmq/sdk/Topology.h>
|
||||
#include <fairmq/Tools.h>
|
||||
|
||||
#include <thread>
|
||||
|
||||
namespace {
|
||||
|
||||
using Topology = fair::mq::test::TopologyFixture;
|
||||
using MultipleTopologies = fair::mq::test::MultipleTopologiesFixture;
|
||||
|
||||
void control(fair::mq::sdk::Topology& topo)
|
||||
{
|
||||
using fair::mq::sdk::TopologyTransition;
|
||||
|
||||
for (auto transition : {TopologyTransition::InitDevice,
|
||||
TopologyTransition::CompleteInit,
|
||||
TopologyTransition::Bind,
|
||||
TopologyTransition::Connect,
|
||||
TopologyTransition::InitTask,
|
||||
TopologyTransition::Run,
|
||||
TopologyTransition::Stop,
|
||||
TopologyTransition::ResetTask,
|
||||
TopologyTransition::ResetDevice,
|
||||
TopologyTransition::End}) {
|
||||
ASSERT_EQ(topo.ChangeState(transition).first, std::error_code());
|
||||
}
|
||||
}
|
||||
|
||||
TEST(TopologyHelper, MakeTopology)
|
||||
{
|
||||
@@ -24,7 +45,6 @@ TEST(TopologyHelper, MakeTopology)
|
||||
// This is only needed for this unit test
|
||||
test::LoggerConfig cfg;
|
||||
sdk::DDSEnv env(CMAKE_CURRENT_BINARY_DIR);
|
||||
/////////////////////////////////////
|
||||
|
||||
std::string topoFile(tools::ToString(SDK_TESTSUITE_SOURCE_DIR, "/test_topo.xml"));
|
||||
dds::topology_api::CTopology nativeTopo(topoFile);
|
||||
@@ -34,6 +54,59 @@ TEST(TopologyHelper, MakeTopology)
|
||||
nativeSession->shutdown();
|
||||
}
|
||||
|
||||
TEST_F(MultipleTopologies, Construction)
|
||||
{
|
||||
using namespace fair::mq;
|
||||
|
||||
std::array<sdk::Topology, mNumSessions> topos{
|
||||
sdk::Topology(mDDSTopologies[0], mDDSSessions[0]),
|
||||
sdk::Topology(mDDSTopologies[1], mDDSSessions[1])
|
||||
};
|
||||
}
|
||||
|
||||
TEST_F(MultipleTopologies, ChangeStateFullDeviceLifecycle)
|
||||
{
|
||||
using namespace fair::mq;
|
||||
|
||||
std::array<sdk::Topology, mNumSessions> topos{
|
||||
sdk::Topology(mDDSTopologies[0], mDDSSessions[0]),
|
||||
sdk::Topology(mDDSTopologies[1], mDDSSessions[1])
|
||||
};
|
||||
|
||||
for (int i = 0; i < mNumSessions; ++i) {
|
||||
using fair::mq::sdk::TopologyTransition;
|
||||
|
||||
for (auto transition : {TopologyTransition::InitDevice,
|
||||
TopologyTransition::CompleteInit,
|
||||
TopologyTransition::Bind,
|
||||
TopologyTransition::Connect,
|
||||
TopologyTransition::InitTask,
|
||||
TopologyTransition::Run,
|
||||
TopologyTransition::Stop,
|
||||
TopologyTransition::ResetTask,
|
||||
TopologyTransition::ResetDevice,
|
||||
TopologyTransition::End}) {
|
||||
ASSERT_EQ(topos[i].ChangeState(transition).first, std::error_code());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(MultipleTopologies, ChangeStateFullDeviceLifecycleConcurrent)
|
||||
{
|
||||
using namespace fair::mq;
|
||||
|
||||
std::array<sdk::Topology, mNumSessions> topos{
|
||||
sdk::Topology(mDDSTopologies[0], mDDSSessions[0]),
|
||||
sdk::Topology(mDDSTopologies[1], mDDSSessions[1])
|
||||
};
|
||||
|
||||
std::thread t0(control, std::ref(topos[0]));
|
||||
std::thread t1(control, std::ref(topos[1]));
|
||||
t0.join();
|
||||
t1.join();
|
||||
}
|
||||
|
||||
|
||||
TEST_F(Topology, Construction)
|
||||
{
|
||||
fair::mq::sdk::Topology topo(mDDSTopo, mDDSSession);
|
||||
|
Reference in New Issue
Block a user