mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
SDK: Adapt to new DDS plugin external mode
This commit is contained in:
parent
6c07920fc6
commit
98aeb16dc7
|
@ -80,6 +80,7 @@ auto DDS::HandleControl() -> void
|
||||||
|
|
||||||
SubscribeForCustomCommands();
|
SubscribeForCustomCommands();
|
||||||
SubscribeForConnectingChannels();
|
SubscribeForConnectingChannels();
|
||||||
|
fDDS.Start();
|
||||||
|
|
||||||
// subscribe to device state changes, pushing new state changes into the event queue
|
// subscribe to device state changes, pushing new state changes into the event queue
|
||||||
SubscribeToDeviceStateChange([&](DeviceState newState) {
|
SubscribeToDeviceStateChange([&](DeviceState newState) {
|
||||||
|
|
|
@ -64,7 +64,10 @@ struct DDSSubscription
|
||||||
});
|
});
|
||||||
|
|
||||||
assert(!dds_session_id.empty());
|
assert(!dds_session_id.empty());
|
||||||
fService.start(dds_session_id);
|
}
|
||||||
|
|
||||||
|
auto Start() -> void {
|
||||||
|
fService.start(dds::env_prop<dds::dds_session_id>());
|
||||||
}
|
}
|
||||||
|
|
||||||
~DDSSubscription() {
|
~DDSSubscription() {
|
||||||
|
|
|
@ -54,7 +54,7 @@ const std::unordered_map<DeviceTransition, DeviceState, tools::HashEnum<DeviceTr
|
||||||
{ Transition::CompleteInit, DeviceState::Initialized },
|
{ Transition::CompleteInit, DeviceState::Initialized },
|
||||||
{ Transition::Bind, DeviceState::Bound },
|
{ Transition::Bind, DeviceState::Bound },
|
||||||
{ Transition::Connect, DeviceState::DeviceReady },
|
{ Transition::Connect, DeviceState::DeviceReady },
|
||||||
{ Transition::InitTask, DeviceState::InitializingTask },
|
{ Transition::InitTask, DeviceState::Ready },
|
||||||
{ Transition::Run, DeviceState::Running },
|
{ Transition::Run, DeviceState::Running },
|
||||||
{ Transition::Stop, DeviceState::Ready },
|
{ Transition::Stop, DeviceState::Ready },
|
||||||
{ Transition::ResetTask, DeviceState::DeviceReady },
|
{ Transition::ResetTask, DeviceState::DeviceReady },
|
||||||
|
@ -76,7 +76,7 @@ Topology::Topology(DDSTopology topo, DDSSession session)
|
||||||
fState.emplace(d, DeviceStatus{ false, DeviceState::Ok });
|
fState.emplace(d, DeviceStatus{ false, DeviceState::Ok });
|
||||||
}
|
}
|
||||||
fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& /* condition */, uint64_t senderId) {
|
fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& /* condition */, uint64_t senderId) {
|
||||||
LOG(debug) << "Received from " << senderId << ": " << msg;
|
// LOG(debug) << "Received from " << senderId << ": " << msg;
|
||||||
std::vector<std::string> parts;
|
std::vector<std::string> parts;
|
||||||
boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,"));
|
boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,"));
|
||||||
|
|
||||||
|
@ -87,6 +87,7 @@ Topology::Topology(DDSTopology topo, DDSSession session)
|
||||||
if (parts[0] == "state-change") {
|
if (parts[0] == "state-change") {
|
||||||
AddNewStateEntry(std::stoull(parts[2]), parts[3]);
|
AddNewStateEntry(std::stoull(parts[2]), parts[3]);
|
||||||
} else if (parts[0] == "state-changes-subscription") {
|
} else if (parts[0] == "state-changes-subscription") {
|
||||||
|
LOG(debug) << "Received from " << senderId << ": " << msg;
|
||||||
if (parts[2] != "OK") {
|
if (parts[2] != "OK") {
|
||||||
LOG(error) << "state-changes-subscription failed with return code: " << parts[2];
|
LOG(error) << "state-changes-subscription failed with return code: " << parts[2];
|
||||||
}
|
}
|
||||||
|
@ -99,6 +100,7 @@ Topology::Topology(DDSTopology topo, DDSSession session)
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
fDDSSession.StartDDSService();
|
fDDSSession.StartDDSService();
|
||||||
|
LOG(debug) << "subscribe-to-state-changes";
|
||||||
fDDSSession.SendCommand("subscribe-to-state-changes");
|
fDDSSession.SendCommand("subscribe-to-state-changes");
|
||||||
|
|
||||||
fExecutionThread = std::thread(&Topology::WaitForState, this);
|
fExecutionThread = std::thread(&Topology::WaitForState, this);
|
||||||
|
@ -156,10 +158,16 @@ void Topology::WaitForState()
|
||||||
auto condition = [&] {
|
auto condition = [&] {
|
||||||
// LOG(info) << "checking condition";
|
// LOG(info) << "checking condition";
|
||||||
// LOG(info) << "fShutdown: " << fShutdown;
|
// LOG(info) << "fShutdown: " << fShutdown;
|
||||||
// LOG(info) << "condition: " << std::all_of(fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) { return i.second.state == fTargetState; });
|
// LOG(info) << "condition: " << std::all_of(fState.cbegin(), fState.cend(),
|
||||||
return fShutdown || std::all_of(fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) {
|
// [&](TopologyState::value_type i) { return i.second.state == fTargetState; });
|
||||||
return i.second.state == fTargetState;
|
return fShutdown
|
||||||
});
|
|| std::all_of(
|
||||||
|
fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) {
|
||||||
|
// TODO Check, if we can make sure that EXITING state change event are not missed
|
||||||
|
return (fTargetState == DeviceState::Exiting)
|
||||||
|
|| ((i.second.state == fTargetState)
|
||||||
|
&& i.second.initialized);
|
||||||
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
std::unique_lock<std::mutex> lock(fMtx);
|
std::unique_lock<std::mutex> lock(fMtx);
|
||||||
|
@ -170,7 +178,8 @@ void Topology::WaitForState()
|
||||||
fStateChangeOngoing = false;
|
fStateChangeOngoing = false;
|
||||||
TopologyState state = fState;
|
TopologyState state = fState;
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
fChangeStateCallback({{AsyncOpResultCode::Timeout, "timeout"}, std::move(state)});
|
fChangeStateCallback(
|
||||||
|
{{AsyncOpResultCode::Timeout, "timeout"}, std::move(state)});
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -182,13 +191,17 @@ void Topology::WaitForState()
|
||||||
LOG(debug) << "Aborting because a shutdown was requested";
|
LOG(debug) << "Aborting because a shutdown was requested";
|
||||||
TopologyState state = fState;
|
TopologyState state = fState;
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
fChangeStateCallback({{AsyncOpResultCode::Aborted, "Aborted because a shutdown was requested"}, std::move(state)});
|
fChangeStateCallback(
|
||||||
|
{{AsyncOpResultCode::Aborted, "Aborted because a shutdown was requested"},
|
||||||
|
std::move(state)});
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} catch (std::exception& e) {
|
} catch (std::exception& e) {
|
||||||
fStateChangeOngoing = false;
|
fStateChangeOngoing = false;
|
||||||
LOG(error) << "Error while processing state request: " << e.what();
|
LOG(error) << "Error while processing state request: " << e.what();
|
||||||
fChangeStateCallback({{AsyncOpResultCode::Error, tools::ToString("Exception thrown: ", e.what())}, fState});
|
fChangeStateCallback(
|
||||||
|
{{AsyncOpResultCode::Error, tools::ToString("Exception thrown: ", e.what())},
|
||||||
|
fState});
|
||||||
}
|
}
|
||||||
|
|
||||||
fChangeStateCallback({{AsyncOpResultCode::Ok, "success"}, fState});
|
fChangeStateCallback({{AsyncOpResultCode::Ok, "success"}, fState});
|
||||||
|
|
|
@ -139,6 +139,7 @@ class Topology
|
||||||
DDSSession fDDSSession;
|
DDSSession fDDSSession;
|
||||||
DDSTopology fDDSTopo;
|
DDSTopology fDDSTopo;
|
||||||
TopologyState fState;
|
TopologyState fState;
|
||||||
|
std::unordered_map<uint64_t, DeviceStatus> fStateChangesSubscriptions;
|
||||||
bool fStateChangeOngoing;
|
bool fStateChangeOngoing;
|
||||||
DeviceState fTargetState;
|
DeviceState fTargetState;
|
||||||
mutable std::mutex fMtx;
|
mutable std::mutex fMtx;
|
||||||
|
|
|
@ -25,7 +25,8 @@ TEST(Topology2, ConstructionWithNativeDdsApiObjects)
|
||||||
/////////////////////////////////////////
|
/////////////////////////////////////////
|
||||||
|
|
||||||
// Example usage:
|
// Example usage:
|
||||||
dds::topology_api::CTopology nativeTopo(fair::mq::tools::ToString(SDK_TESTSUITE_SOURCE_DIR, "/test_topo.xml"));
|
dds::topology_api::CTopology nativeTopo(
|
||||||
|
fair::mq::tools::ToString(SDK_TESTSUITE_SOURCE_DIR, "/test_topo.xml"));
|
||||||
auto nativeSession(std::make_shared<dds::tools_api::CSession>());
|
auto nativeSession(std::make_shared<dds::tools_api::CSession>());
|
||||||
nativeSession->create();
|
nativeSession->create();
|
||||||
EXPECT_THROW(fair::mq::sdk::Topology topo(nativeTopo, nativeSession, env), std::runtime_error);
|
EXPECT_THROW(fair::mq::sdk::Topology topo(nativeTopo, nativeSession, env), std::runtime_error);
|
||||||
|
@ -43,13 +44,16 @@ TEST_F(Topology, ChangeStateAsync)
|
||||||
|
|
||||||
Topology topo(mDDSTopo, mDDSSession);
|
Topology topo(mDDSTopo, mDDSSession);
|
||||||
fair::mq::tools::Semaphore blocker;
|
fair::mq::tools::Semaphore blocker;
|
||||||
topo.ChangeState(TopologyTransition::InitDevice, [&blocker, &topo](Topology::ChangeStateResult result) {
|
topo.ChangeState(
|
||||||
LOG(info) << result;
|
TopologyTransition::InitDevice, [&blocker, &topo](Topology::ChangeStateResult result) {
|
||||||
EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok);
|
LOG(info) << result;
|
||||||
EXPECT_NO_THROW(fair::mq::sdk::AggregateState(result.state));
|
EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok);
|
||||||
EXPECT_EQ(fair::mq::sdk::StateEqualsTo(result.state, fair::mq::sdk::DeviceState::Running), true);
|
EXPECT_NO_THROW(fair::mq::sdk::AggregateState(result.state));
|
||||||
blocker.Signal();
|
EXPECT_EQ(
|
||||||
});
|
fair::mq::sdk::StateEqualsTo(result.state, fair::mq::sdk::DeviceState::InitializingDevice),
|
||||||
|
true);
|
||||||
|
blocker.Signal();
|
||||||
|
});
|
||||||
blocker.Wait();
|
blocker.Wait();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -59,12 +63,13 @@ TEST_F(Topology, ChangeStateSync)
|
||||||
using fair::mq::sdk::TopologyTransition;
|
using fair::mq::sdk::TopologyTransition;
|
||||||
|
|
||||||
Topology topo(mDDSTopo, mDDSSession);
|
Topology topo(mDDSTopo, mDDSSession);
|
||||||
EXPECT_EQ(topo.ChangeState(TopologyTransition::Run).rc, fair::mq::AsyncOpResultCode::Ok);
|
auto result(topo.ChangeState(TopologyTransition::InitDevice));
|
||||||
auto result(topo.ChangeState(TopologyTransition::Stop));
|
|
||||||
|
|
||||||
EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok);
|
EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok);
|
||||||
EXPECT_NO_THROW(fair::mq::sdk::AggregateState(result.state));
|
EXPECT_NO_THROW(fair::mq::sdk::AggregateState(result.state));
|
||||||
EXPECT_EQ(fair::mq::sdk::StateEqualsTo(result.state, fair::mq::sdk::DeviceState::Ready), true);
|
EXPECT_EQ(
|
||||||
|
fair::mq::sdk::StateEqualsTo(result.state, fair::mq::sdk::DeviceState::InitializingDevice),
|
||||||
|
true);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(Topology, ChangeStateConcurrent)
|
TEST_F(Topology, ChangeStateConcurrent)
|
||||||
|
@ -74,14 +79,14 @@ TEST_F(Topology, ChangeStateConcurrent)
|
||||||
|
|
||||||
Topology topo(mDDSTopo, mDDSSession);
|
Topology topo(mDDSTopo, mDDSSession);
|
||||||
fair::mq::tools::Semaphore blocker;
|
fair::mq::tools::Semaphore blocker;
|
||||||
topo.ChangeState(TopologyTransition::Run, [&blocker](Topology::ChangeStateResult result) {
|
topo.ChangeState(TopologyTransition::InitDevice,
|
||||||
LOG(info) << "result for valid ChangeState: " << result;
|
[&blocker](Topology::ChangeStateResult result) {
|
||||||
blocker.Signal();
|
LOG(info) << "result for valid ChangeState: " << result;
|
||||||
});
|
blocker.Signal();
|
||||||
topo.ChangeState(TopologyTransition::Stop, [&blocker](Topology::ChangeStateResult result) {
|
});
|
||||||
LOG(info) << "result for invalid ChangeState: " << result;
|
EXPECT_THROW(topo.ChangeState(TopologyTransition::Stop,
|
||||||
EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Error);
|
[&blocker](Topology::ChangeStateResult) {}),
|
||||||
});
|
std::runtime_error);
|
||||||
blocker.Wait();
|
blocker.Wait();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -92,7 +97,7 @@ TEST_F(Topology, ChangeStateTimeout)
|
||||||
|
|
||||||
Topology topo(mDDSTopo, mDDSSession);
|
Topology topo(mDDSTopo, mDDSSession);
|
||||||
fair::mq::tools::Semaphore blocker;
|
fair::mq::tools::Semaphore blocker;
|
||||||
topo.ChangeState(TopologyTransition::End, [&](Topology::ChangeStateResult result) {
|
topo.ChangeState(TopologyTransition::InitDevice, [&](Topology::ChangeStateResult result) {
|
||||||
LOG(info) << result;
|
LOG(info) << result;
|
||||||
EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Timeout);
|
EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Timeout);
|
||||||
blocker.Signal();
|
blocker.Signal();
|
||||||
|
@ -100,4 +105,24 @@ TEST_F(Topology, ChangeStateTimeout)
|
||||||
blocker.Wait();
|
blocker.Wait();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(Topology, ChangeStateFullDeviceLifetime)
|
||||||
|
{
|
||||||
|
using fair::mq::sdk::Topology;
|
||||||
|
using fair::mq::sdk::TopologyTransition;
|
||||||
|
|
||||||
|
Topology topo(mDDSTopo, mDDSSession);
|
||||||
|
for (auto transition : {TopologyTransition::InitDevice,
|
||||||
|
TopologyTransition::CompleteInit,
|
||||||
|
TopologyTransition::Bind,
|
||||||
|
TopologyTransition::Connect,
|
||||||
|
TopologyTransition::InitTask,
|
||||||
|
TopologyTransition::Run,
|
||||||
|
TopologyTransition::Stop,
|
||||||
|
TopologyTransition::ResetTask,
|
||||||
|
TopologyTransition::ResetDevice,
|
||||||
|
TopologyTransition::End}) {
|
||||||
|
ASSERT_EQ(topo.ChangeState(transition).rc, fair::mq::AsyncOpResultCode::Ok);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
Loading…
Reference in New Issue
Block a user