mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
SDK: Fix data races on the local semaphores
This commit is contained in:
parent
5d6184cd1a
commit
1c49dde668
|
@ -186,12 +186,12 @@ auto DDSSession::SubmitAgents(Quantity agents) -> void
|
||||||
submitInfo.m_instances = agents;
|
submitInfo.m_instances = agents;
|
||||||
submitInfo.m_config = GetRMSConfig().string();
|
submitInfo.m_config = GetRMSConfig().string();
|
||||||
|
|
||||||
tools::Semaphore blocker;
|
tools::SharedSemaphore blocker;
|
||||||
auto submitRequest = SSubmitRequest::makeRequest(submitInfo);
|
auto submitRequest = SSubmitRequest::makeRequest(submitInfo);
|
||||||
submitRequest->setMessageCallback([](const SMessageResponseData& message){
|
submitRequest->setMessageCallback([](const SMessageResponseData& message){
|
||||||
LOG(debug) << message.m_msg;
|
LOG(debug) << message.m_msg;
|
||||||
});
|
});
|
||||||
submitRequest->setDoneCallback([&]() {
|
submitRequest->setDoneCallback([agents, blocker]() mutable {
|
||||||
LOG(debug) << agents << " Agents submitted";
|
LOG(debug) << agents << " Agents submitted";
|
||||||
blocker.Signal();
|
blocker.Signal();
|
||||||
});
|
});
|
||||||
|
@ -265,7 +265,7 @@ auto DDSSession::RequestCommanderInfo() -> CommanderInfo
|
||||||
using namespace dds::tools_api;
|
using namespace dds::tools_api;
|
||||||
|
|
||||||
SCommanderInfoRequestData commanderInfo;
|
SCommanderInfoRequestData commanderInfo;
|
||||||
tools::Semaphore blocker;
|
tools::SharedSemaphore blocker;
|
||||||
std::string error;
|
std::string error;
|
||||||
auto commanderInfoRequest = SCommanderInfoRequest::makeRequest(commanderInfo);
|
auto commanderInfoRequest = SCommanderInfoRequest::makeRequest(commanderInfo);
|
||||||
CommanderInfo info;
|
CommanderInfo info;
|
||||||
|
@ -281,7 +281,7 @@ auto DDSSession::RequestCommanderInfo() -> CommanderInfo
|
||||||
LOG(debug) << _message.m_msg;
|
LOG(debug) << _message.m_msg;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
commanderInfoRequest->setDoneCallback([&]() { blocker.Signal(); });
|
commanderInfoRequest->setDoneCallback([blocker]() mutable { blocker.Signal(); });
|
||||||
fImpl->fSession->sendRequest<SCommanderInfoRequest>(commanderInfoRequest);
|
fImpl->fSession->sendRequest<SCommanderInfoRequest>(commanderInfoRequest);
|
||||||
blocker.Wait();
|
blocker.Wait();
|
||||||
|
|
||||||
|
@ -322,12 +322,12 @@ auto DDSSession::ActivateTopology(DDSTopology topo) -> void
|
||||||
topologyInfo.m_updateType = STopologyRequestData::EUpdateType::ACTIVATE;
|
topologyInfo.m_updateType = STopologyRequestData::EUpdateType::ACTIVATE;
|
||||||
topologyInfo.m_topologyFile = topo.GetTopoFile().string();
|
topologyInfo.m_topologyFile = topo.GetTopoFile().string();
|
||||||
|
|
||||||
tools::Semaphore blocker;
|
tools::SharedSemaphore blocker;
|
||||||
auto topologyRequest = STopologyRequest::makeRequest(topologyInfo);
|
auto topologyRequest = STopologyRequest::makeRequest(topologyInfo);
|
||||||
topologyRequest->setMessageCallback([](const SMessageResponseData& _message) {
|
topologyRequest->setMessageCallback([](const SMessageResponseData& _message) {
|
||||||
LOG(debug) << _message.m_msg;
|
LOG(debug) << _message.m_msg;
|
||||||
});
|
});
|
||||||
topologyRequest->setDoneCallback([&]() { blocker.Signal(); });
|
topologyRequest->setDoneCallback([blocker]() mutable { blocker.Signal(); });
|
||||||
fImpl->fSession->sendRequest<STopologyRequest>(topologyRequest);
|
fImpl->fSession->sendRequest<STopologyRequest>(topologyRequest);
|
||||||
blocker.Wait();
|
blocker.Wait();
|
||||||
|
|
||||||
|
|
|
@ -338,10 +338,11 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
auto ChangeState(TopologyTransition transition, Duration timeout = Duration(0))
|
auto ChangeState(TopologyTransition transition, Duration timeout = Duration(0))
|
||||||
-> std::pair<std::error_code, TopologyState>
|
-> std::pair<std::error_code, TopologyState>
|
||||||
{
|
{
|
||||||
tools::Semaphore blocker;
|
tools::SharedSemaphore blocker;
|
||||||
std::error_code ec;
|
std::error_code ec;
|
||||||
TopologyState state;
|
TopologyState state;
|
||||||
AsyncChangeState(transition, timeout, [&](std::error_code _ec, TopologyState _state) mutable {
|
AsyncChangeState(
|
||||||
|
transition, timeout, [&, blocker](std::error_code _ec, TopologyState _state) mutable {
|
||||||
ec = _ec;
|
ec = _ec;
|
||||||
state = _state;
|
state = _state;
|
||||||
blocker.Signal();
|
blocker.Signal();
|
||||||
|
|
|
@ -39,7 +39,7 @@ auto Semaphore::Signal() -> void
|
||||||
fCv.notify_one();
|
fCv.notify_one();
|
||||||
}
|
}
|
||||||
|
|
||||||
auto Semaphore::GetCount() -> std::size_t
|
auto Semaphore::GetCount() const -> std::size_t
|
||||||
{
|
{
|
||||||
std::unique_lock<std::mutex> lk(fMutex);
|
std::unique_lock<std::mutex> lk(fMutex);
|
||||||
return fCount;
|
return fCount;
|
||||||
|
@ -63,7 +63,7 @@ auto SharedSemaphore::Signal() -> void
|
||||||
fSemaphore->Signal();
|
fSemaphore->Signal();
|
||||||
}
|
}
|
||||||
|
|
||||||
auto SharedSemaphore::GetCount() -> std::size_t
|
auto SharedSemaphore::GetCount() const -> std::size_t
|
||||||
{
|
{
|
||||||
return fSemaphore->GetCount();
|
return fSemaphore->GetCount();
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,11 +30,11 @@ struct Semaphore
|
||||||
|
|
||||||
auto Wait() -> void;
|
auto Wait() -> void;
|
||||||
auto Signal() -> void;
|
auto Signal() -> void;
|
||||||
auto GetCount() -> std::size_t;
|
auto GetCount() const -> std::size_t;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::size_t fCount;
|
std::size_t fCount;
|
||||||
std::mutex fMutex;
|
mutable std::mutex fMutex;
|
||||||
std::condition_variable fCv;
|
std::condition_variable fCv;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -49,7 +49,7 @@ struct SharedSemaphore
|
||||||
|
|
||||||
auto Wait() -> void;
|
auto Wait() -> void;
|
||||||
auto Signal() -> void;
|
auto Signal() -> void;
|
||||||
auto GetCount() -> std::size_t;
|
auto GetCount() const -> std::size_t;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::shared_ptr<Semaphore> fSemaphore;
|
std::shared_ptr<Semaphore> fSemaphore;
|
||||||
|
|
|
@ -49,11 +49,11 @@ TEST_F(Topology, AsyncChangeState)
|
||||||
{
|
{
|
||||||
using namespace fair::mq;
|
using namespace fair::mq;
|
||||||
|
|
||||||
tools::Semaphore blocker;
|
tools::SharedSemaphore blocker;
|
||||||
sdk::Topology topo(mDDSTopo, mDDSSession);
|
sdk::Topology topo(mDDSTopo, mDDSSession);
|
||||||
topo.AsyncChangeState(
|
topo.AsyncChangeState(
|
||||||
sdk::TopologyTransition::InitDevice,
|
sdk::TopologyTransition::InitDevice,
|
||||||
[&](std::error_code ec, sdk::TopologyState) {
|
[=](std::error_code ec, sdk::TopologyState) mutable {
|
||||||
LOG(info) << ec;
|
LOG(info) << ec;
|
||||||
EXPECT_EQ(ec, std::error_code());
|
EXPECT_EQ(ec, std::error_code());
|
||||||
blocker.Signal();
|
blocker.Signal();
|
||||||
|
@ -142,9 +142,9 @@ TEST_F(Topology, AsyncChangeStateConcurrent)
|
||||||
using namespace fair::mq;
|
using namespace fair::mq;
|
||||||
|
|
||||||
sdk::Topology topo(mDDSTopo, mDDSSession);
|
sdk::Topology topo(mDDSTopo, mDDSSession);
|
||||||
tools::Semaphore blocker;
|
tools::SharedSemaphore blocker;
|
||||||
topo.AsyncChangeState(sdk::TopologyTransition::InitDevice,
|
topo.AsyncChangeState(sdk::TopologyTransition::InitDevice,
|
||||||
[&blocker](std::error_code ec, sdk::TopologyState) {
|
[blocker](std::error_code ec, sdk::TopologyState) mutable {
|
||||||
LOG(info) << "result for valid ChangeState: " << ec;
|
LOG(info) << "result for valid ChangeState: " << ec;
|
||||||
blocker.Signal();
|
blocker.Signal();
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in New Issue
Block a user