mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
zmq: simplify SetUsedSize implementation
This commit is contained in:
parent
afadbb53e4
commit
be94ceb7a7
|
@ -38,10 +38,7 @@ class Message final : public fair::mq::Message
|
||||||
public:
|
public:
|
||||||
Message(FairMQTransportFactory* factory = nullptr)
|
Message(FairMQTransportFactory* factory = nullptr)
|
||||||
: fair::mq::Message(factory)
|
: fair::mq::Message(factory)
|
||||||
, fUsedSizeModified(false)
|
|
||||||
, fUsedSize()
|
|
||||||
, fMsg(tools::make_unique<zmq_msg_t>())
|
, fMsg(tools::make_unique<zmq_msg_t>())
|
||||||
, fViewMsg(nullptr)
|
|
||||||
{
|
{
|
||||||
if (zmq_msg_init(fMsg.get()) != 0) {
|
if (zmq_msg_init(fMsg.get()) != 0) {
|
||||||
LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno);
|
LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno);
|
||||||
|
@ -49,10 +46,7 @@ class Message final : public fair::mq::Message
|
||||||
}
|
}
|
||||||
Message(Alignment /* alignment */, FairMQTransportFactory* factory = nullptr)
|
Message(Alignment /* alignment */, FairMQTransportFactory* factory = nullptr)
|
||||||
: fair::mq::Message(factory)
|
: fair::mq::Message(factory)
|
||||||
, fUsedSizeModified(false)
|
|
||||||
, fUsedSize()
|
|
||||||
, fMsg(tools::make_unique<zmq_msg_t>())
|
, fMsg(tools::make_unique<zmq_msg_t>())
|
||||||
, fViewMsg(nullptr)
|
|
||||||
{
|
{
|
||||||
if (zmq_msg_init(fMsg.get()) != 0) {
|
if (zmq_msg_init(fMsg.get()) != 0) {
|
||||||
LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno);
|
LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno);
|
||||||
|
@ -61,10 +55,7 @@ class Message final : public fair::mq::Message
|
||||||
|
|
||||||
Message(const size_t size, FairMQTransportFactory* factory = nullptr)
|
Message(const size_t size, FairMQTransportFactory* factory = nullptr)
|
||||||
: fair::mq::Message(factory)
|
: fair::mq::Message(factory)
|
||||||
, fUsedSizeModified(false)
|
|
||||||
, fUsedSize(size)
|
|
||||||
, fMsg(tools::make_unique<zmq_msg_t>())
|
, fMsg(tools::make_unique<zmq_msg_t>())
|
||||||
, fViewMsg(nullptr)
|
|
||||||
{
|
{
|
||||||
if (zmq_msg_init_size(fMsg.get(), size) != 0) {
|
if (zmq_msg_init_size(fMsg.get(), size) != 0) {
|
||||||
LOG(error) << "failed initializing message with size, reason: " << zmq_strerror(errno);
|
LOG(error) << "failed initializing message with size, reason: " << zmq_strerror(errno);
|
||||||
|
@ -73,10 +64,7 @@ class Message final : public fair::mq::Message
|
||||||
|
|
||||||
Message(const size_t size, Alignment /* alignment */, FairMQTransportFactory* factory = nullptr)
|
Message(const size_t size, Alignment /* alignment */, FairMQTransportFactory* factory = nullptr)
|
||||||
: fair::mq::Message(factory)
|
: fair::mq::Message(factory)
|
||||||
, fUsedSizeModified(false)
|
|
||||||
, fUsedSize(size)
|
|
||||||
, fMsg(tools::make_unique<zmq_msg_t>())
|
, fMsg(tools::make_unique<zmq_msg_t>())
|
||||||
, fViewMsg(nullptr)
|
|
||||||
{
|
{
|
||||||
if (zmq_msg_init_size(fMsg.get(), size) != 0) {
|
if (zmq_msg_init_size(fMsg.get(), size) != 0) {
|
||||||
LOG(error) << "failed initializing message with size, reason: " << zmq_strerror(errno);
|
LOG(error) << "failed initializing message with size, reason: " << zmq_strerror(errno);
|
||||||
|
@ -85,10 +73,7 @@ class Message final : public fair::mq::Message
|
||||||
|
|
||||||
Message(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr)
|
Message(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr)
|
||||||
: fair::mq::Message(factory)
|
: fair::mq::Message(factory)
|
||||||
, fUsedSizeModified(false)
|
|
||||||
, fUsedSize()
|
|
||||||
, fMsg(tools::make_unique<zmq_msg_t>())
|
, fMsg(tools::make_unique<zmq_msg_t>())
|
||||||
, fViewMsg(nullptr)
|
|
||||||
{
|
{
|
||||||
if (zmq_msg_init_data(fMsg.get(), data, size, ffn, hint) != 0) {
|
if (zmq_msg_init_data(fMsg.get(), data, size, ffn, hint) != 0) {
|
||||||
LOG(error) << "failed initializing message with data, reason: " << zmq_strerror(errno);
|
LOG(error) << "failed initializing message with data, reason: " << zmq_strerror(errno);
|
||||||
|
@ -97,10 +82,7 @@ class Message final : public fair::mq::Message
|
||||||
|
|
||||||
Message(UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, FairMQTransportFactory* factory = nullptr)
|
Message(UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, FairMQTransportFactory* factory = nullptr)
|
||||||
: fair::mq::Message(factory)
|
: fair::mq::Message(factory)
|
||||||
, fUsedSizeModified(false)
|
|
||||||
, fUsedSize()
|
|
||||||
, fMsg(tools::make_unique<zmq_msg_t>())
|
, fMsg(tools::make_unique<zmq_msg_t>())
|
||||||
, fViewMsg(nullptr)
|
|
||||||
{
|
{
|
||||||
// FIXME: make this zero-copy:
|
// FIXME: make this zero-copy:
|
||||||
// simply taking over the provided buffer can casue premature delete, since region could be
|
// simply taking over the provided buffer can casue premature delete, since region could be
|
||||||
|
@ -154,29 +136,14 @@ class Message final : public fair::mq::Message
|
||||||
|
|
||||||
void* GetData() const override
|
void* GetData() const override
|
||||||
{
|
{
|
||||||
if (!fViewMsg) {
|
|
||||||
if (zmq_msg_size(fMsg.get()) > 0) {
|
if (zmq_msg_size(fMsg.get()) > 0) {
|
||||||
return zmq_msg_data(fMsg.get());
|
return zmq_msg_data(fMsg.get());
|
||||||
} else {
|
} else {
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
if (zmq_msg_size(fViewMsg.get()) > 0) {
|
|
||||||
return zmq_msg_data(fViewMsg.get());
|
|
||||||
} else {
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t GetSize() const override
|
size_t GetSize() const override { return zmq_msg_size(fMsg.get()); }
|
||||||
{
|
|
||||||
if (fUsedSizeModified) {
|
|
||||||
return fUsedSize;
|
|
||||||
} else {
|
|
||||||
return zmq_msg_size(fMsg.get());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// To emulate shrinking, a new message is created with the new size (ViewMsg), that points to
|
// To emulate shrinking, a new message is created with the new size (ViewMsg), that points to
|
||||||
// the original buffer with the new size. Once the "view message" is transfered, the original is
|
// the original buffer with the new size. Once the "view message" is transfered, the original is
|
||||||
|
@ -185,30 +152,25 @@ class Message final : public fair::mq::Message
|
||||||
// happens.
|
// happens.
|
||||||
bool SetUsedSize(const size_t size) override
|
bool SetUsedSize(const size_t size) override
|
||||||
{
|
{
|
||||||
if (size <= zmq_msg_size(fMsg.get())) {
|
if (size == GetSize()) {
|
||||||
fUsedSize = size;
|
// nothing to do
|
||||||
fUsedSizeModified = true;
|
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else if (size > GetSize()) {
|
||||||
LOG(error) << "cannot set used size higher than original.";
|
LOG(error) << "cannot set used size higher than original.";
|
||||||
return false;
|
return false;
|
||||||
}
|
} else {
|
||||||
}
|
auto newMsg = tools::make_unique<zmq_msg_t>();
|
||||||
|
void* data = GetData();
|
||||||
void ApplyUsedSize()
|
if (zmq_msg_init_data(newMsg.get(), data, size, [](void* /* data */, void* obj) {
|
||||||
{
|
|
||||||
// Apply only once (before actual send).
|
|
||||||
// The check is needed because a send could fail and can be reattempted by the user, in this
|
|
||||||
// case we do not want to modify buffer again.
|
|
||||||
if (fUsedSizeModified && !fViewMsg) {
|
|
||||||
fViewMsg = tools::make_unique<zmq_msg_t>();
|
|
||||||
void* ptr = zmq_msg_data(fMsg.get());
|
|
||||||
if (zmq_msg_init_data(fViewMsg.get(), ptr, fUsedSize, [](void* /* data */, void* obj) {
|
|
||||||
zmq_msg_close(static_cast<zmq_msg_t*>(obj));
|
zmq_msg_close(static_cast<zmq_msg_t*>(obj));
|
||||||
delete static_cast<zmq_msg_t*>(obj);
|
delete static_cast<zmq_msg_t*>(obj);
|
||||||
}, fMsg.release()) != 0) {
|
}, fMsg.get()) != 0) {
|
||||||
LOG(error) << "failed initializing view message, reason: " << zmq_strerror(errno);
|
LOG(error) << "failed initializing message with data, reason: " << zmq_strerror(errno);
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
fMsg.release();
|
||||||
|
fMsg.swap(newMsg);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -222,48 +184,22 @@ class Message final : public fair::mq::Message
|
||||||
LOG(error) << "failed copying message, reason: " << zmq_strerror(errno);
|
LOG(error) << "failed copying message, reason: " << zmq_strerror(errno);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// if the target message has been resized, apply same to this message also
|
|
||||||
if (zMsg.fUsedSizeModified) {
|
|
||||||
fUsedSizeModified = true;
|
|
||||||
fUsedSize = zMsg.fUsedSize;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
~Message() override { CloseMessage(); }
|
~Message() override { CloseMessage(); }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool fUsedSizeModified;
|
|
||||||
size_t fUsedSize;
|
|
||||||
std::unique_ptr<zmq_msg_t> fMsg;
|
std::unique_ptr<zmq_msg_t> fMsg;
|
||||||
std::unique_ptr<zmq_msg_t> fViewMsg; // view on a subset of fMsg (treating it as user buffer)
|
|
||||||
|
|
||||||
zmq_msg_t* GetMessage() const
|
zmq_msg_t* GetMessage() const { return fMsg.get(); }
|
||||||
{
|
|
||||||
if (!fViewMsg) {
|
|
||||||
return fMsg.get();
|
|
||||||
} else {
|
|
||||||
return fViewMsg.get();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void CloseMessage()
|
void CloseMessage()
|
||||||
{
|
{
|
||||||
if (!fViewMsg) {
|
|
||||||
if (zmq_msg_close(fMsg.get()) != 0) {
|
if (zmq_msg_close(fMsg.get()) != 0) {
|
||||||
LOG(error) << "failed closing message, reason: " << zmq_strerror(errno);
|
LOG(error) << "failed closing message, reason: " << zmq_strerror(errno);
|
||||||
}
|
}
|
||||||
// reset the message object to allow reuse in Rebuild
|
// reset the message object to allow reuse in Rebuild
|
||||||
fMsg.reset(nullptr);
|
fMsg.reset(nullptr);
|
||||||
} else {
|
|
||||||
if (zmq_msg_close(fViewMsg.get()) != 0) {
|
|
||||||
LOG(error) << "failed closing message, reason: " << zmq_strerror(errno);
|
|
||||||
}
|
|
||||||
// reset the message object to allow reuse in Rebuild
|
|
||||||
fViewMsg.reset(nullptr);
|
|
||||||
}
|
|
||||||
fUsedSizeModified = false;
|
|
||||||
fUsedSize = 0;
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -140,8 +140,6 @@ class Socket final : public fair::mq::Socket
|
||||||
}
|
}
|
||||||
int elapsed = 0;
|
int elapsed = 0;
|
||||||
|
|
||||||
static_cast<Message*>(msg.get())->ApplyUsedSize();
|
|
||||||
|
|
||||||
int64_t actualBytes = zmq_msg_size(static_cast<Message*>(msg.get())->GetMessage());
|
int64_t actualBytes = zmq_msg_size(static_cast<Message*>(msg.get())->GetMessage());
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
|
@ -211,8 +209,6 @@ class Socket final : public fair::mq::Socket
|
||||||
bool repeat = false;
|
bool repeat = false;
|
||||||
|
|
||||||
for (unsigned int i = 0; i < vecSize; ++i) {
|
for (unsigned int i = 0; i < vecSize; ++i) {
|
||||||
static_cast<Message*>(msgVec[i].get())->ApplyUsedSize();
|
|
||||||
|
|
||||||
int nbytes = zmq_msg_send(static_cast<Message*>(msgVec[i].get())->GetMessage(), fSocket, (i < vecSize - 1) ? ZMQ_SNDMORE | flags : flags);
|
int nbytes = zmq_msg_send(static_cast<Message*>(msgVec[i].get())->GetMessage(), fSocket, (i < vecSize - 1) ? ZMQ_SNDMORE | flags : flags);
|
||||||
if (nbytes >= 0) {
|
if (nbytes >= 0) {
|
||||||
totalSize += nbytes;
|
totalSize += nbytes;
|
||||||
|
|
|
@ -40,10 +40,20 @@ void RunPushPullWithMsgResize(const string& transport, const string& _address)
|
||||||
|
|
||||||
FairMQMessagePtr outMsg(push.NewMessage(1000));
|
FairMQMessagePtr outMsg(push.NewMessage(1000));
|
||||||
ASSERT_EQ(outMsg->GetSize(), 1000);
|
ASSERT_EQ(outMsg->GetSize(), 1000);
|
||||||
outMsg->SetUsedSize(500);
|
memcpy(outMsg->GetData(), "ABC", 3);
|
||||||
|
ASSERT_EQ(outMsg->SetUsedSize(500), true);
|
||||||
|
ASSERT_EQ(outMsg->SetUsedSize(500), true);
|
||||||
|
ASSERT_EQ(outMsg->SetUsedSize(700), false);
|
||||||
ASSERT_EQ(outMsg->GetSize(), 500);
|
ASSERT_EQ(outMsg->GetSize(), 500);
|
||||||
outMsg->SetUsedSize(250);
|
// check if the data is still intact
|
||||||
|
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[0], 'A');
|
||||||
|
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[1], 'B');
|
||||||
|
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[2], 'C');
|
||||||
|
ASSERT_EQ(outMsg->SetUsedSize(250), true);
|
||||||
ASSERT_EQ(outMsg->GetSize(), 250);
|
ASSERT_EQ(outMsg->GetSize(), 250);
|
||||||
|
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[0], 'A');
|
||||||
|
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[1], 'B');
|
||||||
|
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[2], 'C');
|
||||||
FairMQMessagePtr msgCopy(push.NewMessage());
|
FairMQMessagePtr msgCopy(push.NewMessage());
|
||||||
msgCopy->Copy(*outMsg);
|
msgCopy->Copy(*outMsg);
|
||||||
ASSERT_EQ(msgCopy->GetSize(), 250);
|
ASSERT_EQ(msgCopy->GetSize(), 250);
|
||||||
|
@ -53,6 +63,9 @@ void RunPushPullWithMsgResize(const string& transport, const string& _address)
|
||||||
FairMQMessagePtr inMsg(pull.NewMessage());
|
FairMQMessagePtr inMsg(pull.NewMessage());
|
||||||
ASSERT_EQ(pull.Receive(inMsg), 250);
|
ASSERT_EQ(pull.Receive(inMsg), 250);
|
||||||
ASSERT_EQ(inMsg->GetSize(), 250);
|
ASSERT_EQ(inMsg->GetSize(), 250);
|
||||||
|
ASSERT_EQ(static_cast<char*>(inMsg->GetData())[0], 'A');
|
||||||
|
ASSERT_EQ(static_cast<char*>(inMsg->GetData())[1], 'B');
|
||||||
|
ASSERT_EQ(static_cast<char*>(inMsg->GetData())[2], 'C');
|
||||||
}
|
}
|
||||||
|
|
||||||
void RunMsgRebuild(const string& transport)
|
void RunMsgRebuild(const string& transport)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user