Compare commits

...

6 Commits

Author SHA1 Message Date
Alexey Rybalchenko
e642262468 Fix race condition in the control plugin 2018-09-12 16:14:32 +02:00
Dennis Klein
9bab3f9f4c Support msgpack 3.x
Fixes #32
2018-07-18 16:13:18 +02:00
Dennis Klein
ee3a84ce7a Add Matthias as CONTRIBUTOR
To acknowledge his countless hours of consulting which benefited
this project significantly.
2018-06-19 14:08:32 +02:00
mkrzewic
f05118f4eb Make ";" the separateor in multi-point channel config
This is to avoid parsing problems using "," in e.g. multi-point configuration using Suboptparser
2018-06-18 15:26:31 +02:00
Alexey Rybalchenko
21419adb40 Change unregistered options warning to debug 2018-06-16 17:40:05 +02:00
Alexey Rybalchenko
1554c1c273 Change missing options in transport from warning to debug
Transport has meaningful defaults if FairMQProgOptions is missing
2018-06-13 18:03:16 +02:00
11 changed files with 40 additions and 33 deletions

View File

@@ -48,7 +48,11 @@ endif()
if(BUILD_NANOMSG_TRANSPORT) if(BUILD_NANOMSG_TRANSPORT)
find_package2(PRIVATE nanomsg VERSION 1.0.0 REQUIRED) find_package2(PRIVATE nanomsg VERSION 1.0.0 REQUIRED)
find_package2(PRIVATE msgpack VERSION 2.1.5 REQUIRED) find_package2(PRIVATE msgpack VERSION 3.0.0)
set(PROJECT_msgpack_VERSION 2.1.5)
if(NOT msgpack_FOUND)
find_package2(PRIVATE msgpack VERSION 2.1.5 REQUIRED)
endif()
set(msgpack_ROOT ${PACKAGE_PREFIX_DIR}) set(msgpack_ROOT ${PACKAGE_PREFIX_DIR})
endif() endif()

View File

@@ -2,6 +2,7 @@ Aphecetche, Laurent
Binet, Sebastien Binet, Sebastien
Eulisse, Giulio Eulisse, Giulio
Karabowicz, Radoslaw Karabowicz, Radoslaw
Kretz, Matthias <kretz@kde.org>
Krzewicki, Mikolaj Krzewicki, Mikolaj
Neskovic, Gvozden Neskovic, Gvozden
Richter, Matthias Richter, Matthias

View File

