mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
SDK: minor refactoring of the command handling
This commit is contained in:
parent
0a5820c07f
commit
c5efd3e4a6
|
@ -197,6 +197,19 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
BasicTopology(BasicTopology&&) = default;
|
BasicTopology(BasicTopology&&) = default;
|
||||||
BasicTopology& operator=(BasicTopology&&) = default;
|
BasicTopology& operator=(BasicTopology&&) = default;
|
||||||
|
|
||||||
|
~BasicTopology()
|
||||||
|
{
|
||||||
|
UnsubscribeFromStateChanges();
|
||||||
|
|
||||||
|
std::lock_guard<std::mutex> lk(fMtx);
|
||||||
|
fDDSSession.UnsubscribeFromCommands();
|
||||||
|
try {
|
||||||
|
for (auto& op : fChangeStateOps) {
|
||||||
|
op.second.Complete(MakeErrorCode(ErrorCode::OperationCanceled));
|
||||||
|
}
|
||||||
|
} catch (...) {}
|
||||||
|
}
|
||||||
|
|
||||||
void SubscribeToStateChanges()
|
void SubscribeToStateChanges()
|
||||||
{
|
{
|
||||||
using namespace fair::mq::sdk::cmd;
|
using namespace fair::mq::sdk::cmd;
|
||||||
|
@ -215,54 +228,31 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
|
|
||||||
void SubscribeToCommands()
|
void SubscribeToCommands()
|
||||||
{
|
{
|
||||||
using namespace fair::mq::sdk::cmd;
|
|
||||||
fDDSSession.SubscribeToCommands([&](const std::string& msg, const std::string& /* condition */, DDSChannel::Id senderId) {
|
fDDSSession.SubscribeToCommands([&](const std::string& msg, const std::string& /* condition */, DDSChannel::Id senderId) {
|
||||||
Cmds inCmds;
|
cmd::Cmds inCmds;
|
||||||
inCmds.Deserialize(msg);
|
inCmds.Deserialize(msg);
|
||||||
// FAIR_LOG(debug) << "Received " << inCmds.Size() << " command(s) with total size of " << msg.length() << " bytes: ";
|
// FAIR_LOG(debug) << "Received " << inCmds.Size() << " command(s) with total size of " << msg.length() << " bytes: ";
|
||||||
|
|
||||||
for (const auto& cmd : inCmds) {
|
for (const auto& cmd : inCmds) {
|
||||||
// FAIR_LOG(debug) << " > " << cmd->GetType();
|
// FAIR_LOG(debug) << " > " << cmd->GetType();
|
||||||
switch (cmd->GetType()) {
|
switch (cmd->GetType()) {
|
||||||
case Type::state_change: {
|
case cmd::Type::state_change_subscription:
|
||||||
auto _cmd = static_cast<StateChange&>(*cmd);
|
HandleCmd(static_cast<cmd::StateChangeSubscription&>(*cmd));
|
||||||
if (_cmd.GetCurrentState() == DeviceState::Exiting) {
|
|
||||||
fDDSSession.SendCommand(Cmds(make<StateChangeExitingReceived>()).Serialize(), senderId);
|
|
||||||
}
|
|
||||||
HandleCmd(_cmd);
|
|
||||||
} break;
|
|
||||||
case Type::state_change_subscription:
|
|
||||||
if (static_cast<StateChangeSubscription&>(*cmd).GetResult() != Result::Ok) {
|
|
||||||
FAIR_LOG(error) << "State change subscription failed for " << static_cast<StateChangeSubscription&>(*cmd).GetDeviceId();
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
case Type::state_change_unsubscription:
|
case cmd::Type::state_change_unsubscription:
|
||||||
if (static_cast<StateChangeUnsubscription&>(*cmd).GetResult() != Result::Ok) {
|
HandleCmd(static_cast<cmd::StateChangeUnsubscription&>(*cmd));
|
||||||
FAIR_LOG(error) << "State change unsubscription failed for " << static_cast<StateChangeUnsubscription&>(*cmd).GetDeviceId();
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
case Type::transition_status: {
|
case cmd::Type::state_change:
|
||||||
auto _cmd = static_cast<TransitionStatus&>(*cmd);
|
HandleCmd(static_cast<cmd::StateChange&>(*cmd), senderId);
|
||||||
if (_cmd.GetResult() != Result::Ok) {
|
|
||||||
FAIR_LOG(error) << _cmd.GetTransition() << " transition failed for " << _cmd.GetDeviceId();
|
|
||||||
DDSTask::Id id(_cmd.GetTaskId());
|
|
||||||
std::lock_guard<std::mutex> lk(fMtx);
|
|
||||||
for (auto& op : fChangeStateOps) {
|
|
||||||
if (!op.second.IsCompleted() && op.second.ContainsTask(id) &&
|
|
||||||
fStateData.at(fStateIndex.at(id)).state != op.second.GetTargetState()) {
|
|
||||||
op.second.Complete(MakeErrorCode(ErrorCode::DeviceChangeStateFailed));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
case Type::properties: {
|
case cmd::Type::transition_status:
|
||||||
|
HandleCmd(static_cast<cmd::TransitionStatus&>(*cmd));
|
||||||
|
break;
|
||||||
|
case cmd::Type::properties:
|
||||||
HandleCmd(static_cast<cmd::Properties&>(*cmd));
|
HandleCmd(static_cast<cmd::Properties&>(*cmd));
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
case Type::properties_set: {
|
case cmd::Type::properties_set:
|
||||||
HandleCmd(static_cast<cmd::PropertiesSet&>(*cmd));
|
HandleCmd(static_cast<cmd::PropertiesSet&>(*cmd));
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
FAIR_LOG(warn) << "Unexpected/unknown command received: " << cmd->GetType();
|
FAIR_LOG(warn) << "Unexpected/unknown command received: " << cmd->GetType();
|
||||||
|
@ -273,21 +263,28 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
~BasicTopology()
|
auto HandleCmd(cmd::StateChangeSubscription const& cmd) -> void
|
||||||
{
|
{
|
||||||
UnsubscribeFromStateChanges();
|
if (cmd.GetResult() != cmd::Result::Ok) {
|
||||||
|
FAIR_LOG(error) << "State change subscription failed for " << cmd.GetDeviceId();
|
||||||
std::lock_guard<std::mutex> lk(fMtx);
|
|
||||||
fDDSSession.UnsubscribeFromCommands();
|
|
||||||
try {
|
|
||||||
for (auto& op : fChangeStateOps) {
|
|
||||||
op.second.Complete(MakeErrorCode(ErrorCode::OperationCanceled));
|
|
||||||
}
|
}
|
||||||
} catch (...) {}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
auto HandleCmd(cmd::StateChange const& cmd) -> void
|
auto HandleCmd(cmd::StateChangeUnsubscription const& cmd) -> void
|
||||||
{
|
{
|
||||||
|
if (cmd.GetResult() != cmd::Result::Ok) {
|
||||||
|
FAIR_LOG(error) << "State change unsubscription failed for " << cmd.GetDeviceId();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
auto HandleCmd(cmd::StateChange const& cmd, DDSChannel::Id const& senderId) -> void
|
||||||
|
{
|
||||||
|
using namespace fair::mq::sdk::cmd;
|
||||||
|
|
||||||
|
if (cmd.GetCurrentState() == DeviceState::Exiting) {
|
||||||
|
fDDSSession.SendCommand(Cmds(make<StateChangeExitingReceived>()).Serialize(), senderId);
|
||||||
|
}
|
||||||
|
|
||||||
DDSTask::Id taskId(cmd.GetTaskId());
|
DDSTask::Id taskId(cmd.GetTaskId());
|
||||||
std::lock_guard<std::mutex> lk(fMtx);
|
std::lock_guard<std::mutex> lk(fMtx);
|
||||||
|
|
||||||
|
@ -309,6 +306,21 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto HandleCmd(cmd::TransitionStatus const& cmd) -> void
|
||||||
|
{
|
||||||
|
if (cmd.GetResult() != cmd::Result::Ok) {
|
||||||
|
FAIR_LOG(error) << cmd.GetTransition() << " transition failed for " << cmd.GetDeviceId();
|
||||||
|
DDSTask::Id id(cmd.GetTaskId());
|
||||||
|
std::lock_guard<std::mutex> lk(fMtx);
|
||||||
|
for (auto& op : fChangeStateOps) {
|
||||||
|
if (!op.second.IsCompleted() && op.second.ContainsTask(id) &&
|
||||||
|
fStateData.at(fStateIndex.at(id)).state != op.second.GetTargetState()) {
|
||||||
|
op.second.Complete(MakeErrorCode(ErrorCode::DeviceChangeStateFailed));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
auto HandleCmd(cmd::Properties const& cmd) -> void
|
auto HandleCmd(cmd::Properties const& cmd) -> void
|
||||||
{
|
{
|
||||||
std::unique_lock<std::mutex> lk(fMtx);
|
std::unique_lock<std::mutex> lk(fMtx);
|
||||||
|
@ -629,7 +641,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
}
|
}
|
||||||
|
|
||||||
/// @brief Returns the current state of the topology
|
/// @brief Returns the current state of the topology
|
||||||
/// @return map of id : DeviceStatus (initialized, state)
|
/// @return map of id : DeviceStatus
|
||||||
auto GetCurrentState() const -> TopologyState
|
auto GetCurrentState() const -> TopologyState
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lk(fMtx);
|
std::lock_guard<std::mutex> lk(fMtx);
|
||||||
|
|
Loading…
Reference in New Issue
Block a user