mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-16 10:01:47 +00:00
Compare commits
11 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
f46d446d52 | ||
|
db0937f339 | ||
|
bb1ce794b6 | ||
|
9e2373b55d | ||
|
c51e88e114 | ||
|
f9219dab65 | ||
|
0806720f61 | ||
|
e39d17d09e | ||
|
a14502242f | ||
|
d3697ec97b | ||
|
73377c5100 |
@@ -82,7 +82,7 @@ endif()
|
|||||||
|
|
||||||
if(BUILD_DDS_PLUGIN OR BUILD_SDK)
|
if(BUILD_DDS_PLUGIN OR BUILD_SDK)
|
||||||
find_package2(PRIVATE DDS REQUIRED
|
find_package2(PRIVATE DDS REQUIRED
|
||||||
VERSION 3.0
|
VERSION 3.5.3
|
||||||
)
|
)
|
||||||
set(DDS_Boost_COMPONENTS system log log_setup regex filesystem thread)
|
set(DDS_Boost_COMPONENTS system log log_setup regex filesystem thread)
|
||||||
set(DDS_Boost_VERSION 1.67)
|
set(DDS_Boost_VERSION 1.67)
|
||||||
@@ -136,7 +136,7 @@ endif()
|
|||||||
|
|
||||||
if(BUILD_SDK)
|
if(BUILD_SDK)
|
||||||
find_package2(BUNDLED asio
|
find_package2(BUNDLED asio
|
||||||
VERSION 1.13.0
|
VERSION 1.18.0
|
||||||
)
|
)
|
||||||
if(NOT asio_FOUND)
|
if(NOT asio_FOUND)
|
||||||
build_bundled(asio extern/asio)
|
build_bundled(asio extern/asio)
|
||||||
|
@@ -30,9 +30,6 @@ Set(configure_options "${configure_options};-DCMAKE_PREFIX_PATH=$ENV{SIMPATH}")
|
|||||||
Set(configure_options "${configure_options};-DBUILD_DDS_PLUGIN=ON")
|
Set(configure_options "${configure_options};-DBUILD_DDS_PLUGIN=ON")
|
||||||
Set(configure_options "${configure_options};-DBUILD_SDK=ON")
|
Set(configure_options "${configure_options};-DBUILD_SDK=ON")
|
||||||
Set(configure_options "${configure_options};-DBUILD_SDK_COMMANDS=ON")
|
Set(configure_options "${configure_options};-DBUILD_SDK_COMMANDS=ON")
|
||||||
Set(configure_options "${configure_options};-DFAST_BUILD=ON")
|
|
||||||
Set(configure_options "${configure_options};-DCOTIRE_MAXIMUM_NUMBER_OF_UNITY_INCLUDES=-j$ENV{number_of_processors}")
|
|
||||||
Set(configure_options "${configure_options};-DBoost_NO_BOOST_CMAKE=ON")
|
|
||||||
|
|
||||||
Set(EXTRA_FLAGS $ENV{EXTRA_FLAGS})
|
Set(EXTRA_FLAGS $ENV{EXTRA_FLAGS})
|
||||||
If(EXTRA_FLAGS)
|
If(EXTRA_FLAGS)
|
||||||
@@ -60,8 +57,13 @@ Ctest_Configure(BUILD "${CTEST_BINARY_DIRECTORY}"
|
|||||||
|
|
||||||
Ctest_Build(BUILD "${CTEST_BINARY_DIRECTORY}")
|
Ctest_Build(BUILD "${CTEST_BINARY_DIRECTORY}")
|
||||||
|
|
||||||
|
unset(exclude_tests)
|
||||||
|
if($ENV{EXCLUDE_UNSTABLE_DDS_TESTS})
|
||||||
|
set(exclude_tests EXCLUDE ".*\\.localhost$")
|
||||||
|
endif()
|
||||||
Ctest_Test(BUILD "${CTEST_BINARY_DIRECTORY}"
|
Ctest_Test(BUILD "${CTEST_BINARY_DIRECTORY}"
|
||||||
# PARALLEL_LEVEL $ENV{number_of_processors}
|
# PARALLEL_LEVEL $ENV{number_of_processors}
|
||||||
|
${exclude_tests}
|
||||||
PARALLEL_LEVEL $ENV{number_of_processors}
|
PARALLEL_LEVEL $ENV{number_of_processors}
|
||||||
RETURN_VALUE _ctest_test_ret_val
|
RETURN_VALUE _ctest_test_ret_val
|
||||||
)
|
)
|
||||||
|
10
Jenkinsfile
vendored
10
Jenkinsfile
vendored
@@ -46,6 +46,11 @@ def jobMatrix(String prefix, List specs, Closure callback) {
|
|||||||
echo "export GIT_BRANCH=$JOB_BASE_NAME" >> Dart.cfg
|
echo "export GIT_BRANCH=$JOB_BASE_NAME" >> Dart.cfg
|
||||||
echo "echo \\\$PATH" >> Dart.cfg
|
echo "echo \\\$PATH" >> Dart.cfg
|
||||||
'''
|
'''
|
||||||
|
|
||||||
|
if (os =~ /macOS10.14/) {
|
||||||
|
sh "echo \"export EXCLUDE_UNSTABLE_DDS_TESTS=1\" >> Dart.cfg"
|
||||||
|
}
|
||||||
|
|
||||||
sh 'cat Dart.cfg'
|
sh 'cat Dart.cfg'
|
||||||
|
|
||||||
callback.call(spec, label)
|
callback.call(spec, label)
|
||||||
@@ -74,8 +79,9 @@ pipeline{
|
|||||||
steps{
|
steps{
|
||||||
script {
|
script {
|
||||||
def build_jobs = jobMatrix('build', [
|
def build_jobs = jobMatrix('build', [
|
||||||
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc9.1.0', fairsoft: 'fairmq_dev'],
|
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc9.1.0', fairsoft: 'fairmq_dev'],
|
||||||
[os: 'macOS10.15', arch: 'x86_64', compiler: 'AppleLLVM11.0.3', fairsoft: 'fairmq_dev'],
|
[os: 'macOS10.14', arch: 'x86_64', compiler: 'AppleClang11.0', fairsoft: 'fairmq_dev'],
|
||||||
|
[os: 'macOS10.15', arch: 'x86_64', compiler: 'AppleClang12.0', fairsoft: 'fairmq_dev'],
|
||||||
]) { spec, label ->
|
]) { spec, label ->
|
||||||
sh './Dart.sh alfa_ci Dart.cfg'
|
sh './Dart.sh alfa_ci Dart.cfg'
|
||||||
}
|
}
|
||||||
|
2
extern/asio
vendored
2
extern/asio
vendored
Submodule extern/asio updated: 90f32660cd...be7badc31a
@@ -257,7 +257,7 @@ class FairMQChannel
|
|||||||
/// @param msg Constant reference of unique_ptr to a FairMQMessage
|
/// @param msg Constant reference of unique_ptr to a FairMQMessage
|
||||||
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
|
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
|
||||||
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
|
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
|
||||||
int Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1)
|
int64_t Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1)
|
||||||
{
|
{
|
||||||
CheckSendCompatibility(msg);
|
CheckSendCompatibility(msg);
|
||||||
return fSocket->Send(msg, sndTimeoutInMs);
|
return fSocket->Send(msg, sndTimeoutInMs);
|
||||||
@@ -267,7 +267,7 @@ class FairMQChannel
|
|||||||
/// @param msg Constant reference of unique_ptr to a FairMQMessage
|
/// @param msg Constant reference of unique_ptr to a FairMQMessage
|
||||||
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
|
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
|
||||||
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
|
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
|
||||||
int Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1)
|
int64_t Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1)
|
||||||
{
|
{
|
||||||
CheckReceiveCompatibility(msg);
|
CheckReceiveCompatibility(msg);
|
||||||
return fSocket->Receive(msg, rcvTimeoutInMs);
|
return fSocket->Receive(msg, rcvTimeoutInMs);
|
||||||
|
@@ -129,7 +129,7 @@ class FairMQDevice
|
|||||||
/// @param i channel index
|
/// @param i channel index
|
||||||
/// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
|
/// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
|
||||||
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
|
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
|
||||||
int Send(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
|
int64_t Send(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
|
||||||
{
|
{
|
||||||
return GetChannel(channel, index).Send(msg, sndTimeoutInMs);
|
return GetChannel(channel, index).Send(msg, sndTimeoutInMs);
|
||||||
}
|
}
|
||||||
@@ -140,7 +140,7 @@ class FairMQDevice
|
|||||||
/// @param i channel index
|
/// @param i channel index
|
||||||
/// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
|
/// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
|
||||||
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
|
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
|
||||||
int Receive(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
|
int64_t Receive(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
|
||||||
{
|
{
|
||||||
return GetChannel(channel, index).Receive(msg, rcvTimeoutInMs);
|
return GetChannel(channel, index).Receive(msg, rcvTimeoutInMs);
|
||||||
}
|
}
|
||||||
|
@@ -45,8 +45,8 @@ class FairMQSocket
|
|||||||
virtual bool Bind(const std::string& address) = 0;
|
virtual bool Bind(const std::string& address) = 0;
|
||||||
virtual bool Connect(const std::string& address) = 0;
|
virtual bool Connect(const std::string& address) = 0;
|
||||||
|
|
||||||
virtual int Send(FairMQMessagePtr& msg, int timeout = -1) = 0;
|
virtual int64_t Send(FairMQMessagePtr& msg, int timeout = -1) = 0;
|
||||||
virtual int Receive(FairMQMessagePtr& msg, int timeout = -1) = 0;
|
virtual int64_t Receive(FairMQMessagePtr& msg, int timeout = -1) = 0;
|
||||||
virtual int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, int timeout = -1) = 0;
|
virtual int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, int timeout = -1) = 0;
|
||||||
virtual int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, int timeout = -1) = 0;
|
virtual int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, int timeout = -1) = 0;
|
||||||
|
|
||||||
|
@@ -254,7 +254,7 @@ auto Socket::ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoi
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto Socket::Send(MessagePtr& msg, const int /*timeout*/) -> int
|
auto Socket::Send(MessagePtr& msg, const int /*timeout*/) -> int64_t
|
||||||
{
|
{
|
||||||
// timeout argument not yet implemented
|
// timeout argument not yet implemented
|
||||||
|
|
||||||
@@ -412,7 +412,7 @@ auto Socket::SendQueueReaderStatic() -> void
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
auto Socket::Receive(MessagePtr& msg, const int /*timeout*/) -> int
|
auto Socket::Receive(MessagePtr& msg, const int /*timeout*/) -> int64_t
|
||||||
try {
|
try {
|
||||||
// timeout argument not yet implemented
|
// timeout argument not yet implemented
|
||||||
|
|
||||||
|
@@ -49,8 +49,8 @@ class Socket final : public fair::mq::Socket
|
|||||||
auto Bind(const std::string& address) -> bool override;
|
auto Bind(const std::string& address) -> bool override;
|
||||||
auto Connect(const std::string& address) -> bool override;
|
auto Connect(const std::string& address) -> bool override;
|
||||||
|
|
||||||
auto Send(MessagePtr& msg, int timeout = 0) -> int override;
|
auto Send(MessagePtr& msg, int timeout = 0) -> int64_t override;
|
||||||
auto Receive(MessagePtr& msg, int timeout = 0) -> int override;
|
auto Receive(MessagePtr& msg, int timeout = 0) -> int64_t override;
|
||||||
auto Send(std::vector<MessagePtr>& msgVec, int timeout = 0) -> int64_t override;
|
auto Send(std::vector<MessagePtr>& msgVec, int timeout = 0) -> int64_t override;
|
||||||
auto Receive(std::vector<MessagePtr>& msgVec, int timeout = 0) -> int64_t override;
|
auto Receive(std::vector<MessagePtr>& msgVec, int timeout = 0) -> int64_t override;
|
||||||
|
|
||||||
|
@@ -357,7 +357,7 @@ auto DDS::HandleCmd(const string& id, sdk::cmd::Cmd& cmd, const string& cond, ui
|
|||||||
} break;
|
} break;
|
||||||
case Type::dump_config: {
|
case Type::dump_config: {
|
||||||
stringstream ss;
|
stringstream ss;
|
||||||
for (const auto pKey : GetPropertyKeys()) {
|
for (const auto& pKey : GetPropertyKeys()) {
|
||||||
ss << id << ": " << pKey << " -> " << GetPropertyAsString(pKey) << "\n";
|
ss << id << ": " << pKey << " -> " << GetPropertyAsString(pKey) << "\n";
|
||||||
}
|
}
|
||||||
Cmds outCmds(make<Config>(id, ss.str()));
|
Cmds outCmds(make<Config>(id, ss.str()));
|
||||||
|
@@ -12,6 +12,7 @@
|
|||||||
#include <asio/associated_allocator.hpp>
|
#include <asio/associated_allocator.hpp>
|
||||||
#include <asio/associated_executor.hpp>
|
#include <asio/associated_executor.hpp>
|
||||||
#include <asio/executor_work_guard.hpp>
|
#include <asio/executor_work_guard.hpp>
|
||||||
|
#include <asio/dispatch.hpp>
|
||||||
#include <asio/system_executor.hpp>
|
#include <asio/system_executor.hpp>
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
#include <exception>
|
#include <exception>
|
||||||
@@ -69,17 +70,16 @@ struct AsioAsyncOpImpl : AsioAsyncOpImplBase<SignatureArgTypes...>
|
|||||||
throw RuntimeError("Async operation already completed");
|
throw RuntimeError("Async operation already completed");
|
||||||
}
|
}
|
||||||
|
|
||||||
GetEx2().dispatch(
|
asio::dispatch(GetEx2(),
|
||||||
[=, handler = std::move(fHandler)]() mutable {
|
[=, handler = std::move(fHandler)]() mutable {
|
||||||
try {
|
try {
|
||||||
handler(ec, args...);
|
handler(ec, args...);
|
||||||
} catch (const std::exception& e) {
|
} catch (const std::exception& e) {
|
||||||
FAIR_LOG(error) << "Uncaught exception in AsioAsyncOp completion handler: " << e.what();
|
FAIR_LOG(error) << "Uncaught exception in AsioAsyncOp completion handler: " << e.what();
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
FAIR_LOG(error) << "Unknown uncaught exception in AsioAsyncOp completion handler.";
|
FAIR_LOG(error) << "Unknown uncaught exception in AsioAsyncOp completion handler.";
|
||||||
}
|
}
|
||||||
},
|
});
|
||||||
GetAlloc2());
|
|
||||||
|
|
||||||
fWork1.reset();
|
fWork1.reset();
|
||||||
fWork2.reset();
|
fWork2.reset();
|
||||||
|
@@ -9,7 +9,7 @@
|
|||||||
#ifndef FAIR_MQ_SDK_ASIOBASE_H
|
#ifndef FAIR_MQ_SDK_ASIOBASE_H
|
||||||
#define FAIR_MQ_SDK_ASIOBASE_H
|
#define FAIR_MQ_SDK_ASIOBASE_H
|
||||||
|
|
||||||
#include <asio/executor.hpp>
|
#include <asio/any_io_executor.hpp>
|
||||||
#include <fairmq/sdk/Traits.h>
|
#include <fairmq/sdk/Traits.h>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
@@ -18,7 +18,7 @@ namespace fair {
|
|||||||
namespace mq {
|
namespace mq {
|
||||||
namespace sdk {
|
namespace sdk {
|
||||||
|
|
||||||
using DefaultExecutor = asio::executor;
|
using DefaultExecutor = asio::any_io_executor;
|
||||||
using DefaultAllocator = std::allocator<int>;
|
using DefaultAllocator = std::allocator<int>;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@@ -18,11 +18,13 @@ namespace sdk {
|
|||||||
/// @param nativeSession Existing and initialized CSession (either via create() or attach())
|
/// @param nativeSession Existing and initialized CSession (either via create() or attach())
|
||||||
/// @param nativeTopo Existing CTopology that is activated on the given nativeSession
|
/// @param nativeTopo Existing CTopology that is activated on the given nativeSession
|
||||||
/// @param env Optional DDSEnv (needed primarily for unit testing)
|
/// @param env Optional DDSEnv (needed primarily for unit testing)
|
||||||
|
/// @param blockUntilConnected if true, ctor will wait for all tasks to confirm subscriptions
|
||||||
auto MakeTopology(dds::topology_api::CTopology nativeTopo,
|
auto MakeTopology(dds::topology_api::CTopology nativeTopo,
|
||||||
std::shared_ptr<dds::tools_api::CSession> nativeSession,
|
std::shared_ptr<dds::tools_api::CSession> nativeSession,
|
||||||
DDSEnv env) -> Topology
|
DDSEnv env,
|
||||||
|
bool blockUntilConnected) -> Topology
|
||||||
{
|
{
|
||||||
return {DDSTopo(std::move(nativeTopo), env), DDSSession(std::move(nativeSession), env)};
|
return {DDSTopo(std::move(nativeTopo), env), DDSSession(std::move(nativeSession), env), blockUntilConnected};
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace sdk
|
} // namespace sdk
|
||||||
|
@@ -216,18 +216,21 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
|||||||
/// @brief (Re)Construct a FairMQ topology from an existing DDS topology
|
/// @brief (Re)Construct a FairMQ topology from an existing DDS topology
|
||||||
/// @param topo DDSTopology
|
/// @param topo DDSTopology
|
||||||
/// @param session DDSSession
|
/// @param session DDSSession
|
||||||
BasicTopology(DDSTopology topo, DDSSession session)
|
/// @param blockUntilConnected if true, ctor will wait for all tasks to confirm subscriptions
|
||||||
: BasicTopology<Executor, Allocator>(asio::system_executor(), std::move(topo), std::move(session))
|
BasicTopology(DDSTopology topo, DDSSession session, bool blockUntilConnected = false)
|
||||||
|
: BasicTopology<Executor, Allocator>(asio::system_executor(), std::move(topo), std::move(session), blockUntilConnected)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
/// @brief (Re)Construct a FairMQ topology from an existing DDS topology
|
/// @brief (Re)Construct a FairMQ topology from an existing DDS topology
|
||||||
/// @param ex I/O executor to be associated
|
/// @param ex I/O executor to be associated
|
||||||
/// @param topo DDSTopology
|
/// @param topo DDSTopology
|
||||||
/// @param session DDSSession
|
/// @param session DDSSession
|
||||||
|
/// @param blockUntilConnected if true, ctor will wait for all tasks to confirm subscriptions
|
||||||
/// @throws RuntimeError
|
/// @throws RuntimeError
|
||||||
BasicTopology(const Executor& ex,
|
BasicTopology(const Executor& ex,
|
||||||
DDSTopology topo,
|
DDSTopology topo,
|
||||||
DDSSession session,
|
DDSSession session,
|
||||||
|
bool blockUntilConnected = false,
|
||||||
Allocator alloc = DefaultAllocator())
|
Allocator alloc = DefaultAllocator())
|
||||||
: AsioBase<Executor, Allocator>(ex, std::move(alloc))
|
: AsioBase<Executor, Allocator>(ex, std::move(alloc))
|
||||||
, fDDSSession(std::move(session))
|
, fDDSSession(std::move(session))
|
||||||
@@ -235,7 +238,8 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
|||||||
, fStateData()
|
, fStateData()
|
||||||
, fStateIndex()
|
, fStateIndex()
|
||||||
, fMtx(std::make_unique<std::mutex>())
|
, fMtx(std::make_unique<std::mutex>())
|
||||||
, fStateChangeUnsubscriptionCV(std::make_unique<std::condition_variable>())
|
, fStateChangeSubscriptionsCV(std::make_unique<std::condition_variable>())
|
||||||
|
, fNumStateChangePublishers(0)
|
||||||
, fHeartbeatsTimer(asio::system_executor())
|
, fHeartbeatsTimer(asio::system_executor())
|
||||||
, fHeartbeatInterval(600000)
|
, fHeartbeatInterval(600000)
|
||||||
{
|
{
|
||||||
@@ -251,6 +255,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
|||||||
|
|
||||||
fDDSSession.StartDDSService();
|
fDDSSession.StartDDSService();
|
||||||
SubscribeToStateChanges();
|
SubscribeToStateChanges();
|
||||||
|
if (blockUntilConnected) {
|
||||||
|
WaitForPublisherCount(fStateIndex.size());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// not copyable
|
/// not copyable
|
||||||
@@ -284,6 +291,14 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
|||||||
fHeartbeatsTimer.async_wait(std::bind(&BasicTopology::SendSubscriptionHeartbeats, this, std::placeholders::_1));
|
fHeartbeatsTimer.async_wait(std::bind(&BasicTopology::SendSubscriptionHeartbeats, this, std::placeholders::_1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void WaitForPublisherCount(unsigned int number)
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lk(*fMtx);
|
||||||
|
fStateChangeSubscriptionsCV->wait(lk, [&](){
|
||||||
|
return fNumStateChangePublishers == number;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
void SendSubscriptionHeartbeats(const std::error_code& ec)
|
void SendSubscriptionHeartbeats(const std::error_code& ec)
|
||||||
{
|
{
|
||||||
if (!ec) {
|
if (!ec) {
|
||||||
@@ -308,13 +323,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
|||||||
fDDSSession.SendCommand(cmd::Cmds(cmd::make<cmd::UnsubscribeFromStateChange>()).Serialize());
|
fDDSSession.SendCommand(cmd::Cmds(cmd::make<cmd::UnsubscribeFromStateChange>()).Serialize());
|
||||||
|
|
||||||
// wait for all tasks to confirm unsubscription
|
// wait for all tasks to confirm unsubscription
|
||||||
std::unique_lock<std::mutex> lk(*fMtx);
|
WaitForPublisherCount(0);
|
||||||
fStateChangeUnsubscriptionCV->wait(lk, [&](){
|
|
||||||
unsigned int count = std::count_if(fStateIndex.cbegin(), fStateIndex.cend(), [=](const auto& s) {
|
|
||||||
return fStateData.at(s.second).subscribed_to_state_changes == false;
|
|
||||||
});
|
|
||||||
return count == fStateIndex.size();
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void SubscribeToCommands()
|
void SubscribeToCommands()
|
||||||
@@ -360,11 +369,19 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
|||||||
DDSTask::Id taskId(cmd.GetTaskId());
|
DDSTask::Id taskId(cmd.GetTaskId());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
std::lock_guard<std::mutex> lk(*fMtx);
|
std::unique_lock<std::mutex> lk(*fMtx);
|
||||||
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
||||||
task.subscribed_to_state_changes = true;
|
if (!task.subscribed_to_state_changes) {
|
||||||
|
task.subscribed_to_state_changes = true;
|
||||||
|
++fNumStateChangePublishers;
|
||||||
|
} else {
|
||||||
|
FAIR_LOG(warn) << "Task '" << task.taskId << "' sent subscription confirmation more than once";
|
||||||
|
}
|
||||||
|
lk.unlock();
|
||||||
|
fStateChangeSubscriptionsCV->notify_one();
|
||||||
} catch (const std::exception& e) {
|
} catch (const std::exception& e) {
|
||||||
FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeSubscription const&): " << e.what();
|
FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeSubscription const&): " << e.what();
|
||||||
|
FAIR_LOG(error) << "Possibly no task with id '" << taskId << "'?";
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
FAIR_LOG(error) << "State change subscription failed for device: " << cmd.GetDeviceId() << ", task id: " << cmd.GetTaskId();
|
FAIR_LOG(error) << "State change subscription failed for device: " << cmd.GetDeviceId() << ", task id: " << cmd.GetTaskId();
|
||||||
@@ -379,9 +396,14 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
|||||||
try {
|
try {
|
||||||
std::unique_lock<std::mutex> lk(*fMtx);
|
std::unique_lock<std::mutex> lk(*fMtx);
|
||||||
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
||||||
task.subscribed_to_state_changes = false;
|
if (task.subscribed_to_state_changes) {
|
||||||
|
task.subscribed_to_state_changes = false;
|
||||||
|
--fNumStateChangePublishers;
|
||||||
|
} else {
|
||||||
|
FAIR_LOG(warn) << "Task '" << task.taskId << "' sent unsubscription confirmation more than once";
|
||||||
|
}
|
||||||
lk.unlock();
|
lk.unlock();
|
||||||
fStateChangeUnsubscriptionCV->notify_one();
|
fStateChangeSubscriptionsCV->notify_one();
|
||||||
} catch (const std::exception& e) {
|
} catch (const std::exception& e) {
|
||||||
FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeUnsubscription const&): " << e.what();
|
FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeUnsubscription const&): " << e.what();
|
||||||
}
|
}
|
||||||
@@ -406,6 +428,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
|||||||
// if the task is exiting, it will not respond to unsubscription request anymore, set it to false now.
|
// if the task is exiting, it will not respond to unsubscription request anymore, set it to false now.
|
||||||
if (task.state == DeviceState::Exiting) {
|
if (task.state == DeviceState::Exiting) {
|
||||||
task.subscribed_to_state_changes = false;
|
task.subscribed_to_state_changes = false;
|
||||||
|
--fNumStateChangePublishers;
|
||||||
}
|
}
|
||||||
// FAIR_LOG(debug) << "Updated state entry: taskId=" << taskId << ", state=" << state;
|
// FAIR_LOG(debug) << "Updated state entry: taskId=" << taskId << ", state=" << state;
|
||||||
|
|
||||||
@@ -1300,7 +1323,8 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
|||||||
|
|
||||||
mutable std::unique_ptr<std::mutex> fMtx;
|
mutable std::unique_ptr<std::mutex> fMtx;
|
||||||
|
|
||||||
std::unique_ptr<std::condition_variable> fStateChangeUnsubscriptionCV;
|
std::unique_ptr<std::condition_variable> fStateChangeSubscriptionsCV;
|
||||||
|
unsigned int fNumStateChangePublishers;
|
||||||
asio::steady_timer fHeartbeatsTimer;
|
asio::steady_timer fHeartbeatsTimer;
|
||||||
Duration fHeartbeatInterval;
|
Duration fHeartbeatInterval;
|
||||||
|
|
||||||
@@ -1336,9 +1360,11 @@ using Topo = Topology;
|
|||||||
/// @param nativeSession Existing and initialized CSession (either via create() or attach())
|
/// @param nativeSession Existing and initialized CSession (either via create() or attach())
|
||||||
/// @param nativeTopo Existing CTopology that is activated on the given nativeSession
|
/// @param nativeTopo Existing CTopology that is activated on the given nativeSession
|
||||||
/// @param env Optional DDSEnv (needed primarily for unit testing)
|
/// @param env Optional DDSEnv (needed primarily for unit testing)
|
||||||
|
/// @param blockUntilConnected if true, ctor will wait for all tasks to confirm subscriptions
|
||||||
auto MakeTopology(dds::topology_api::CTopology nativeTopo,
|
auto MakeTopology(dds::topology_api::CTopology nativeTopo,
|
||||||
std::shared_ptr<dds::tools_api::CSession> nativeSession,
|
std::shared_ptr<dds::tools_api::CSession> nativeSession,
|
||||||
DDSEnv env = {}) -> Topology;
|
DDSEnv env = {},
|
||||||
|
bool blockUntilConnected = false) -> Topology;
|
||||||
|
|
||||||
} // namespace sdk
|
} // namespace sdk
|
||||||
} // namespace mq
|
} // namespace mq
|
||||||
|
@@ -9,6 +9,7 @@
|
|||||||
#include <fairmq/sdk/commands/Commands.h>
|
#include <fairmq/sdk/commands/Commands.h>
|
||||||
#include <fairmq/States.h>
|
#include <fairmq/States.h>
|
||||||
#include <fairmq/SDK.h>
|
#include <fairmq/SDK.h>
|
||||||
|
#include <fairmq/Tools.h>
|
||||||
|
|
||||||
#include <boost/program_options.hpp>
|
#include <boost/program_options.hpp>
|
||||||
|
|
||||||
@@ -63,14 +64,25 @@ void handleCommand(const string& command, const string& path, unsigned int timeo
|
|||||||
if (command == "c") {
|
if (command == "c") {
|
||||||
cout << "> checking state of the devices" << endl;
|
cout << "> checking state of the devices" << endl;
|
||||||
auto const result = topo.GetCurrentState();
|
auto const result = topo.GetCurrentState();
|
||||||
|
bool error = false;
|
||||||
for (const auto& d : result) {
|
for (const auto& d : result) {
|
||||||
cout << d.taskId << " : " << d.state << endl;
|
cout << d.taskId << " : " << d.state << endl;
|
||||||
|
if (d.state == sdk::DeviceState::Error) {
|
||||||
|
error = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (error) {
|
||||||
|
throw runtime_error("Some of the devices are in the Error state");
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
} else if (command == "o") {
|
} else if (command == "o") {
|
||||||
cout << "> dumping config of " << (path == "" ? "all" : path) << endl;
|
cout << "> dumping config of " << (path == "" ? "all" : path) << endl;
|
||||||
// TODO: extend this regex to return all properties, once command size limitation is removed.
|
// TODO: extend this regex to return all properties, once command size limitation is removed.
|
||||||
auto const result = topo.GetProperties("^(session|id)$", path, std::chrono::milliseconds(timeout));
|
auto const result = topo.GetProperties("^(session|id)$", path, std::chrono::milliseconds(timeout));
|
||||||
|
if (result.first != std::error_code()) {
|
||||||
|
cout << "ERROR: GetProperties failed for '" << path << "': " << result.first.message() << endl;
|
||||||
|
throw runtime_error(tools::ToString("GetProperties failed for '", path, "': ", result.first.message()));
|
||||||
|
}
|
||||||
for (const auto& d : result.second.devices) {
|
for (const auto& d : result.second.devices) {
|
||||||
for (auto const& p : d.second.props) {
|
for (auto const& p : d.second.props) {
|
||||||
cout << d.first << ": " << p.first << " : " << p.second << endl;
|
cout << d.first << ": " << p.first << " : " << p.second << endl;
|
||||||
@@ -80,11 +92,15 @@ void handleCommand(const string& command, const string& path, unsigned int timeo
|
|||||||
} else if (command == "p") {
|
} else if (command == "p") {
|
||||||
if (pKey == "" || pVal == "") {
|
if (pKey == "" || pVal == "") {
|
||||||
cout << "cannot send property with empty key and/or value! given key: '" << pKey << "', value: '" << pVal << "'." << endl;
|
cout << "cannot send property with empty key and/or value! given key: '" << pKey << "', value: '" << pVal << "'." << endl;
|
||||||
return;
|
throw runtime_error(tools::ToString("cannot send property with empty key and/or value! given key: '", pKey, "', value: '", pVal, "'."));
|
||||||
}
|
}
|
||||||
const DeviceProperties props{{pKey, pVal}};
|
const DeviceProperties props{{pKey, pVal}};
|
||||||
cout << "> setting properties --> " << (path == "" ? "all" : path) << endl;
|
cout << "> setting properties --> " << (path == "" ? "all" : path) << endl;
|
||||||
topo.SetProperties(props, path);
|
auto const result = topo.SetProperties(props, path);
|
||||||
|
if (result.first != std::error_code()) {
|
||||||
|
cout << "ERROR: SetProperties failed for '" << path << "': " << result.first.message() << endl;
|
||||||
|
throw runtime_error(tools::ToString("SetProperties failed for '", path, "': ", result.first.message()));
|
||||||
|
}
|
||||||
// give dds time to complete request
|
// give dds time to complete request
|
||||||
this_thread::sleep_for(chrono::milliseconds(100));
|
this_thread::sleep_for(chrono::milliseconds(100));
|
||||||
return;
|
return;
|
||||||
@@ -125,10 +141,11 @@ void handleCommand(const string& command, const string& path, unsigned int timeo
|
|||||||
} else {
|
} else {
|
||||||
cout << "\033[01;32mInvalid input: [" << command << "]\033[0m" << endl;
|
cout << "\033[01;32mInvalid input: [" << command << "]\033[0m" << endl;
|
||||||
printControlsHelp();
|
printControlsHelp();
|
||||||
return;
|
throw runtime_error(tools::ToString("\033[01;32mInvalid input: [", command, "]\033[0m"));
|
||||||
}
|
}
|
||||||
if (changeStateResult.first != std::error_code()) {
|
if (changeStateResult.first != std::error_code()) {
|
||||||
cout << "ERROR: ChangeState failed for '" << path << "': " << changeStateResult.first.message() << endl;
|
cout << "ERROR: ChangeState failed for '" << path << "': " << changeStateResult.first.message() << endl;
|
||||||
|
throw runtime_error(tools::ToString("ERROR: ChangeState failed for '", path, "': ", changeStateResult.first.message()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -148,7 +165,11 @@ void sendCommand(const string& commandIn, const string& path, unsigned int timeo
|
|||||||
command = c;
|
command = c;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
handleCommand(command, path, timeout, topo, pKey, pVal);
|
try {
|
||||||
|
handleCommand(command, path, timeout, topo, pKey, pVal);
|
||||||
|
} catch(exception& e) {
|
||||||
|
cout << "Error: " << e.what() << endl;
|
||||||
|
}
|
||||||
cin >> c;
|
cin >> c;
|
||||||
command = c;
|
command = c;
|
||||||
}
|
}
|
||||||
@@ -209,7 +230,7 @@ try {
|
|||||||
DDSSession session(sessionID, env);
|
DDSSession session(sessionID, env);
|
||||||
DDSTopology ddsTopo(DDSTopology::Path(topoFile), env);
|
DDSTopology ddsTopo(DDSTopology::Path(topoFile), env);
|
||||||
|
|
||||||
Topology topo(ddsTopo, session);
|
Topology topo(ddsTopo, session, true);
|
||||||
|
|
||||||
if (targetState != "") {
|
if (targetState != "") {
|
||||||
if (command != "") {
|
if (command != "") {
|
||||||
|
@@ -143,23 +143,19 @@ class Manager
|
|||||||
} catch(interprocess_exception& bie) {
|
} catch(interprocess_exception& bie) {
|
||||||
LOG(error) << "something went wrong: " << bie.what();
|
LOG(error) << "something went wrong: " << bie.what();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if (mlockSegment) {
|
if (mlockSegment) {
|
||||||
LOG(debug) << "Locking the managed segment memory pages...";
|
LOG(debug) << "Locking the managed segment memory pages...";
|
||||||
if (mlock(boost::apply_visitor(SegmentAddress{}, fSegments.at(fSegmentId)), boost::apply_visitor(SegmentSize{}, fSegments.at(fSegmentId))) == -1) {
|
if (mlock(boost::apply_visitor(SegmentAddress{}, fSegments.at(fSegmentId)), boost::apply_visitor(SegmentSize{}, fSegments.at(fSegmentId))) == -1) {
|
||||||
LOG(error) << "Could not lock the managed segment memory. Code: " << errno << ", reason: " << strerror(errno);
|
LOG(error) << "Could not lock the managed segment memory. Code: " << errno << ", reason: " << strerror(errno);
|
||||||
|
}
|
||||||
|
LOG(debug) << "Successfully locked the managed segment memory pages.";
|
||||||
|
}
|
||||||
|
if (zeroSegment) {
|
||||||
|
LOG(debug) << "Zeroing the managed segment free memory...";
|
||||||
|
boost::apply_visitor(SegmentMemoryZeroer{}, fSegments.at(fSegmentId));
|
||||||
|
LOG(debug) << "Successfully zeroed the managed segment free memory.";
|
||||||
}
|
}
|
||||||
LOG(debug) << "Successfully locked the managed segment memory pages.";
|
|
||||||
}
|
|
||||||
if (zeroSegment) {
|
|
||||||
LOG(debug) << "Zeroing the managed segment free memory...";
|
|
||||||
boost::apply_visitor(SegmentMemoryZeroer{}, fSegments.at(fSegmentId));
|
|
||||||
LOG(debug) << "Successfully zeroed the managed segment free memory.";
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
|
||||||
|
|
||||||
fShmRegions = fManagementSegment.find_or_construct<Uint16RegionInfoHashMap>(unique_instance)(fShmVoidAlloc);
|
fShmRegions = fManagementSegment.find_or_construct<Uint16RegionInfoHashMap>(unique_instance)(fShmVoidAlloc);
|
||||||
|
|
||||||
@@ -367,13 +363,20 @@ class Manager
|
|||||||
}
|
}
|
||||||
|
|
||||||
for (const auto& e : *fShmSegments) {
|
for (const auto& e : *fShmSegments) {
|
||||||
fair::mq::RegionInfo info;
|
// make sure any segments in the session are found
|
||||||
info.managed = true;
|
GetSegment(e.first);
|
||||||
info.id = e.first;
|
try {
|
||||||
info.event = RegionEvent::created;
|
fair::mq::RegionInfo info;
|
||||||
info.ptr = boost::apply_visitor(SegmentAddress{}, fSegments.at(e.first));
|
info.managed = true;
|
||||||
info.size = boost::apply_visitor(SegmentSize{}, fSegments.at(e.first));
|
info.id = e.first;
|
||||||
result.push_back(info);
|
info.event = RegionEvent::created;
|
||||||
|
info.ptr = boost::apply_visitor(SegmentAddress{}, fSegments.at(e.first));
|
||||||
|
info.size = boost::apply_visitor(SegmentSize{}, fSegments.at(e.first));
|
||||||
|
result.push_back(info);
|
||||||
|
} catch (const std::out_of_range& oor) {
|
||||||
|
LOG(error) << "could not find segment with id " << e.first;
|
||||||
|
LOG(error) << oor.what();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
@@ -474,7 +477,7 @@ class Manager
|
|||||||
try {
|
try {
|
||||||
// get region info
|
// get region info
|
||||||
SegmentInfo segmentInfo = fShmSegments->at(id);
|
SegmentInfo segmentInfo = fShmSegments->at(id);
|
||||||
LOG(info) << "LOCATED SEGMENT WITH ID '" << id << "'";
|
LOG(debug) << "Located segment with id '" << id << "'";
|
||||||
|
|
||||||
using namespace boost::interprocess;
|
using namespace boost::interprocess;
|
||||||
|
|
||||||
|
@@ -155,7 +155,7 @@ class Socket final : public fair::mq::Socket
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int Send(MessagePtr& msg, const int timeout = -1) override
|
int64_t Send(MessagePtr& msg, const int timeout = -1) override
|
||||||
{
|
{
|
||||||
int flags = 0;
|
int flags = 0;
|
||||||
if (timeout == 0) {
|
if (timeout == 0) {
|
||||||
@@ -191,7 +191,7 @@ class Socket final : public fair::mq::Socket
|
|||||||
return static_cast<int>(TransferResult::error);
|
return static_cast<int>(TransferResult::error);
|
||||||
}
|
}
|
||||||
|
|
||||||
int Receive(MessagePtr& msg, const int timeout = -1) override
|
int64_t Receive(MessagePtr& msg, const int timeout = -1) override
|
||||||
{
|
{
|
||||||
int flags = 0;
|
int flags = 0;
|
||||||
if (timeout == 0) {
|
if (timeout == 0) {
|
||||||
|
@@ -132,7 +132,7 @@ class Socket final : public fair::mq::Socket
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int Send(MessagePtr& msg, const int timeout = -1) override
|
int64_t Send(MessagePtr& msg, const int timeout = -1) override
|
||||||
{
|
{
|
||||||
int flags = 0;
|
int flags = 0;
|
||||||
if (timeout == 0) {
|
if (timeout == 0) {
|
||||||
@@ -162,7 +162,7 @@ class Socket final : public fair::mq::Socket
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int Receive(MessagePtr& msg, const int timeout = -1) override
|
int64_t Receive(MessagePtr& msg, const int timeout = -1) override
|
||||||
{
|
{
|
||||||
int flags = 0;
|
int flags = 0;
|
||||||
if (timeout == 0) {
|
if (timeout == 0) {
|
||||||
|
Reference in New Issue
Block a user