@@ -280,20 +280,22 @@ macro(find_package2 qualifier pkgname)
set(CMAKE_PREFIX_PATH ${old_CPP}) set(CMAKE_PREFIX_PATH ${old_CPP})
unset(old_CPP) unset(old_CPP)
if(${qualifier} STREQUAL PRIVATE) if(${pkgname}_FOUND)
set(PROJECT_${pkgname}_VERSION ${ARGS_VERSION}) if(${qualifier} STREQUAL PRIVATE)
set(PROJECT_${pkgname}_COMPONENTS ${ARGS_COMPONENTS}) set(PROJECT_${pkgname}_VERSION ${ARGS_VERSION})
set(PROJECT_PACKAGE_DEPENDENCIES ${PROJECT_PACKAGE_DEPENDENCIES} ${pkgname}) set(PROJECT_${pkgname}_COMPONENTS ${ARGS_COMPONENTS})
elseif(${qualifier} STREQUAL PUBLIC) set(PROJECT_PACKAGE_DEPENDENCIES ${PROJECT_PACKAGE_DEPENDENCIES} ${pkgname})
set(PROJECT_${pkgname}_VERSION ${ARGS_VERSION}) elseif(${qualifier} STREQUAL PUBLIC)
set(PROJECT_${pkgname}_COMPONENTS ${ARGS_COMPONENTS}) set(PROJECT_${pkgname}_VERSION ${ARGS_VERSION})
set(PROJECT_PACKAGE_DEPENDENCIES ${PROJECT_PACKAGE_DEPENDENCIES} ${pkgname}) set(PROJECT_${pkgname}_COMPONENTS ${ARGS_COMPONENTS})
set(PROJECT_INTERFACE_${pkgname}_VERSION ${ARGS_VERSION}) set(PROJECT_PACKAGE_DEPENDENCIES ${PROJECT_PACKAGE_DEPENDENCIES} ${pkgname})
set(PROJECT_INTERFACE_${pkgname}_COMPONENTS ${ARGS_COMPONENTS}) set(PROJECT_INTERFACE_${pkgname}_VERSION ${ARGS_VERSION})
set(PROJECT_INTERFACE_PACKAGE_DEPENDENCIES ${PROJECT_INTERFACE_PACKAGE_DEPENDENCIES} ${pkgname}) set(PROJECT_INTERFACE_${pkgname}_COMPONENTS ${ARGS_COMPONENTS})
elseif(${qualifier} STREQUAL INTERFACE) set(PROJECT_INTERFACE_PACKAGE_DEPENDENCIES ${PROJECT_INTERFACE_PACKAGE_DEPENDENCIES} ${pkgname})
set(PROJECT_INTERFACE_${pkgname}_VERSION ${ARGS_VERSION}) elseif(${qualifier} STREQUAL INTERFACE)
set(PROJECT_INTERFACE_${pkgname}_COMPONENTS ${ARGS_COMPONENTS}) set(PROJECT_INTERFACE_${pkgname}_VERSION ${ARGS_VERSION})
set(PROJECT_INTERFACE_PACKAGE_DEPENDENCIES ${PROJECT_INTERFACE_PACKAGE_DEPENDENCIES} ${pkgname}) set(PROJECT_INTERFACE_${pkgname}_COMPONENTS ${ARGS_COMPONENTS})
set(PROJECT_INTERFACE_PACKAGE_DEPENDENCIES ${PROJECT_INTERFACE_PACKAGE_DEPENDENCIES} ${pkgname})
endif()
endif() endif()
endmacro() endmacro()

View File

@@ -499,7 +499,7 @@ bool FairMQChannel::ValidateChannel()
else else
{ {
vector<string> endpoints; vector<string> endpoints;
boost::algorithm::split(endpoints, fAddress, boost::algorithm::is_any_of(",")); boost::algorithm::split(endpoints, fAddress, boost::algorithm::is_any_of(";"));
for (const auto endpoint : endpoints) for (const auto endpoint : endpoints)
{ {
string address; string address;

View File

@@ -280,7 +280,7 @@ bool FairMQDevice::AttachChannel(FairMQChannel& ch)
} }
catch (fair::mq::SocketError& se) catch (fair::mq::SocketError& se)
{ {
LOG(ERROR) << se.what(); LOG(error) << se.what();
return false; return false;
} }
} }
@@ -823,7 +823,7 @@ void FairMQDevice::CreateOwnConfig()
try { try {
fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue<string>("transport")); fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue<string>("transport"));
} catch(const exception& e) { } catch(const exception& e) {
LOG(ERROR) << "invalid transport type provided: " << fConfig->GetValue<string>("transport"); LOG(error) << "invalid transport type provided: " << fConfig->GetValue<string>("transport");
} }
} }
@@ -867,7 +867,7 @@ void FairMQDevice::SetConfig(FairMQProgOptions& config)
try { try {
fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue<string>("transport")); fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue<string>("transport"));
} catch(const exception& e) { } catch(const exception& e) {
LOG(ERROR) << "invalid transport type provided: " << fConfig->GetValue<string>("transport"); LOG(error) << "invalid transport type provided: " << fConfig->GetValue<string>("transport");
} }
SetTransport(fConfig->GetValue<string>("transport")); SetTransport(fConfig->GetValue<string>("transport"));
} }

View File

@@ -413,7 +413,7 @@ int FairMQProgOptions::PrintOptions()
for (const auto& o : fUnregisteredOptions) for (const auto& o : fUnregisteredOptions)
{ {
LOG(WARN) << "detected unregistered option: " << o; LOG(debug) << "detected unregistered option: " << o;
} }
stringstream ss; stringstream ss;

View File

