mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-17 10:31:46 +00:00
Compare commits
9 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
e6dede492e | ||
|
f195eeac66 | ||
|
4d1e7b9cdb | ||
|
50be386191 | ||
|
f31be6d7a1 | ||
|
5607d47664 | ||
|
0f4595b8c1 | ||
|
b0b271d1f4 | ||
|
073f5e5c0e |
@@ -85,21 +85,15 @@ if(BUILD_NANOMSG_TRANSPORT)
|
|||||||
)
|
)
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
if(BUILD_SDK)
|
|
||||||
set(required_dds_version 2.5.46)
|
|
||||||
else()
|
|
||||||
set(required_dds_version 2.4)
|
|
||||||
endif()
|
|
||||||
|
|
||||||
if(BUILD_SDK_COMMANDS)
|
if(BUILD_SDK_COMMANDS)
|
||||||
find_package2(PRIVATE Flatbuffers REQUIRED)
|
find_package2(PRIVATE Flatbuffers REQUIRED)
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
if(BUILD_DDS_PLUGIN OR BUILD_SDK)
|
if(BUILD_DDS_PLUGIN OR BUILD_SDK)
|
||||||
find_package2(PRIVATE DDS REQUIRED
|
find_package2(PRIVATE DDS REQUIRED
|
||||||
VERSION ${required_dds_version}
|
VERSION 3.0
|
||||||
)
|
)
|
||||||
set(DDS_Boost_COMPONENTS system log log_setup)
|
set(DDS_Boost_COMPONENTS system log log_setup regex filesystem thread)
|
||||||
set(DDS_Boost_VERSION 1.67)
|
set(DDS_Boost_VERSION 1.67)
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
|
2
Dart.sh
2
Dart.sh
@@ -65,7 +65,7 @@ if [ "$1" == "alfa_ci" ]; then
|
|||||||
export ctest_model=Experimental
|
export ctest_model=Experimental
|
||||||
elif [ "$1" == "codecov" ]; then
|
elif [ "$1" == "codecov" ]; then
|
||||||
export ctest_model=Profile
|
export ctest_model=Profile
|
||||||
export do_codecov_upload=1
|
export do_codecov_upload=0
|
||||||
else
|
else
|
||||||
export ctest_model=$1
|
export ctest_model=$1
|
||||||
fi
|
fi
|
||||||
|
@@ -28,12 +28,6 @@ target_link_libraries(fairmq-ex-dds-sink PRIVATE ExampleDDSLib)
|
|||||||
|
|
||||||
add_custom_target(ExampleDDS DEPENDS fairmq-ex-dds-sampler fairmq-ex-dds-processor fairmq-ex-dds-sink)
|
add_custom_target(ExampleDDS DEPENDS fairmq-ex-dds-sampler fairmq-ex-dds-processor fairmq-ex-dds-sink)
|
||||||
|
|
||||||
if(DDS_VERSION VERSION_LESS 2.5.25)
|
|
||||||
set(WAIT_COMMAND "dds-info --wait-for-idle-agents")
|
|
||||||
else()
|
|
||||||
set(WAIT_COMMAND "dds-info --idle-count --wait")
|
|
||||||
endif()
|
|
||||||
|
|
||||||
set(BIN_DIR ${CMAKE_CURRENT_BINARY_DIR}:${CMAKE_BINARY_DIR}/fairmq/plugins/DDS)
|
set(BIN_DIR ${CMAKE_CURRENT_BINARY_DIR}:${CMAKE_BINARY_DIR}/fairmq/plugins/DDS)
|
||||||
set(DATA_DIR ${CMAKE_CURRENT_BINARY_DIR})
|
set(DATA_DIR ${CMAKE_CURRENT_BINARY_DIR})
|
||||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ex-dds-topology.xml ${CMAKE_CURRENT_BINARY_DIR}/ex-dds-topology.xml @ONLY)
|
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ex-dds-topology.xml ${CMAKE_CURRENT_BINARY_DIR}/ex-dds-topology.xml @ONLY)
|
||||||
@@ -47,7 +41,6 @@ if(DDS_FOUND)
|
|||||||
add_test(NAME Example.DDS.localhost COMMAND ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-dds.sh localhost)
|
add_test(NAME Example.DDS.localhost COMMAND ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-dds.sh localhost)
|
||||||
set_tests_properties(Example.DDS.localhost PROPERTIES
|
set_tests_properties(Example.DDS.localhost PROPERTIES
|
||||||
TIMEOUT 15
|
TIMEOUT 15
|
||||||
RUN_SERIAL true
|
|
||||||
PASS_REGULAR_EXPRESSION "Example successful"
|
PASS_REGULAR_EXPRESSION "Example successful"
|
||||||
)
|
)
|
||||||
endif()
|
endif()
|
||||||
|
@@ -30,52 +30,50 @@ echo "SESSION ID: ${DDS_SESSION_ID}"
|
|||||||
|
|
||||||
trap "cleanup ${DDS_SESSION_ID}" EXIT
|
trap "cleanup ${DDS_SESSION_ID}" EXIT
|
||||||
|
|
||||||
requiredNofAgents=12
|
requiredNofSlots=12
|
||||||
if [[ "$plugin" == "ssh" ]]; then
|
if [[ "$plugin" == "ssh" ]]; then
|
||||||
dds-submit -r ${plugin} -c @DATA_DIR@/ex-dds-hosts.cfg
|
dds-submit -r ${plugin} -c @DATA_DIR@/ex-dds-hosts.cfg
|
||||||
else
|
else
|
||||||
dds-submit -r ${plugin} -n ${requiredNofAgents}
|
dds-submit -r ${plugin} --slots ${requiredNofSlots}
|
||||||
fi
|
fi
|
||||||
echo "...waiting for ${requiredNofAgents} idle agents..."
|
echo "...waiting for ${requiredNofSlots} idle slots..."
|
||||||
@WAIT_COMMAND@ ${requiredNofAgents}
|
dds-info --idle-count --wait ${requiredNofSlots}
|
||||||
|
|
||||||
topologyFile=@DATA_DIR@/ex-dds-topology.xml
|
topologyFile=@DATA_DIR@/ex-dds-topology.xml
|
||||||
echo "TOPOLOGY FILE: ${topologyFile}"
|
echo "TOPOLOGY FILE: ${topologyFile}"
|
||||||
# TODO Uncomment once DDS 2.6 is released
|
echo "TOPOLOGY NAME: $(dds-topology --disable-validation --topology-name ${topologyFile})"
|
||||||
# echo "TOPOLOGY NAME: $(dds-topology --disable-validation --topology-name ${topologyFile})"
|
|
||||||
|
|
||||||
# TODO Uncomment once DDS 2.6 is released
|
dds-info --active-topology
|
||||||
# dds-info --active-topology
|
|
||||||
dds-topology --activate ${topologyFile}
|
dds-topology --activate ${topologyFile}
|
||||||
# dds-info --active-topology
|
dds-info --active-topology
|
||||||
# dds-info --wait-for-executing-agents ${requiredNofAgents}
|
echo "...waiting for ${requiredNofSlots} executing slots..."
|
||||||
sleep 1
|
dds-info --executing-count --wait ${requiredNofSlots}
|
||||||
|
|
||||||
echo "------------------------"
|
echo "------------------------"
|
||||||
echo "...waiting for Topology to finish..."
|
echo "...waiting for Topology to finish..."
|
||||||
# TODO Retrieve number of devices from DDS topology API instead of having the user pass it explicitely
|
# TODO Retrieve number of devices from DDS topology API instead of having the user pass it explicitely
|
||||||
fairmq-dds-command-ui -w "IDLE" -n ${requiredNofAgents}
|
fairmq-dds-command-ui -w "IDLE" -n ${requiredNofSlots}
|
||||||
fairmq-dds-command-ui -c i -w "INITIALIZING DEVICE" -n ${requiredNofAgents}
|
fairmq-dds-command-ui -c i -w "INITIALIZING DEVICE" -n ${requiredNofSlots}
|
||||||
fairmq-dds-command-ui -c k -w "INITIALIZED" -n ${requiredNofAgents}
|
fairmq-dds-command-ui -c k -w "INITIALIZED" -n ${requiredNofSlots}
|
||||||
fairmq-dds-command-ui -c b -w "BOUND" -n ${requiredNofAgents}
|
fairmq-dds-command-ui -c b -w "BOUND" -n ${requiredNofSlots}
|
||||||
fairmq-dds-command-ui -c x -w "DEVICE READY" -n ${requiredNofAgents}
|
fairmq-dds-command-ui -c x -w "DEVICE READY" -n ${requiredNofSlots}
|
||||||
fairmq-dds-command-ui -c j -w "READY" -n ${requiredNofAgents}
|
fairmq-dds-command-ui -c j -w "READY" -n ${requiredNofSlots}
|
||||||
fairmq-dds-command-ui -c r
|
fairmq-dds-command-ui -c r
|
||||||
sampler_and_sink="main/(Sampler|Sink)"
|
sampler_and_sink="main/(Sampler|Sink)"
|
||||||
|
# processors="main/ProcessorGroup/Processor"
|
||||||
fairmq-dds-command-ui -p $sampler_and_sink -w "RUNNING->READY" -n 2
|
fairmq-dds-command-ui -p $sampler_and_sink -w "RUNNING->READY" -n 2
|
||||||
echo "...$sampler_and_sink are READY, sending shutdown..."
|
echo "...$sampler_and_sink are READY, sending shutdown..."
|
||||||
fairmq-dds-command-ui -c s -w "RUNNING->READY" -n ${requiredNofAgents}
|
fairmq-dds-command-ui -c s -w "RUNNING->READY" -n ${requiredNofSlots}
|
||||||
fairmq-dds-command-ui -c t -w "DEVICE READY" -n ${requiredNofAgents}
|
fairmq-dds-command-ui -c t -w "DEVICE READY" -n ${requiredNofSlots}
|
||||||
fairmq-dds-command-ui -c d -w "IDLE" -n ${requiredNofAgents}
|
fairmq-dds-command-ui -c d -w "IDLE" -n ${requiredNofSlots}
|
||||||
fairmq-dds-command-ui -c q -w "EXITING" -n ${requiredNofAgents}
|
fairmq-dds-command-ui -c q -w "EXITING" -n ${requiredNofSlots}
|
||||||
echo "...waiting for ${requiredNofAgents} idle agents..."
|
echo "...waiting for ${requiredNofSlots} idle slots..."
|
||||||
@WAIT_COMMAND@ ${requiredNofAgents}
|
dds-info --idle-count --wait ${requiredNofSlots}
|
||||||
echo "------------------------"
|
echo "------------------------"
|
||||||
|
|
||||||
# TODO Uncomment once DDS 2.6 is released
|
dds-info --active-topology
|
||||||
# dds-info --active-topology
|
|
||||||
dds-topology --stop
|
dds-topology --stop
|
||||||
# dds-info --active-topology
|
dds-info --active-topology
|
||||||
|
|
||||||
dds-agent-cmd getlog -a
|
dds-agent-cmd getlog -a
|
||||||
logDir="${wrkDir}/logs"
|
logDir="${wrkDir}/logs"
|
||||||
|
@@ -58,6 +58,12 @@ if(BUILD_FAIRMQ OR BUILD_SDK)
|
|||||||
${TOOLS_PUBLIC_HEADER_FILES}
|
${TOOLS_PUBLIC_HEADER_FILES}
|
||||||
)
|
)
|
||||||
target_compile_definitions(${target} PUBLIC BOOST_ERROR_CODE_HEADER_ONLY)
|
target_compile_definitions(${target} PUBLIC BOOST_ERROR_CODE_HEADER_ONLY)
|
||||||
|
# workaround https://github.com/boostorg/asio/commit/43874d5497414c67655d901e48c939ef01337edb
|
||||||
|
if( Boost_VERSION VERSION_LESS 1.69
|
||||||
|
AND CMAKE_CXX_COMPILER_ID STREQUAL AppleClang
|
||||||
|
AND CMAKE_CXX_COMPILER_VERSION VERSION_GREATER_EQUAL 10.0.1)
|
||||||
|
target_compile_definitions(${target} PUBLIC BOOST_ASIO_HAS_STD_STRING_VIEW)
|
||||||
|
endif()
|
||||||
target_include_directories(${target}
|
target_include_directories(${target}
|
||||||
PUBLIC
|
PUBLIC
|
||||||
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}>
|
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}>
|
||||||
|
@@ -60,10 +60,10 @@ class Plugin
|
|||||||
friend auto operator!=(const Plugin& lhs, const Plugin& rhs) -> bool { return !(lhs == rhs); }
|
friend auto operator!=(const Plugin& lhs, const Plugin& rhs) -> bool { return !(lhs == rhs); }
|
||||||
friend auto operator<<(std::ostream& os, const Plugin& p) -> std::ostream&
|
friend auto operator<<(std::ostream& os, const Plugin& p) -> std::ostream&
|
||||||
{
|
{
|
||||||
return os << "'" << p.GetName() << "', "
|
return os << "'" << p.GetName() << "', "
|
||||||
<< "version '" << p.GetVersion() << "', "
|
<< "version '" << p.GetVersion() << "', "
|
||||||
<< "maintainer '" << p.GetMaintainer() << "', "
|
<< "maintainer '" << p.GetMaintainer() << "', "
|
||||||
<< "homepage '" << p.GetHomepage() << "'";
|
<< "homepage '" << p.GetHomepage() << "'";
|
||||||
}
|
}
|
||||||
static auto NoProgramOptions() -> ProgOptions { return boost::none; }
|
static auto NoProgramOptions() -> ProgOptions { return boost::none; }
|
||||||
|
|
||||||
@@ -80,7 +80,6 @@ class Plugin
|
|||||||
auto StealDeviceControl() -> void { fPluginServices->StealDeviceControl(fkName); };
|
auto StealDeviceControl() -> void { fPluginServices->StealDeviceControl(fkName); };
|
||||||
auto ReleaseDeviceControl() -> void { fPluginServices->ReleaseDeviceControl(fkName); };
|
auto ReleaseDeviceControl() -> void { fPluginServices->ReleaseDeviceControl(fkName); };
|
||||||
auto ChangeDeviceState(const DeviceStateTransition next) -> bool { return fPluginServices->ChangeDeviceState(fkName, next); }
|
auto ChangeDeviceState(const DeviceStateTransition next) -> bool { return fPluginServices->ChangeDeviceState(fkName, next); }
|
||||||
void TransitionDeviceStateTo(const DeviceState state) { return fPluginServices->TransitionDeviceStateTo(fkName, state); }
|
|
||||||
auto SubscribeToDeviceStateChange(std::function<void(DeviceState)> callback) -> void { fPluginServices->SubscribeToDeviceStateChange(fkName, callback); }
|
auto SubscribeToDeviceStateChange(std::function<void(DeviceState)> callback) -> void { fPluginServices->SubscribeToDeviceStateChange(fkName, callback); }
|
||||||
auto UnsubscribeFromDeviceStateChange() -> void { fPluginServices->UnsubscribeFromDeviceStateChange(fkName); }
|
auto UnsubscribeFromDeviceStateChange() -> void { fPluginServices->UnsubscribeFromDeviceStateChange(fkName); }
|
||||||
|
|
||||||
|
@@ -28,22 +28,6 @@ auto PluginServices::ChangeDeviceState(const string& controller, const DeviceSta
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void PluginServices::TransitionDeviceStateTo(const std::string& controller, DeviceState state)
|
|
||||||
{
|
|
||||||
lock_guard<mutex> lock{fDeviceControllerMutex};
|
|
||||||
|
|
||||||
if (!fDeviceController) fDeviceController = controller;
|
|
||||||
|
|
||||||
if (fDeviceController == controller) {
|
|
||||||
fDevice.TransitionTo(state);
|
|
||||||
} else {
|
|
||||||
throw DeviceControlError{tools::ToString(
|
|
||||||
"Plugin '", controller, "' is not allowed to change device states. ",
|
|
||||||
"Currently, plugin '", *fDeviceController, "' has taken control."
|
|
||||||
)};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
auto PluginServices::TakeDeviceControl(const string& controller) -> void
|
auto PluginServices::TakeDeviceControl(const string& controller) -> void
|
||||||
{
|
{
|
||||||
lock_guard<mutex> lock{fDeviceControllerMutex};
|
lock_guard<mutex> lock{fDeviceControllerMutex};
|
||||||
|
@@ -124,8 +124,6 @@ class PluginServices
|
|||||||
/// If the device control role has not been taken yet, calling this function will take over control implicitely.
|
/// If the device control role has not been taken yet, calling this function will take over control implicitely.
|
||||||
auto ChangeDeviceState(const std::string& controller, const DeviceStateTransition next) -> bool;
|
auto ChangeDeviceState(const std::string& controller, const DeviceStateTransition next) -> bool;
|
||||||
|
|
||||||
void TransitionDeviceStateTo(const std::string& controller, DeviceState state);
|
|
||||||
|
|
||||||
/// @brief Subscribe with a callback to device state changes
|
/// @brief Subscribe with a callback to device state changes
|
||||||
/// @param subscriber id
|
/// @param subscriber id
|
||||||
/// @param callback
|
/// @param callback
|
||||||
|
@@ -37,16 +37,6 @@ DDS::DDS(const string& name,
|
|||||||
const string& homepage,
|
const string& homepage,
|
||||||
PluginServices* pluginServices)
|
PluginServices* pluginServices)
|
||||||
: Plugin(name, version, maintainer, homepage, pluginServices)
|
: Plugin(name, version, maintainer, homepage, pluginServices)
|
||||||
, fTransitions({"INIT DEVICE",
|
|
||||||
"COMPLETE INIT",
|
|
||||||
"BIND",
|
|
||||||
"CONNECT",
|
|
||||||
"INIT TASK",
|
|
||||||
"RUN",
|
|
||||||
"STOP",
|
|
||||||
"RESET TASK",
|
|
||||||
"RESET DEVICE",
|
|
||||||
"END"})
|
|
||||||
, fCurrentState(DeviceState::Idle)
|
, fCurrentState(DeviceState::Idle)
|
||||||
, fLastState(DeviceState::Idle)
|
, fLastState(DeviceState::Idle)
|
||||||
, fDeviceTerminationRequested(false)
|
, fDeviceTerminationRequested(false)
|
||||||
@@ -71,15 +61,13 @@ DDS::DDS(const string& name,
|
|||||||
}
|
}
|
||||||
|
|
||||||
auto control = GetProperty<string>("control");
|
auto control = GetProperty<string>("control");
|
||||||
bool staticMode(false);
|
|
||||||
if (control == "static") {
|
if (control == "static") {
|
||||||
LOG(debug) << "Running DDS controller: static";
|
LOG(error) << "DDS Plugin: static mode is not supported";
|
||||||
staticMode = true;
|
throw invalid_argument("DDS Plugin: static mode is not supported");
|
||||||
} else if (control == "dynamic" || control == "external" || control == "interactive") {
|
} else if (control == "dynamic" || control == "external" || control == "interactive") {
|
||||||
LOG(debug) << "Running DDS controller: external";
|
LOG(debug) << "Running DDS controller: external";
|
||||||
} else {
|
} else {
|
||||||
LOG(error) << "Unrecognized control mode '" << control << "' requested. " << "Ignoring and falling back to static control mode.";
|
LOG(error) << "Unrecognized control mode '" << control << "' requested. " << "Ignoring and starting in external control mode.";
|
||||||
staticMode = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SubscribeForCustomCommands();
|
SubscribeForCustomCommands();
|
||||||
@@ -87,7 +75,6 @@ DDS::DDS(const string& name,
|
|||||||
|
|
||||||
// subscribe to device state changes, pushing new state changes into the event queue
|
// subscribe to device state changes, pushing new state changes into the event queue
|
||||||
SubscribeToDeviceStateChange([&](DeviceState newState) {
|
SubscribeToDeviceStateChange([&](DeviceState newState) {
|
||||||
fStateQueue.Push(newState);
|
|
||||||
switch (newState) {
|
switch (newState) {
|
||||||
case DeviceState::Bound:
|
case DeviceState::Bound:
|
||||||
// Receive addresses of connecting channels from DDS
|
// Receive addresses of connecting channels from DDS
|
||||||
@@ -130,11 +117,7 @@ DDS::DDS(const string& name,
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
if (staticMode) {
|
StartWorkerThread();
|
||||||
fControllerThread = thread(&DDS::StaticControl, this);
|
|
||||||
} else {
|
|
||||||
StartWorkerThread();
|
|
||||||
}
|
|
||||||
|
|
||||||
fDDS.Start();
|
fDDS.Start();
|
||||||
} catch (PluginServices::DeviceControlError& e) {
|
} catch (PluginServices::DeviceControlError& e) {
|
||||||
@@ -166,26 +149,6 @@ auto DDS::WaitForExitingAck() -> void
|
|||||||
[this]() { return fExitingAckedByLastExternalController; });
|
[this]() { return fExitingAckedByLastExternalController; });
|
||||||
}
|
}
|
||||||
|
|
||||||
auto DDS::StaticControl() -> void
|
|
||||||
{
|
|
||||||
try {
|
|
||||||
TransitionDeviceStateTo(DeviceState::Running);
|
|
||||||
|
|
||||||
// wait until stop signal
|
|
||||||
unique_lock<mutex> lock(fStopMutex);
|
|
||||||
while (!fDeviceTerminationRequested) {
|
|
||||||
fStopCondition.wait_for(lock, chrono::seconds(1));
|
|
||||||
}
|
|
||||||
LOG(debug) << "Stopping DDS plugin static controller";
|
|
||||||
} catch (DeviceErrorState&) {
|
|
||||||
ReleaseDeviceControl();
|
|
||||||
} catch (exception& e) {
|
|
||||||
ReleaseDeviceControl();
|
|
||||||
LOG(error) << "Error: " << e.what() << "\n";
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
auto DDS::FillChannelContainers() -> void
|
auto DDS::FillChannelContainers() -> void
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
@@ -373,6 +336,7 @@ auto DDS::SubscribeForCustomCommands() -> void
|
|||||||
inCmds.Deserialize(cmdStr);
|
inCmds.Deserialize(cmdStr);
|
||||||
|
|
||||||
for (const auto& cmd : inCmds) {
|
for (const auto& cmd : inCmds) {
|
||||||
|
// LOG(info) << "Received command type: '" << cmd->GetType() << "' from " << senderId;
|
||||||
switch (cmd->GetType()) {
|
switch (cmd->GetType()) {
|
||||||
case Type::check_state: {
|
case Type::check_state: {
|
||||||
fDDS.Send(Cmds(make<CurrentState>(id, GetCurrentDeviceState())).Serialize(), to_string(senderId));
|
fDDS.Send(Cmds(make<CurrentState>(id, GetCurrentDeviceState())).Serialize(), to_string(senderId));
|
||||||
@@ -387,6 +351,10 @@ auto DDS::SubscribeForCustomCommands() -> void
|
|||||||
Cmds outCmds(make<TransitionStatus>(id, Result::Failure, transition));
|
Cmds outCmds(make<TransitionStatus>(id, Result::Failure, transition));
|
||||||
fDDS.Send(outCmds.Serialize(), to_string(senderId));
|
fDDS.Send(outCmds.Serialize(), to_string(senderId));
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||||
|
fLastExternalController = senderId;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case Type::dump_config: {
|
case Type::dump_config: {
|
||||||
|
@@ -13,8 +13,7 @@
|
|||||||
#include <fairmq/StateQueue.h>
|
#include <fairmq/StateQueue.h>
|
||||||
#include <fairmq/Version.h>
|
#include <fairmq/Version.h>
|
||||||
|
|
||||||
#include <DDS/dds_env_prop.h>
|
#include <dds/dds.h>
|
||||||
#include <DDS/dds_intercom.h>
|
|
||||||
|
|
||||||
#include <boost/asio/executor.hpp>
|
#include <boost/asio/executor.hpp>
|
||||||
#include <boost/asio/executor_work_guard.hpp>
|
#include <boost/asio/executor_work_guard.hpp>
|
||||||
@@ -133,7 +132,6 @@ class DDS : public Plugin
|
|||||||
~DDS();
|
~DDS();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
auto StaticControl() -> void;
|
|
||||||
auto WaitForExitingAck() -> void;
|
auto WaitForExitingAck() -> void;
|
||||||
auto StartWorkerThread() -> void;
|
auto StartWorkerThread() -> void;
|
||||||
|
|
||||||
@@ -154,14 +152,8 @@ class DDS : public Plugin
|
|||||||
std::unordered_map<std::string, int> fI;
|
std::unordered_map<std::string, int> fI;
|
||||||
std::unordered_map<std::string, IofN> fIofN;
|
std::unordered_map<std::string, IofN> fIofN;
|
||||||
|
|
||||||
std::mutex fStopMutex;
|
|
||||||
std::condition_variable fStopCondition;
|
|
||||||
|
|
||||||
const std::set<std::string> fTransitions;
|
|
||||||
|
|
||||||
std::thread fControllerThread;
|
std::thread fControllerThread;
|
||||||
DeviceState fCurrentState, fLastState;
|
DeviceState fCurrentState, fLastState;
|
||||||
fair::mq::StateQueue fStateQueue;
|
|
||||||
|
|
||||||
std::atomic<bool> fDeviceTerminationRequested;
|
std::atomic<bool> fDeviceTerminationRequested;
|
||||||
|
|
||||||
|
@@ -9,7 +9,7 @@
|
|||||||
#include <fairmq/sdk/commands/Commands.h>
|
#include <fairmq/sdk/commands/Commands.h>
|
||||||
#include <fairmq/States.h>
|
#include <fairmq/States.h>
|
||||||
|
|
||||||
#include <DDS/dds_intercom.h>
|
#include <dds/dds.h>
|
||||||
|
|
||||||
#include <boost/program_options.hpp>
|
#include <boost/program_options.hpp>
|
||||||
|
|
||||||
@@ -74,7 +74,8 @@ void printControlsHelp()
|
|||||||
cout << "To quit press Ctrl+C" << endl;
|
cout << "To quit press Ctrl+C" << endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
void sendCommand(const string& commandIn, const string& topologyPath, CCustomCmd& ddsCustomCmd) {
|
void sendCommand(const string& commandIn, const string& topologyPath, CCustomCmd& ddsCustomCmd)
|
||||||
|
{
|
||||||
char c;
|
char c;
|
||||||
string command(commandIn);
|
string command(commandIn);
|
||||||
TerminalConfig tconfig;
|
TerminalConfig tconfig;
|
||||||
@@ -159,8 +160,6 @@ struct WaitMode
|
|||||||
|
|
||||||
void Run(const chrono::milliseconds& timeout, const string& topologyPath, CCustomCmd& ddsCustomCmd, unsigned int numDevices, const string& command = "")
|
void Run(const chrono::milliseconds& timeout, const string& topologyPath, CCustomCmd& ddsCustomCmd, unsigned int numDevices, const string& command = "")
|
||||||
{
|
{
|
||||||
StateSubscription stateSubscription(topologyPath, ddsCustomCmd);
|
|
||||||
|
|
||||||
if (command != "") {
|
if (command != "") {
|
||||||
sendCommand(command, topologyPath, ddsCustomCmd);
|
sendCommand(command, topologyPath, ddsCustomCmd);
|
||||||
}
|
}
|
||||||
@@ -288,13 +287,19 @@ int main(int argc, char* argv[])
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case Type::transition_status: {
|
case Type::transition_status: {
|
||||||
// if (static_cast<TransitionStatus&>(*cmd).GetResult() == Result::Ok) {
|
if (static_cast<TransitionStatus&>(*cmd).GetResult() == Result::Ok) {
|
||||||
// cout << "Device " << static_cast<TransitionStatus&>(*cmd).GetDeviceId() << " started to transition with " << static_cast<TransitionStatus&>(*cmd).GetTransition() << endl;
|
cout << "Device " << static_cast<TransitionStatus&>(*cmd).GetDeviceId() << " started to transition with " << static_cast<TransitionStatus&>(*cmd).GetTransition() << endl;
|
||||||
// } else {
|
} else {
|
||||||
// cout << "Device " << static_cast<TransitionStatus&>(*cmd).GetDeviceId() << " cannot transition with " << static_cast<TransitionStatus&>(*cmd).GetTransition() << endl;
|
cout << "Device " << static_cast<TransitionStatus&>(*cmd).GetDeviceId() << " cannot transition with " << static_cast<TransitionStatus&>(*cmd).GetTransition() << endl;
|
||||||
// }
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
case Type::current_state:
|
||||||
|
cout << "Device " << static_cast<CurrentState&>(*cmd).GetDeviceId() << " is in " << static_cast<CurrentState&>(*cmd).GetCurrentState() << " state" << endl;
|
||||||
|
break;
|
||||||
|
case Type::config:
|
||||||
|
cout << "Received config for device " << static_cast<Config&>(*cmd).GetDeviceId() << ":\n" << static_cast<Config&>(*cmd).GetConfig() << endl;
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
cout << "Unexpected/unknown command received: " << cmd->GetType() << endl;
|
cout << "Unexpected/unknown command received: " << cmd->GetType() << endl;
|
||||||
cout << "Origin: " << senderId << endl;
|
cout << "Origin: " << senderId << endl;
|
||||||
@@ -305,6 +310,8 @@ int main(int argc, char* argv[])
|
|||||||
|
|
||||||
service.start(sessionID);
|
service.start(sessionID);
|
||||||
|
|
||||||
|
StateSubscription stateSubscription(topologyPath, ddsCustomCmd);
|
||||||
|
|
||||||
if (targetState == "") {
|
if (targetState == "") {
|
||||||
sendCommand(command, topologyPath, ddsCustomCmd);
|
sendCommand(command, topologyPath, ddsCustomCmd);
|
||||||
} else {
|
} else {
|
||||||
|
@@ -33,32 +33,24 @@ class DDSAgent
|
|||||||
explicit DDSAgent(DDSSession session,
|
explicit DDSAgent(DDSSession session,
|
||||||
Id id,
|
Id id,
|
||||||
Pid pid,
|
Pid pid,
|
||||||
std::string state,
|
|
||||||
std::string path,
|
std::string path,
|
||||||
std::string host,
|
std::string host,
|
||||||
bool lobbyLeader,
|
|
||||||
std::chrono::milliseconds startupTime,
|
std::chrono::milliseconds startupTime,
|
||||||
Id taskId,
|
|
||||||
std::string username)
|
std::string username)
|
||||||
: fSession(std::move(session))
|
: fSession(std::move(session))
|
||||||
, fId(id)
|
, fId(id)
|
||||||
, fPid(pid)
|
, fPid(pid)
|
||||||
, fState(std::move(state))
|
|
||||||
, fDDSPath(std::move(path))
|
, fDDSPath(std::move(path))
|
||||||
, fHost(std::move(host))
|
, fHost(std::move(host))
|
||||||
, fLobbyLeader(lobbyLeader)
|
|
||||||
, fStartupTime(startupTime)
|
, fStartupTime(startupTime)
|
||||||
, fTaskId(taskId)
|
|
||||||
, fUsername(std::move(username))
|
, fUsername(std::move(username))
|
||||||
{}
|
{}
|
||||||
|
|
||||||
DDSSession GetSession() const { return fSession; }
|
DDSSession GetSession() const { return fSession; }
|
||||||
Id GetId() const { return fId; }
|
Id GetId() const { return fId; }
|
||||||
Pid GetPid() const { return fPid; }
|
Pid GetPid() const { return fPid; }
|
||||||
std::string GetState() const { return fState; }
|
|
||||||
std::string GetHost() const { return fHost; }
|
std::string GetHost() const { return fHost; }
|
||||||
std::string GetDDSPath() const { return fDDSPath; }
|
std::string GetDDSPath() const { return fDDSPath; }
|
||||||
bool IsLobbyLeader() const { return fLobbyLeader; }
|
|
||||||
std::chrono::milliseconds GetStartupTime() const { return fStartupTime; }
|
std::chrono::milliseconds GetStartupTime() const { return fStartupTime; }
|
||||||
std::string GetUsername() const { return fUsername; }
|
std::string GetUsername() const { return fUsername; }
|
||||||
|
|
||||||
@@ -66,12 +58,9 @@ class DDSAgent
|
|||||||
{
|
{
|
||||||
return os << "DDSAgent id: " << agent.fId
|
return os << "DDSAgent id: " << agent.fId
|
||||||
<< ", pid: " << agent.fPid
|
<< ", pid: " << agent.fPid
|
||||||
<< ", state: " << agent.fState
|
|
||||||
<< ", path: " << agent.fDDSPath
|
<< ", path: " << agent.fDDSPath
|
||||||
<< ", host: " << agent.fHost
|
<< ", host: " << agent.fHost
|
||||||
<< ", lobbyLeader: " << agent.fLobbyLeader
|
|
||||||
<< ", startupTime: " << agent.fStartupTime.count()
|
<< ", startupTime: " << agent.fStartupTime.count()
|
||||||
<< ", taskId: " << agent.fTaskId
|
|
||||||
<< ", username: " << agent.fUsername;
|
<< ", username: " << agent.fUsername;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -79,12 +68,9 @@ class DDSAgent
|
|||||||
DDSSession fSession;
|
DDSSession fSession;
|
||||||
Id fId;
|
Id fId;
|
||||||
Pid fPid;
|
Pid fPid;
|
||||||
std::string fState;
|
|
||||||
std::string fDDSPath;
|
std::string fDDSPath;
|
||||||
std::string fHost;
|
std::string fHost;
|
||||||
bool fLobbyLeader;
|
|
||||||
std::chrono::milliseconds fStartupTime;
|
std::chrono::milliseconds fStartupTime;
|
||||||
Id fTaskId;
|
|
||||||
std::string fUsername;
|
std::string fUsername;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@@ -8,15 +8,11 @@
|
|||||||
|
|
||||||
#include "DDSEnvironment.h"
|
#include "DDSEnvironment.h"
|
||||||
|
|
||||||
|
#include <cstdlib>
|
||||||
|
#include <dds/dds.h>
|
||||||
|
#include <fairlogger/Logger.h>
|
||||||
#include <fairmq/Tools.h>
|
#include <fairmq/Tools.h>
|
||||||
#include <fairmq/sdk/DDSInfo.h>
|
#include <fairmq/sdk/DDSInfo.h>
|
||||||
|
|
||||||
#include <fairlogger/Logger.h>
|
|
||||||
|
|
||||||
#include <DDS/Tools.h>
|
|
||||||
#include <DDS/dds_intercom.h>
|
|
||||||
|
|
||||||
#include <cstdlib>
|
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
|
||||||
|
@@ -8,11 +8,11 @@
|
|||||||
|
|
||||||
#include "DDSSession.h"
|
#include "DDSSession.h"
|
||||||
|
|
||||||
#include <DDS/Tools.h>
|
|
||||||
#include <boost/process.hpp>
|
#include <boost/process.hpp>
|
||||||
#include <boost/uuid/uuid_io.hpp>
|
#include <boost/uuid/uuid_io.hpp>
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
#include <cstdlib>
|
#include <cstdlib>
|
||||||
|
#include <dds/dds.h>
|
||||||
#include <fairlogger/Logger.h>
|
#include <fairlogger/Logger.h>
|
||||||
#include <fairmq/Tools.h>
|
#include <fairmq/Tools.h>
|
||||||
#include <fairmq/sdk/DDSAgent.h>
|
#include <fairmq/sdk/DDSAgent.h>
|
||||||
@@ -183,7 +183,8 @@ auto DDSSession::SubmitAgents(Quantity agents) -> void
|
|||||||
|
|
||||||
SSubmitRequestData submitInfo;
|
SSubmitRequestData submitInfo;
|
||||||
submitInfo.m_rms = tools::ToString(GetRMSPlugin());
|
submitInfo.m_rms = tools::ToString(GetRMSPlugin());
|
||||||
submitInfo.m_instances = agents;
|
submitInfo.m_instances = 1;
|
||||||
|
submitInfo.m_slots = agents; // TODO new api: get slots from agents
|
||||||
submitInfo.m_config = GetRMSConfig().string();
|
submitInfo.m_config = GetRMSConfig().string();
|
||||||
|
|
||||||
tools::SharedSemaphore blocker;
|
tools::SharedSemaphore blocker;
|
||||||
@@ -210,9 +211,9 @@ auto DDSSession::RequestAgentCount() -> AgentCount
|
|||||||
fImpl->fSession->syncSendRequest<SAgentCountRequest>(SAgentCountRequest::request_t(), res);
|
fImpl->fSession->syncSendRequest<SAgentCountRequest>(SAgentCountRequest::request_t(), res);
|
||||||
|
|
||||||
AgentCount count;
|
AgentCount count;
|
||||||
count.active = res.m_activeAgentsCount;
|
count.active = res.m_activeSlotsCount;
|
||||||
count.idle = res.m_idleAgentsCount;
|
count.idle = res.m_idleSlotsCount;
|
||||||
count.executing = res.m_executingAgentsCount;
|
count.executing = res.m_executingSlotsCount;
|
||||||
|
|
||||||
return count;
|
return count;
|
||||||
}
|
}
|
||||||
@@ -231,13 +232,11 @@ auto DDSSession::RequestAgentInfo() -> std::vector<DDSAgent>
|
|||||||
*this,
|
*this,
|
||||||
a.m_agentID,
|
a.m_agentID,
|
||||||
a.m_agentPid,
|
a.m_agentPid,
|
||||||
a.m_agentState,
|
|
||||||
a.m_DDSPath,
|
a.m_DDSPath,
|
||||||
a.m_host,
|
a.m_host,
|
||||||
a.m_lobbyLeader,
|
|
||||||
a.m_startUpTime,
|
a.m_startUpTime,
|
||||||
a.m_taskID,
|
|
||||||
a.m_username
|
a.m_username
|
||||||
|
// a.m_nSlots
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -254,7 +253,9 @@ auto DDSSession::RequestTaskInfo() -> std::vector<DDSTask>
|
|||||||
std::vector<DDSTask> taskInfo;
|
std::vector<DDSTask> taskInfo;
|
||||||
taskInfo.reserve(res.size());
|
taskInfo.reserve(res.size());
|
||||||
for (auto& a : res) {
|
for (auto& a : res) {
|
||||||
taskInfo.emplace_back(a.m_taskID, 0);
|
//taskInfo.emplace_back(a.m_taskID, 0);
|
||||||
|
(void)a;
|
||||||
|
taskInfo.emplace_back(0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
return taskInfo;
|
return taskInfo;
|
||||||
|
@@ -9,18 +9,14 @@
|
|||||||
#include "DDSTopology.h"
|
#include "DDSTopology.h"
|
||||||
|
|
||||||
#include <boost/range/iterator_range.hpp>
|
#include <boost/range/iterator_range.hpp>
|
||||||
|
#include <dds/dds.h>
|
||||||
#include <fairmq/sdk/DDSEnvironment.h>
|
|
||||||
#include <fairmq/Tools.h>
|
|
||||||
|
|
||||||
#include <fairlogger/Logger.h>
|
#include <fairlogger/Logger.h>
|
||||||
|
#include <fairmq/Tools.h>
|
||||||
#include <DDS/Topology.h>
|
#include <fairmq/sdk/DDSEnvironment.h>
|
||||||
|
#include <memory>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
#include <memory>
|
|
||||||
|
|
||||||
namespace fair {
|
namespace fair {
|
||||||
namespace mq {
|
namespace mq {
|
||||||
|
@@ -8,8 +8,7 @@
|
|||||||
|
|
||||||
#include "Topology.h"
|
#include "Topology.h"
|
||||||
|
|
||||||
#include <DDS/Tools.h>
|
#include <dds/dds.h>
|
||||||
#include <DDS/Topology.h>
|
|
||||||
|
|
||||||
namespace fair {
|
namespace fair {
|
||||||
namespace mq {
|
namespace mq {
|
||||||
|
@@ -8,8 +8,7 @@
|
|||||||
|
|
||||||
#include "Fixtures.h"
|
#include "Fixtures.h"
|
||||||
|
|
||||||
#include <DDS/Topology.h>
|
#include <dds/dds.h>
|
||||||
#include <DDS/Tools.h>
|
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
|
@@ -9,8 +9,7 @@
|
|||||||
#include "Fixtures.h"
|
#include "Fixtures.h"
|
||||||
|
|
||||||
#include <asio.hpp>
|
#include <asio.hpp>
|
||||||
#include <DDS/Topology.h>
|
#include <dds/dds.h>
|
||||||
#include <DDS/Tools.h>
|
|
||||||
#include <fairmq/sdk/Topology.h>
|
#include <fairmq/sdk/Topology.h>
|
||||||
#include <fairmq/Tools.h>
|
#include <fairmq/Tools.h>
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user