shm: handle shrink failure gracefully

This commit is contained in:
Alexey Rybalchenko 2021-03-23 03:47:48 +01:00
parent 72f319e276
commit bffe74c5cf
2 changed files with 40 additions and 20 deletions

View File

@ -210,12 +210,32 @@ class Message final : public fair::mq::Message
Deallocate();
return true;
} else if (newSize <= fMeta.fSize) {
try {
try {
fLocalPtr = fManager.ShrinkInPlace(newSize, fLocalPtr, fMeta.fSegmentId);
fMeta.fSize = newSize;
return true;
} catch (boost::interprocess::bad_alloc& e) {
// if shrinking fails (can happen due to boost alignment requirements):
// unused size >= 1000000 bytes: reallocate fully
// unused size < 1000000 bytes: simply reset the size and keep the rest of the buffer until message destruction
if (fMeta.fSize - newSize >= 1000000) {
char* newPtr = fManager.Allocate(newSize, fAlignment);
if (newPtr) {
std::memcpy(newPtr, fLocalPtr, newSize);
fManager.Deallocate(fMeta.fHandle, fMeta.fSegmentId);
fLocalPtr = newPtr;
fMeta.fHandle = fManager.GetHandleFromAddress(fLocalPtr, fMeta.fSegmentId);
} else {
LOG(debug) << "could not set used size: " << e.what();
return false;
}
}
fMeta.fSize = newSize;
return true;
}
} catch (boost::interprocess::interprocess_exception& e) {
LOG(info) << "could not set used size: " << e.what();
LOG(debug) << "could not set used size: " << e.what();
return false;
}
} else {
@ -257,7 +277,7 @@ class Message final : public fair::mq::Message
Manager& fManager;
bool fQueued;
MetaHeader fMeta;
size_t fAlignment; // TODO: put this to debug mode
size_t fAlignment;
mutable Region* fRegionPtr;
mutable char* fLocalPtr;

View File

@ -40,34 +40,34 @@ void RunPushPullWithMsgResize(const string& transport, const string& _address)
pull.Connect(address);
{
FairMQMessagePtr outMsg(push.NewMessage(1000));
ASSERT_EQ(outMsg->GetSize(), 1000);
memcpy(outMsg->GetData(), "ABC", 3);
ASSERT_EQ(outMsg->SetUsedSize(500), true);
ASSERT_EQ(outMsg->SetUsedSize(500), true);
ASSERT_EQ(outMsg->SetUsedSize(700), false);
ASSERT_EQ(outMsg->GetSize(), 500);
FairMQMessagePtr outMsg(push.NewMessage(6));
ASSERT_EQ(outMsg->GetSize(), 6);
memcpy(outMsg->GetData(), "ABCDEF", 6);
ASSERT_EQ(outMsg->SetUsedSize(5), true);
ASSERT_EQ(outMsg->SetUsedSize(5), true);
ASSERT_EQ(outMsg->SetUsedSize(7), false);
ASSERT_EQ(outMsg->GetSize(), 5);
// check if the data is still intact
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[0], 'A');
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[1], 'B');
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[2], 'C');
ASSERT_EQ(outMsg->SetUsedSize(250), true);
ASSERT_EQ(outMsg->GetSize(), 250);
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[3], 'D');
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[4], 'E');
ASSERT_EQ(outMsg->SetUsedSize(2), true);
ASSERT_EQ(outMsg->GetSize(), 2);
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[0], 'A');
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[1], 'B');
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[2], 'C');
FairMQMessagePtr msgCopy(push.NewMessage());
msgCopy->Copy(*outMsg);
ASSERT_EQ(msgCopy->GetSize(), 250);
ASSERT_EQ(msgCopy->GetSize(), 2);
ASSERT_EQ(push.Send(outMsg), 250);
ASSERT_EQ(push.Send(outMsg), 2);
FairMQMessagePtr inMsg(pull.NewMessage());
ASSERT_EQ(pull.Receive(inMsg), 250);
ASSERT_EQ(inMsg->GetSize(), 250);
ASSERT_EQ(pull.Receive(inMsg), 2);
ASSERT_EQ(inMsg->GetSize(), 2);
ASSERT_EQ(static_cast<char*>(inMsg->GetData())[0], 'A');
ASSERT_EQ(static_cast<char*>(inMsg->GetData())[1], 'B');
ASSERT_EQ(static_cast<char*>(inMsg->GetData())[2], 'C');
}
{