@@ -227,7 +227,7 @@ auto Control::WaitForNextState() -> DeviceState
unique_lock<mutex> lock{fEventsMutex}; unique_lock<mutex> lock{fEventsMutex};
while (fEvents.empty()) while (fEvents.empty())
{ {
fNewEvent.wait(lock); fNewEvent.wait_for(lock, chrono::milliseconds(50));
} }
auto result = fEvents.front(); auto result = fEvents.front();

View File

@@ -110,12 +110,12 @@ auto DDS::HandleControl() -> void
// and propagate addresses of bound channels to DDS. // and propagate addresses of bound channels to DDS.
FillChannelContainers(); FillChannelContainers();
LOG(DEBUG) << "$DDS_TASK_PATH: " << getenv("DDS_TASK_PATH"); LOG(debug) << "$DDS_TASK_PATH: " << getenv("DDS_TASK_PATH");
LOG(DEBUG) << "$DDS_GROUP_NAME: " << getenv("DDS_GROUP_NAME"); LOG(debug) << "$DDS_GROUP_NAME: " << getenv("DDS_GROUP_NAME");
LOG(DEBUG) << "$DDS_COLLECTION_NAME: " << getenv("DDS_COLLECTION_NAME"); LOG(debug) << "$DDS_COLLECTION_NAME: " << getenv("DDS_COLLECTION_NAME");
LOG(DEBUG) << "$DDS_TASK_NAME: " << getenv("DDS_TASK_NAME"); LOG(debug) << "$DDS_TASK_NAME: " << getenv("DDS_TASK_NAME");
LOG(DEBUG) << "$DDS_TASK_INDEX: " << getenv("DDS_TASK_INDEX"); LOG(debug) << "$DDS_TASK_INDEX: " << getenv("DDS_TASK_INDEX");
LOG(DEBUG) << "$DDS_COLLECTION_INDEX: " << getenv("DDS_COLLECTION_INDEX"); LOG(debug) << "$DDS_COLLECTION_INDEX: " << getenv("DDS_COLLECTION_INDEX");
// start DDS service - subscriptions will only start firing after this step // start DDS service - subscriptions will only start firing after this step
fService.start(); fService.start();

View File

@@ -196,7 +196,7 @@ int FairMQSocketSHM::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const i
const auto numMsgs = nbytes / sizeof(MetaHeader); const auto numMsgs = nbytes / sizeof(MetaHeader);
if (numMsgs > 1) if (numMsgs > 1)
{ {
LOG(ERROR) << "Receiving SHM multipart with a single message receive call"; LOG(error) << "Receiving SHM multipart with a single message receive call";
} }
assert (numMsgs == 1); assert (numMsgs == 1);
@@ -311,13 +311,13 @@ int64_t FairMQSocketSHM::SendImpl(vector<FairMQMessagePtr>& msgVec, const int fl
else if (zmq_errno() == ETERM) else if (zmq_errno() == ETERM)
{ {
zmq_msg_close (&lZmqMsg); zmq_msg_close (&lZmqMsg);
LOG(INFO) << "terminating socket " << fId; LOG(info) << "terminating socket " << fId;
return -1; return -1;
} }
else else
{ {
zmq_msg_close (&lZmqMsg); zmq_msg_close (&lZmqMsg);
LOG(ERROR) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
return nbytes; return nbytes;
} }
} }

View File

@@ -69,7 +69,7 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai
} }
else else
{ {
LOG(warn) << "FairMQProgOptions not available! Using defaults."; LOG(debug) << "FairMQProgOptions not available! Using defaults.";
} }
fSessionName.resize(8, '_'); // shorten the session name, to accommodate for name size limit on some systems (MacOS) fSessionName.resize(8, '_'); // shorten the session name, to accommodate for name size limit on some systems (MacOS)

View File

@@ -39,7 +39,7 @@ FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ(const string& id, const Fai
} }
else else
{ {
LOG(warn) << "FairMQProgOptions not available! Using defaults."; LOG(debug) << "FairMQProgOptions not available! Using defaults.";
} }
if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0) if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0)