mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
Make BasicTopology actually movable
This commit is contained in:
parent
7edf436919
commit
8dd0b25c06
|
@ -234,6 +234,8 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
, fDDSTopo(std::move(topo))
|
, fDDSTopo(std::move(topo))
|
||||||
, fStateData()
|
, fStateData()
|
||||||
, fStateIndex()
|
, fStateIndex()
|
||||||
|
, fMtx(std::make_unique<std::mutex>())
|
||||||
|
, fStateChangeUnsubscriptionCV(std::make_unique<std::condition_variable>())
|
||||||
, fHeartbeatsTimer(asio::system_executor())
|
, fHeartbeatsTimer(asio::system_executor())
|
||||||
, fHeartbeatInterval(600000)
|
, fHeartbeatInterval(600000)
|
||||||
{
|
{
|
||||||
|
@ -263,7 +265,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
{
|
{
|
||||||
UnsubscribeFromStateChanges();
|
UnsubscribeFromStateChanges();
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lk(fMtx);
|
std::lock_guard<std::mutex> lk(*fMtx);
|
||||||
fDDSSession.UnsubscribeFromCommands();
|
fDDSSession.UnsubscribeFromCommands();
|
||||||
try {
|
try {
|
||||||
for (auto& op : fChangeStateOps) {
|
for (auto& op : fChangeStateOps) {
|
||||||
|
@ -306,8 +308,8 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
fDDSSession.SendCommand(cmd::Cmds(cmd::make<cmd::UnsubscribeFromStateChange>()).Serialize());
|
fDDSSession.SendCommand(cmd::Cmds(cmd::make<cmd::UnsubscribeFromStateChange>()).Serialize());
|
||||||
|
|
||||||
// wait for all tasks to confirm unsubscription
|
// wait for all tasks to confirm unsubscription
|
||||||
std::unique_lock<std::mutex> lk(fMtx);
|
std::unique_lock<std::mutex> lk(*fMtx);
|
||||||
fStateChangeUnsubscriptionCV.wait(lk, [&](){
|
fStateChangeUnsubscriptionCV->wait(lk, [&](){
|
||||||
unsigned int count = std::count_if(fStateIndex.cbegin(), fStateIndex.cend(), [=](const auto& s) {
|
unsigned int count = std::count_if(fStateIndex.cbegin(), fStateIndex.cend(), [=](const auto& s) {
|
||||||
return fStateData.at(s.second).subscribed_to_state_changes == false;
|
return fStateData.at(s.second).subscribed_to_state_changes == false;
|
||||||
});
|
});
|
||||||
|
@ -358,7 +360,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
DDSTask::Id taskId(cmd.GetTaskId());
|
DDSTask::Id taskId(cmd.GetTaskId());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
std::lock_guard<std::mutex> lk(fMtx);
|
std::lock_guard<std::mutex> lk(*fMtx);
|
||||||
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
||||||
task.subscribed_to_state_changes = true;
|
task.subscribed_to_state_changes = true;
|
||||||
} catch (const std::exception& e) {
|
} catch (const std::exception& e) {
|
||||||
|
@ -375,11 +377,11 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
DDSTask::Id taskId(cmd.GetTaskId());
|
DDSTask::Id taskId(cmd.GetTaskId());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
std::unique_lock<std::mutex> lk(fMtx);
|
std::unique_lock<std::mutex> lk(*fMtx);
|
||||||
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
||||||
task.subscribed_to_state_changes = false;
|
task.subscribed_to_state_changes = false;
|
||||||
lk.unlock();
|
lk.unlock();
|
||||||
fStateChangeUnsubscriptionCV.notify_one();
|
fStateChangeUnsubscriptionCV->notify_one();
|
||||||
} catch (const std::exception& e) {
|
} catch (const std::exception& e) {
|
||||||
FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeUnsubscription const&): " << e.what();
|
FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeUnsubscription const&): " << e.what();
|
||||||
}
|
}
|
||||||
|
@ -397,7 +399,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
DDSTask::Id taskId(cmd.GetTaskId());
|
DDSTask::Id taskId(cmd.GetTaskId());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
std::lock_guard<std::mutex> lk(fMtx);
|
std::lock_guard<std::mutex> lk(*fMtx);
|
||||||
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
||||||
task.lastState = cmd.GetLastState();
|
task.lastState = cmd.GetLastState();
|
||||||
task.state = cmd.GetCurrentState();
|
task.state = cmd.GetCurrentState();
|
||||||
|
@ -422,7 +424,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
{
|
{
|
||||||
if (cmd.GetResult() != cmd::Result::Ok) {
|
if (cmd.GetResult() != cmd::Result::Ok) {
|
||||||
DDSTask::Id taskId(cmd.GetTaskId());
|
DDSTask::Id taskId(cmd.GetTaskId());
|
||||||
std::lock_guard<std::mutex> lk(fMtx);
|
std::lock_guard<std::mutex> lk(*fMtx);
|
||||||
for (auto& op : fChangeStateOps) {
|
for (auto& op : fChangeStateOps) {
|
||||||
if (!op.second.IsCompleted() && op.second.ContainsTask(taskId)) {
|
if (!op.second.IsCompleted() && op.second.ContainsTask(taskId)) {
|
||||||
if (fStateData.at(fStateIndex.at(taskId)).state != op.second.GetTargetState()) {
|
if (fStateData.at(fStateIndex.at(taskId)).state != op.second.GetTargetState()) {
|
||||||
|
@ -438,7 +440,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
|
|
||||||
auto HandleCmd(cmd::Properties const& cmd) -> void
|
auto HandleCmd(cmd::Properties const& cmd) -> void
|
||||||
{
|
{
|
||||||
std::unique_lock<std::mutex> lk(fMtx);
|
std::unique_lock<std::mutex> lk(*fMtx);
|
||||||
try {
|
try {
|
||||||
auto& op(fGetPropertiesOps.at(cmd.GetRequestId()));
|
auto& op(fGetPropertiesOps.at(cmd.GetRequestId()));
|
||||||
lk.unlock();
|
lk.unlock();
|
||||||
|
@ -452,7 +454,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
|
|
||||||
auto HandleCmd(cmd::PropertiesSet const& cmd) -> void
|
auto HandleCmd(cmd::PropertiesSet const& cmd) -> void
|
||||||
{
|
{
|
||||||
std::unique_lock<std::mutex> lk(fMtx);
|
std::unique_lock<std::mutex> lk(*fMtx);
|
||||||
try {
|
try {
|
||||||
auto& op(fSetPropertiesOps.at(cmd.GetRequestId()));
|
auto& op(fSetPropertiesOps.at(cmd.GetRequestId()));
|
||||||
lk.unlock();
|
lk.unlock();
|
||||||
|
@ -659,7 +661,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
return asio::async_initiate<CompletionToken, ChangeStateCompletionSignature>([&](auto handler) {
|
return asio::async_initiate<CompletionToken, ChangeStateCompletionSignature>([&](auto handler) {
|
||||||
typename ChangeStateOp::Id const id(tools::UuidHash());
|
typename ChangeStateOp::Id const id(tools::UuidHash());
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lk(fMtx);
|
std::lock_guard<std::mutex> lk(*fMtx);
|
||||||
|
|
||||||
for (auto it = begin(fChangeStateOps); it != end(fChangeStateOps);) {
|
for (auto it = begin(fChangeStateOps); it != end(fChangeStateOps);) {
|
||||||
if (it->second.IsCompleted()) {
|
if (it->second.IsCompleted()) {
|
||||||
|
@ -677,7 +679,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
fDDSTopo.GetTasks(path),
|
fDDSTopo.GetTasks(path),
|
||||||
fStateData,
|
fStateData,
|
||||||
timeout,
|
timeout,
|
||||||
fMtx,
|
*fMtx,
|
||||||
AsioBase<Executor, Allocator>::GetExecutor(),
|
AsioBase<Executor, Allocator>::GetExecutor(),
|
||||||
AsioBase<Executor, Allocator>::GetAllocator(),
|
AsioBase<Executor, Allocator>::GetAllocator(),
|
||||||
std::move(handler)));
|
std::move(handler)));
|
||||||
|
@ -762,7 +764,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
/// @return map of id : DeviceStatus
|
/// @return map of id : DeviceStatus
|
||||||
auto GetCurrentState() const -> TopologyState
|
auto GetCurrentState() const -> TopologyState
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lk(fMtx);
|
std::lock_guard<std::mutex> lk(*fMtx);
|
||||||
return fStateData;
|
return fStateData;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -890,7 +892,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
return asio::async_initiate<CompletionToken, WaitForStateCompletionSignature>([&](auto handler) {
|
return asio::async_initiate<CompletionToken, WaitForStateCompletionSignature>([&](auto handler) {
|
||||||
typename GetPropertiesOp::Id const id(tools::UuidHash());
|
typename GetPropertiesOp::Id const id(tools::UuidHash());
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lk(fMtx);
|
std::lock_guard<std::mutex> lk(*fMtx);
|
||||||
|
|
||||||
for (auto it = begin(fWaitForStateOps); it != end(fWaitForStateOps);) {
|
for (auto it = begin(fWaitForStateOps); it != end(fWaitForStateOps);) {
|
||||||
if (it->second.IsCompleted()) {
|
if (it->second.IsCompleted()) {
|
||||||
|
@ -908,7 +910,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
targetCurrentState,
|
targetCurrentState,
|
||||||
fDDSTopo.GetTasks(path),
|
fDDSTopo.GetTasks(path),
|
||||||
timeout,
|
timeout,
|
||||||
fMtx,
|
*fMtx,
|
||||||
AsioBase<Executor, Allocator>::GetExecutor(),
|
AsioBase<Executor, Allocator>::GetExecutor(),
|
||||||
AsioBase<Executor, Allocator>::GetAllocator(),
|
AsioBase<Executor, Allocator>::GetAllocator(),
|
||||||
std::move(handler)));
|
std::move(handler)));
|
||||||
|
@ -1071,7 +1073,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
[&](auto handler) {
|
[&](auto handler) {
|
||||||
typename GetPropertiesOp::Id const id(tools::UuidHash());
|
typename GetPropertiesOp::Id const id(tools::UuidHash());
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lk(fMtx);
|
std::lock_guard<std::mutex> lk(*fMtx);
|
||||||
|
|
||||||
for (auto it = begin(fGetPropertiesOps); it != end(fGetPropertiesOps);) {
|
for (auto it = begin(fGetPropertiesOps); it != end(fGetPropertiesOps);) {
|
||||||
if (it->second.IsCompleted()) {
|
if (it->second.IsCompleted()) {
|
||||||
|
@ -1087,7 +1089,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
std::forward_as_tuple(id,
|
std::forward_as_tuple(id,
|
||||||
fDDSTopo.GetTasks(path).size(),
|
fDDSTopo.GetTasks(path).size(),
|
||||||
timeout,
|
timeout,
|
||||||
fMtx,
|
*fMtx,
|
||||||
AsioBase<Executor, Allocator>::GetExecutor(),
|
AsioBase<Executor, Allocator>::GetExecutor(),
|
||||||
AsioBase<Executor, Allocator>::GetAllocator(),
|
AsioBase<Executor, Allocator>::GetAllocator(),
|
||||||
std::move(handler)));
|
std::move(handler)));
|
||||||
|
@ -1227,7 +1229,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
[&](auto handler) {
|
[&](auto handler) {
|
||||||
typename SetPropertiesOp::Id const id(tools::UuidHash());
|
typename SetPropertiesOp::Id const id(tools::UuidHash());
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lk(fMtx);
|
std::lock_guard<std::mutex> lk(*fMtx);
|
||||||
|
|
||||||
for (auto it = begin(fGetPropertiesOps); it != end(fGetPropertiesOps);) {
|
for (auto it = begin(fGetPropertiesOps); it != end(fGetPropertiesOps);) {
|
||||||
if (it->second.IsCompleted()) {
|
if (it->second.IsCompleted()) {
|
||||||
|
@ -1243,7 +1245,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
std::forward_as_tuple(id,
|
std::forward_as_tuple(id,
|
||||||
fDDSTopo.GetTasks(path).size(),
|
fDDSTopo.GetTasks(path).size(),
|
||||||
timeout,
|
timeout,
|
||||||
fMtx,
|
*fMtx,
|
||||||
AsioBase<Executor, Allocator>::GetExecutor(),
|
AsioBase<Executor, Allocator>::GetExecutor(),
|
||||||
AsioBase<Executor, Allocator>::GetAllocator(),
|
AsioBase<Executor, Allocator>::GetAllocator(),
|
||||||
std::move(handler)));
|
std::move(handler)));
|
||||||
|
@ -1296,9 +1298,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
TopologyState fStateData;
|
TopologyState fStateData;
|
||||||
TopologyStateIndex fStateIndex;
|
TopologyStateIndex fStateIndex;
|
||||||
|
|
||||||
mutable std::mutex fMtx;
|
mutable std::unique_ptr<std::mutex> fMtx;
|
||||||
|
|
||||||
std::condition_variable fStateChangeUnsubscriptionCV;
|
std::unique_ptr<std::condition_variable> fStateChangeUnsubscriptionCV;
|
||||||
asio::steady_timer fHeartbeatsTimer;
|
asio::steady_timer fHeartbeatsTimer;
|
||||||
Duration fHeartbeatInterval;
|
Duration fHeartbeatInterval;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user