From c6b13cd3a18ac0e4d724e4df54f8027885b9ee0d Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Mon, 25 Jan 2021 12:40:25 +0100 Subject: [PATCH] Fix shmem::Message::SetUsedSize(0) --- fairmq/shmem/Message.h | 10 +++++- test/message/_message.cxx | 67 ++++++++++++++++++++++++--------------- 2 files changed, 50 insertions(+), 27 deletions(-) diff --git a/fairmq/shmem/Message.h b/fairmq/shmem/Message.h index 20c78e1b..0739ff81 100644 --- a/fairmq/shmem/Message.h +++ b/fairmq/shmem/Message.h @@ -206,6 +206,9 @@ class Message final : public fair::mq::Message { if (newSize == fMeta.fSize) { return true; + } else if (newSize == 0) { + Deallocate(); + return true; } else if (newSize <= fMeta.fSize) { try { fLocalPtr = fManager.ShrinkInPlace(newSize, fLocalPtr, fMeta.fSegmentId); @@ -268,7 +271,7 @@ class Message final : public fair::mq::Message return fLocalPtr; } - void CloseMessage() + void Deallocate() { if (fMeta.fHandle >= 0 && !fQueued) { if (fMeta.fRegionId == 0) { @@ -289,6 +292,11 @@ class Message final : public fair::mq::Message } fLocalPtr = nullptr; fMeta.fSize = 0; + } + + void CloseMessage() + { + Deallocate(); fAlignment = 0; fManager.DecrementMsgCounter(); // TODO: put this to debug mode diff --git a/test/message/_message.cxx b/test/message/_message.cxx index 6d9cf10b..2eb2c5a1 100644 --- a/test/message/_message.cxx +++ b/test/message/_message.cxx @@ -39,34 +39,49 @@ void RunPushPullWithMsgResize(const string& transport, const string& _address) FairMQChannel pull{"Pull", "pull", factory}; pull.Connect(address); - FairMQMessagePtr outMsg(push.NewMessage(1000)); - ASSERT_EQ(outMsg->GetSize(), 1000); - memcpy(outMsg->GetData(), "ABC", 3); - ASSERT_EQ(outMsg->SetUsedSize(500), true); - ASSERT_EQ(outMsg->SetUsedSize(500), true); - ASSERT_EQ(outMsg->SetUsedSize(700), false); - ASSERT_EQ(outMsg->GetSize(), 500); - // check if the data is still intact - ASSERT_EQ(static_cast(outMsg->GetData())[0], 'A'); - ASSERT_EQ(static_cast(outMsg->GetData())[1], 'B'); - ASSERT_EQ(static_cast(outMsg->GetData())[2], 'C'); - ASSERT_EQ(outMsg->SetUsedSize(250), true); - ASSERT_EQ(outMsg->GetSize(), 250); - ASSERT_EQ(static_cast(outMsg->GetData())[0], 'A'); - ASSERT_EQ(static_cast(outMsg->GetData())[1], 'B'); - ASSERT_EQ(static_cast(outMsg->GetData())[2], 'C'); - FairMQMessagePtr msgCopy(push.NewMessage()); - msgCopy->Copy(*outMsg); - ASSERT_EQ(msgCopy->GetSize(), 250); + { + FairMQMessagePtr outMsg(push.NewMessage(1000)); + ASSERT_EQ(outMsg->GetSize(), 1000); + memcpy(outMsg->GetData(), "ABC", 3); + ASSERT_EQ(outMsg->SetUsedSize(500), true); + ASSERT_EQ(outMsg->SetUsedSize(500), true); + ASSERT_EQ(outMsg->SetUsedSize(700), false); + ASSERT_EQ(outMsg->GetSize(), 500); + // check if the data is still intact + ASSERT_EQ(static_cast(outMsg->GetData())[0], 'A'); + ASSERT_EQ(static_cast(outMsg->GetData())[1], 'B'); + ASSERT_EQ(static_cast(outMsg->GetData())[2], 'C'); + ASSERT_EQ(outMsg->SetUsedSize(250), true); + ASSERT_EQ(outMsg->GetSize(), 250); + ASSERT_EQ(static_cast(outMsg->GetData())[0], 'A'); + ASSERT_EQ(static_cast(outMsg->GetData())[1], 'B'); + ASSERT_EQ(static_cast(outMsg->GetData())[2], 'C'); + FairMQMessagePtr msgCopy(push.NewMessage()); + msgCopy->Copy(*outMsg); + ASSERT_EQ(msgCopy->GetSize(), 250); - ASSERT_EQ(push.Send(outMsg), 250); + ASSERT_EQ(push.Send(outMsg), 250); - FairMQMessagePtr inMsg(pull.NewMessage()); - ASSERT_EQ(pull.Receive(inMsg), 250); - ASSERT_EQ(inMsg->GetSize(), 250); - ASSERT_EQ(static_cast(inMsg->GetData())[0], 'A'); - ASSERT_EQ(static_cast(inMsg->GetData())[1], 'B'); - ASSERT_EQ(static_cast(inMsg->GetData())[2], 'C'); + FairMQMessagePtr inMsg(pull.NewMessage()); + ASSERT_EQ(pull.Receive(inMsg), 250); + ASSERT_EQ(inMsg->GetSize(), 250); + ASSERT_EQ(static_cast(inMsg->GetData())[0], 'A'); + ASSERT_EQ(static_cast(inMsg->GetData())[1], 'B'); + ASSERT_EQ(static_cast(inMsg->GetData())[2], 'C'); + } + + { + FairMQMessagePtr outMsg(push.NewMessage(1000)); + ASSERT_EQ(outMsg->SetUsedSize(0), true); + ASSERT_EQ(outMsg->GetSize(), 0); + FairMQMessagePtr msgCopy(push.NewMessage()); + msgCopy->Copy(*outMsg); + ASSERT_EQ(msgCopy->GetSize(), 0); + ASSERT_EQ(push.Send(outMsg), 0); + FairMQMessagePtr inMsg(pull.NewMessage()); + ASSERT_EQ(pull.Receive(inMsg), 0); + ASSERT_EQ(inMsg->GetSize(), 0); + } } void RunMsgRebuild(const string& transport)