mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 09:31:45 +00:00
SDK: Add Topology::AsyncGetProperties
Co-Author: Dennis Klein <d.klein@gsi.de>
This commit is contained in:
committed by
Dennis Klein
parent
1c8ad03f3c
commit
264a178424
@@ -27,6 +27,8 @@ std::string ErrorCategory::message(int ev) const
|
||||
return "async operation canceled";
|
||||
case ErrorCode::DeviceChangeStateFailed:
|
||||
return "failed to change state of a fairmq device";
|
||||
case ErrorCode::DeviceGetPropertiesFailed:
|
||||
return "failed to get fairmq device properties";
|
||||
case ErrorCode::DeviceSetPropertiesFailed:
|
||||
return "failed to set fairmq device properties";
|
||||
default:
|
||||
|
@@ -38,6 +38,7 @@ enum class ErrorCode
|
||||
OperationTimeout,
|
||||
OperationCanceled,
|
||||
DeviceChangeStateFailed,
|
||||
DeviceGetPropertiesFailed,
|
||||
DeviceSetPropertiesFailed
|
||||
};
|
||||
|
||||
|
@@ -75,6 +75,18 @@ struct DeviceStatus
|
||||
|
||||
using DeviceProperty = std::pair<std::string, std::string>; /// pair := (key, value)
|
||||
using DeviceProperties = std::vector<DeviceProperty>;
|
||||
using DevicePropertyQuery = std::string; /// Boost regex supported
|
||||
using FailedDevices = std::set<DeviceId>;
|
||||
|
||||
struct GetPropertiesResult
|
||||
{
|
||||
struct Device
|
||||
{
|
||||
DeviceProperties props;
|
||||
};
|
||||
std::unordered_map<DeviceId, Device> devices;
|
||||
FailedDevices failed;
|
||||
};
|
||||
|
||||
using TopologyState = std::vector<DeviceStatus>;
|
||||
using TopologyStateIndex = std::unordered_map<DDSTask::Id, int>; // task id -> index in the data vector
|
||||
@@ -210,6 +222,10 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
}
|
||||
}
|
||||
break;
|
||||
case Type::properties: {
|
||||
HandleCmd(static_cast<cmd::Properties&>(*cmd));
|
||||
}
|
||||
break;
|
||||
case Type::properties_set: {
|
||||
HandleCmd(static_cast<cmd::PropertiesSet&>(*cmd));
|
||||
}
|
||||
@@ -415,7 +431,146 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
|
||||
auto StateEqualsTo(DeviceState state) const -> bool { return sdk::StateEqualsTo(GetCurrentState(), state); }
|
||||
|
||||
using FailedDevices = std::set<DeviceId>;
|
||||
using GetPropertiesCompletionSignature = void(std::error_code, GetPropertiesResult);
|
||||
|
||||
private:
|
||||
struct GetPropertiesOp
|
||||
{
|
||||
using Id = std::size_t;
|
||||
using GetCount = unsigned int;
|
||||
|
||||
template<typename Handler>
|
||||
GetPropertiesOp(Id id,
|
||||
GetCount expectedCount,
|
||||
Duration timeout,
|
||||
std::mutex& mutex,
|
||||
Executor const & ex,
|
||||
Allocator const & alloc,
|
||||
Handler&& handler)
|
||||
: fId(id)
|
||||
, fOp(ex, alloc, std::move(handler))
|
||||
, fTimer(ex)
|
||||
, fCount(0)
|
||||
, fExpectedCount(expectedCount)
|
||||
, fMtx(mutex)
|
||||
{
|
||||
if (timeout > std::chrono::milliseconds(0)) {
|
||||
fTimer.expires_after(timeout);
|
||||
fTimer.async_wait([&](std::error_code ec) {
|
||||
if (!ec) {
|
||||
std::lock_guard<std::mutex> lk(fMtx);
|
||||
fOp.Timeout(fResult);
|
||||
}
|
||||
});
|
||||
}
|
||||
// LOG(debug) << "GetProperties " << fId << " with expected count of " << fExpectedCount << " started.";
|
||||
}
|
||||
GetPropertiesOp() = delete;
|
||||
GetPropertiesOp(const GetPropertiesOp&) = delete;
|
||||
GetPropertiesOp& operator=(const GetPropertiesOp&) = delete;
|
||||
GetPropertiesOp(GetPropertiesOp&&) = default;
|
||||
GetPropertiesOp& operator=(GetPropertiesOp&&) = default;
|
||||
~GetPropertiesOp() = default;
|
||||
|
||||
auto Update(const std::string& deviceId, cmd::Result result, DeviceProperties props) -> void
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(fMtx);
|
||||
if (cmd::Result::Ok != result) {
|
||||
fResult.failed.insert(deviceId);
|
||||
} else {
|
||||
fResult.devices.insert({deviceId, {std::move(props)}});
|
||||
}
|
||||
++fCount;
|
||||
TryCompletion();
|
||||
}
|
||||
|
||||
private:
|
||||
Id const fId;
|
||||
AsioAsyncOp<Executor, Allocator, GetPropertiesCompletionSignature> fOp;
|
||||
asio::steady_timer fTimer;
|
||||
GetCount fCount;
|
||||
GetCount const fExpectedCount;
|
||||
GetPropertiesResult fResult;
|
||||
std::mutex& fMtx;
|
||||
|
||||
/// precondition: fMtx is locked.
|
||||
auto TryCompletion() -> void
|
||||
{
|
||||
if (!fOp.IsCompleted() && fCount == fExpectedCount) {
|
||||
fTimer.cancel();
|
||||
if (fResult.failed.size() > 0) {
|
||||
fOp.Complete(MakeErrorCode(ErrorCode::DeviceGetPropertiesFailed), std::move(fResult));
|
||||
} else {
|
||||
fOp.Complete(std::move(fResult));
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
auto HandleCmd(cmd::Properties const& cmd) -> void
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(fMtx);
|
||||
try {
|
||||
auto& op(fGetPropertiesOps.at(cmd.GetRequestId()));
|
||||
lk.unlock();
|
||||
op.Update(cmd.GetDeviceId(), cmd.GetResult(), cmd.GetProps());
|
||||
} catch (std::out_of_range& e) {
|
||||
LOG(debug) << "GetProperties operation (request id: " << cmd.GetRequestId()
|
||||
<< ") not found (probably completed or timed out), "
|
||||
<< "discarding reply of device " << cmd.GetDeviceId();
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
template<typename CompletionToken>
|
||||
auto AsyncGetProperties(DevicePropertyQuery const& query,
|
||||
Duration timeout,
|
||||
CompletionToken&& token)
|
||||
{
|
||||
return asio::async_initiate<CompletionToken, GetPropertiesCompletionSignature>(
|
||||
[&](auto handler) {
|
||||
typename GetPropertiesOp::Id const id(tools::UuidHash());
|
||||
|
||||
std::lock_guard<std::mutex> lk(fMtx);
|
||||
fGetPropertiesOps.emplace(
|
||||
std::piecewise_construct,
|
||||
std::forward_as_tuple(id),
|
||||
std::forward_as_tuple(id,
|
||||
fStateData.size(),
|
||||
timeout,
|
||||
fMtx,
|
||||
AsioBase<Executor, Allocator>::GetExecutor(),
|
||||
AsioBase<Executor, Allocator>::GetAllocator(),
|
||||
std::move(handler)));
|
||||
|
||||
cmd::Cmds const cmds(cmd::make<cmd::GetProperties>(id, query));
|
||||
fDDSSession.SendCommand(cmds.Serialize());
|
||||
},
|
||||
token);
|
||||
}
|
||||
|
||||
template<typename CompletionToken>
|
||||
auto AsyncGetProperties(DevicePropertyQuery const& query, CompletionToken&& token)
|
||||
{
|
||||
return AsyncGetProperties(query, Duration(0), std::move(token));
|
||||
}
|
||||
|
||||
auto GetProperties(DevicePropertyQuery const& query, Duration timeout = Duration(0))
|
||||
-> std::pair<std::error_code, GetPropertiesResult>
|
||||
{
|
||||
tools::SharedSemaphore blocker;
|
||||
std::error_code ec;
|
||||
GetPropertiesResult result;
|
||||
AsyncGetProperties(
|
||||
query, timeout, [&, blocker](std::error_code _ec, GetPropertiesResult _result) mutable {
|
||||
ec = _ec;
|
||||
result = _result;
|
||||
blocker.Signal();
|
||||
});
|
||||
blocker.Wait();
|
||||
return {ec, result};
|
||||
}
|
||||
|
||||
using SetPropertiesCompletionSignature = void(std::error_code, FailedDevices);
|
||||
|
||||
private:
|
||||
@@ -437,7 +592,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
, fTimer(ex)
|
||||
, fCount(0)
|
||||
, fExpectedCount(expectedCount)
|
||||
, fFailedDevices(alloc)
|
||||
, fFailedDevices()
|
||||
, fMtx(mutex)
|
||||
{
|
||||
if (timeout > std::chrono::milliseconds(0)) {
|
||||
@@ -571,6 +726,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
TransitionedCount fTransitionedCount;
|
||||
|
||||
std::unordered_map<typename SetPropertiesOp::Id, SetPropertiesOp> fSetPropertiesOps;
|
||||
std::unordered_map<typename GetPropertiesOp::Id, GetPropertiesOp> fGetPropertiesOps;
|
||||
|
||||
auto makeTopologyState() -> void
|
||||
{
|
||||
|
@@ -46,7 +46,7 @@ array<string, 2> resultNames =
|
||||
}
|
||||
};
|
||||
|
||||
array<string, 18> typeNames =
|
||||
array<string, 21> typeNames =
|
||||
{
|
||||
{
|
||||
"CheckState",
|
||||
@@ -57,6 +57,7 @@ array<string, 18> typeNames =
|
||||
"SubscribeToStateChange",
|
||||
"UnsubscribeFromStateChange",
|
||||
"StateChangeExitingReceived",
|
||||
"GetProperties",
|
||||
"SetProperties",
|
||||
|
||||
"CurrentState",
|
||||
@@ -68,6 +69,7 @@ array<string, 18> typeNames =
|
||||
"StateChangeSubscription",
|
||||
"StateChangeUnsubscription",
|
||||
"StateChange",
|
||||
"Properties",
|
||||
"PropertiesSet"
|
||||
}
|
||||
};
|
||||
@@ -150,7 +152,7 @@ array<sdk::cmd::FBTransition, 12> mqTransitionToFBTransition =
|
||||
}
|
||||
};
|
||||
|
||||
array<FBCmd, 19> typeToFBCmd =
|
||||
array<FBCmd, 21> typeToFBCmd =
|
||||
{
|
||||
{
|
||||
FBCmd::FBCmd_check_state,
|
||||
@@ -161,6 +163,7 @@ array<FBCmd, 19> typeToFBCmd =
|
||||
FBCmd::FBCmd_subscribe_to_state_change,
|
||||
FBCmd::FBCmd_unsubscribe_from_state_change,
|
||||
FBCmd::FBCmd_state_change_exiting_received,
|
||||
FBCmd::FBCmd_get_properties,
|
||||
FBCmd::FBCmd_set_properties,
|
||||
FBCmd::FBCmd_current_state,
|
||||
FBCmd::FBCmd_transition_status,
|
||||
@@ -171,11 +174,12 @@ array<FBCmd, 19> typeToFBCmd =
|
||||
FBCmd::FBCmd_state_change_subscription,
|
||||
FBCmd::FBCmd_state_change_unsubscription,
|
||||
FBCmd::FBCmd_state_change,
|
||||
FBCmd::FBCmd_properties,
|
||||
FBCmd::FBCmd_properties_set
|
||||
}
|
||||
};
|
||||
|
||||
array<Type, 19> fbCmdToType =
|
||||
array<Type, 21> fbCmdToType =
|
||||
{
|
||||
{
|
||||
Type::check_state,
|
||||
@@ -186,6 +190,7 @@ array<Type, 19> fbCmdToType =
|
||||
Type::subscribe_to_state_change,
|
||||
Type::unsubscribe_from_state_change,
|
||||
Type::state_change_exiting_received,
|
||||
Type::get_properties,
|
||||
Type::set_properties,
|
||||
Type::current_state,
|
||||
Type::transition_status,
|
||||
@@ -196,6 +201,7 @@ array<Type, 19> fbCmdToType =
|
||||
Type::state_change_subscription,
|
||||
Type::state_change_unsubscription,
|
||||
Type::state_change,
|
||||
Type::properties,
|
||||
Type::properties_set
|
||||
}
|
||||
};
|
||||
@@ -255,6 +261,14 @@ string Cmds::Serialize(const Format type) const
|
||||
cmdBuilder = tools::make_unique<FBCommandBuilder>(fbb);
|
||||
}
|
||||
break;
|
||||
case Type::get_properties: {
|
||||
auto _cmd = static_cast<GetProperties&>(*cmd);
|
||||
auto query = fbb.CreateString(_cmd.GetQuery());
|
||||
cmdBuilder = tools::make_unique<FBCommandBuilder>(fbb);
|
||||
cmdBuilder->add_request_id(_cmd.GetRequestId());
|
||||
cmdBuilder->add_property_query(query);
|
||||
}
|
||||
break;
|
||||
case Type::set_properties: {
|
||||
auto _cmd = static_cast<SetProperties&>(*cmd);
|
||||
std::vector<flatbuffers::Offset<FBProperty>> propsVector;
|
||||
@@ -343,6 +357,25 @@ string Cmds::Serialize(const Format type) const
|
||||
cmdBuilder->add_current_state(GetFBState(_cmd.GetCurrentState()));
|
||||
}
|
||||
break;
|
||||
case Type::properties: {
|
||||
auto _cmd = static_cast<Properties&>(*cmd);
|
||||
auto deviceId = fbb.CreateString(_cmd.GetDeviceId());
|
||||
|
||||
std::vector<flatbuffers::Offset<FBProperty>> propsVector;
|
||||
for (const auto& e : _cmd.GetProps()) {
|
||||
auto key = fbb.CreateString(e.first);
|
||||
auto val = fbb.CreateString(e.second);
|
||||
auto prop = CreateFBProperty(fbb, key, val);
|
||||
propsVector.push_back(prop);
|
||||
}
|
||||
auto props = fbb.CreateVector(propsVector);
|
||||
cmdBuilder = tools::make_unique<FBCommandBuilder>(fbb);
|
||||
cmdBuilder->add_device_id(deviceId);
|
||||
cmdBuilder->add_request_id(_cmd.GetRequestId());
|
||||
cmdBuilder->add_result(GetFBResult(_cmd.GetResult()));
|
||||
cmdBuilder->add_properties(props);
|
||||
}
|
||||
break;
|
||||
case Type::properties_set: {
|
||||
auto _cmd = static_cast<PropertiesSet&>(*cmd);
|
||||
auto deviceId = fbb.CreateString(_cmd.GetDeviceId());
|
||||
@@ -428,6 +461,9 @@ void Cmds::Deserialize(const string& str, const Format type)
|
||||
case FBCmd_state_change_exiting_received:
|
||||
fCmds.emplace_back(make<StateChangeExitingReceived>());
|
||||
break;
|
||||
case FBCmd_get_properties:
|
||||
fCmds.emplace_back(make<GetProperties>(cmdPtr.request_id(), cmdPtr.property_query()->str()));
|
||||
break;
|
||||
case FBCmd_set_properties: {
|
||||
std::vector<std::pair<std::string, std::string>> properties;
|
||||
auto props = cmdPtr.properties();
|
||||
@@ -463,6 +499,14 @@ void Cmds::Deserialize(const string& str, const Format type)
|
||||
case FBCmd_state_change:
|
||||
fCmds.emplace_back(make<StateChange>(cmdPtr.device_id()->str(), cmdPtr.task_id(), GetMQState(cmdPtr.last_state()), GetMQState(cmdPtr.current_state())));
|
||||
break;
|
||||
case FBCmd_properties: {
|
||||
std::vector<std::pair<std::string, std::string>> properties;
|
||||
auto props = cmdPtr.properties();
|
||||
for (unsigned int j = 0; j < props->size(); ++j) {
|
||||
properties.emplace_back(props->Get(j)->key()->str(), props->Get(j)->value()->str());
|
||||
}
|
||||
fCmds.emplace_back(make<Properties>(cmdPtr.device_id()->str(), cmdPtr.request_id(), GetResult(cmdPtr.result()), properties));
|
||||
} break;
|
||||
case FBCmd_properties_set:
|
||||
fCmds.emplace_back(make<PropertiesSet>(cmdPtr.device_id()->str(), cmdPtr.request_id(), GetResult(cmdPtr.result())));
|
||||
break;
|
||||
|
@@ -47,6 +47,7 @@ enum class Type : int
|
||||
subscribe_to_state_change, // args: { }
|
||||
unsubscribe_from_state_change, // args: { }
|
||||
state_change_exiting_received, // args: { }
|
||||
get_properties, // args: { request_id, property_query }
|
||||
set_properties, // args: { request_id, properties }
|
||||
|
||||
current_state, // args: { device_id, current_state }
|
||||
@@ -58,6 +59,7 @@ enum class Type : int
|
||||
state_change_subscription, // args: { device_id, Result }
|
||||
state_change_unsubscription, // args: { device_id, Result }
|
||||
state_change, // args: { device_id, task_id, last_state, current_state }
|
||||
properties, // args: { device_id, request_id, Result, properties }
|
||||
properties_set // args: { device_id, request_id, Result }
|
||||
};
|
||||
|
||||
@@ -121,6 +123,24 @@ struct StateChangeExitingReceived : Cmd
|
||||
explicit StateChangeExitingReceived() : Cmd(Type::state_change_exiting_received) {}
|
||||
};
|
||||
|
||||
struct GetProperties : Cmd
|
||||
{
|
||||
GetProperties(std::size_t request_id, std::string query)
|
||||
: Cmd(Type::get_properties)
|
||||
, fRequestId(request_id)
|
||||
, fQuery(std::move(query))
|
||||
{}
|
||||
|
||||
auto GetRequestId() const -> std::size_t { return fRequestId; }
|
||||
auto SetRequestId(std::size_t requestId) -> void { fRequestId = requestId; }
|
||||
auto GetQuery() const -> std::string { return fQuery; }
|
||||
auto SetQuery(std::string query) -> void { fQuery = std::move(query); }
|
||||
|
||||
private:
|
||||
std::size_t fRequestId;
|
||||
std::string fQuery;
|
||||
};
|
||||
|
||||
struct SetProperties : Cmd
|
||||
{
|
||||
SetProperties(std::size_t request_id, std::vector<std::pair<std::string, std::string>> properties)
|
||||
@@ -309,6 +329,32 @@ struct StateChange : Cmd
|
||||
fair::mq::State fCurrentState;
|
||||
};
|
||||
|
||||
struct Properties : Cmd
|
||||
{
|
||||
Properties(std::string deviceId, std::size_t requestId, const Result result, std::vector<std::pair<std::string, std::string>> properties)
|
||||
: Cmd(Type::properties)
|
||||
, fDeviceId(std::move(deviceId))
|
||||
, fRequestId(requestId)
|
||||
, fResult(result)
|
||||
, fProperties(std::move(properties))
|
||||
{}
|
||||
|
||||
auto GetDeviceId() const -> std::string { return fDeviceId; }
|
||||
auto SetDeviceId(std::string deviceId) -> void { fDeviceId = std::move(deviceId); }
|
||||
auto GetRequestId() const -> std::size_t { return fRequestId; }
|
||||
auto SetRequestId(std::size_t requestId) -> void { fRequestId = requestId; }
|
||||
auto GetResult() const -> Result { return fResult; }
|
||||
auto SetResult(Result result) -> void { fResult = result; }
|
||||
auto GetProps() const -> std::vector<std::pair<std::string, std::string>> { return fProperties; }
|
||||
auto SetProps(std::vector<std::pair<std::string, std::string>> properties) -> void { fProperties = std::move(properties); }
|
||||
|
||||
private:
|
||||
std::string fDeviceId;
|
||||
std::size_t fRequestId;
|
||||
Result fResult;
|
||||
std::vector<std::pair<std::string, std::string>> fProperties;
|
||||
};
|
||||
|
||||
struct PropertiesSet : Cmd {
|
||||
PropertiesSet(std::string deviceId, std::size_t requestId, Result result)
|
||||
: Cmd(Type::properties_set)
|
||||
|
@@ -52,7 +52,8 @@ enum FBCmd:byte {
|
||||
subscribe_to_state_change, // args: { }
|
||||
unsubscribe_from_state_change, // args: { }
|
||||
state_change_exiting_received, // args: { }
|
||||
set_properties, // args: { key, value }
|
||||
get_properties, // args: { request_id, property_query }
|
||||
set_properties, // args: { request_id, properties }
|
||||
|
||||
current_state, // args: { device_id, current_state }
|
||||
transition_status, // args: { device_id, Result, transition }
|
||||
@@ -63,6 +64,7 @@ enum FBCmd:byte {
|
||||
state_change_subscription, // args: { device_id, Result }
|
||||
state_change_unsubscription, // args: { device_id, Result }
|
||||
state_change, // args: { device_id, task_id, last_state, current_state }
|
||||
properties, // args: { device_id, request_id, Result, properties }
|
||||
properties_set // args: { device_id, request_id, Result }
|
||||
}
|
||||
|
||||
@@ -79,6 +81,7 @@ table FBCommand {
|
||||
current_state:FBState;
|
||||
debug:string;
|
||||
properties:[FBProperty];
|
||||
property_query:string;
|
||||
}
|
||||
|
||||
table FBCommands {
|
||||
|
Reference in New Issue
Block a user