shm: revert some changes from c85d6e0 that introduced a race

This commit is contained in:
Alexey Rybalchenko 2021-05-20 00:28:44 +02:00
parent 021c1b1c4d
commit a38e0eee66
3 changed files with 10 additions and 13 deletions

View File

@ -327,7 +327,6 @@ class Manager
fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc));
r.first->second->InitializeQueues();
r.first->second->StartReceivingAcks();
result.first = &(r.first->second->fRegion);
result.second = id;

View File

@ -309,8 +309,6 @@ class Message final : public fair::mq::Message
}
if (fRegionPtr) {
fRegionPtr->InitializeQueues();
fRegionPtr->StartSendingAcks();
fRegionPtr->ReleaseBlock({fMeta.fHandle, fMeta.fSize, fMeta.fHint});
} else {
LOG(warn) << "region ack queue for id " << fMeta.fRegionId << " no longer exist. Not sending ack";

View File

@ -104,6 +104,10 @@ struct Region
}
}
InitializeQueues();
StartSendingAcks();
LOG(trace) << "shmem: initialized region: " << fName << " (" << (fRemote ? "remote" : "local") << ")";
}
@ -116,21 +120,17 @@ struct Region
{
using namespace boost::interprocess;
if (fQueue == nullptr) {
if (fRemote) {
fQueue = std::make_unique<message_queue>(open_only, fQueueName.c_str());
} else {
fQueue = std::make_unique<message_queue>(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock));
}
LOG(trace) << "shmem: initialized region queue: " << fQueueName << " (" << (fRemote ? "remote" : "local") << ")";
if (fRemote) {
fQueue = std::make_unique<message_queue>(open_only, fQueueName.c_str());
} else {
fQueue = std::make_unique<message_queue>(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock));
}
LOG(trace) << "shmem: initialized region queue: " << fQueueName << " (" << (fRemote ? "remote" : "local") << ")";
}
void StartSendingAcks()
{
if (!fAcksSender.joinable()) {
fAcksSender = std::thread(&Region::SendAcks, this);
}
fAcksSender = std::thread(&Region::SendAcks, this);
}
void SendAcks()
{