From 4dbb5535c3c60c0f91857487aa77d14acadd9184 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Thu, 24 Jun 2021 10:50:37 +0200 Subject: [PATCH] Add empty msg check for transport compatibility checker --- fairmq/FairMQChannel.h | 42 +++++++++++++++++++++-------------- test/helper/devices/TestRep.h | 6 ++--- test/helper/devices/TestReq.h | 3 +-- test/protocols/_req_rep.cxx | 27 ++++++++++++++-------- 4 files changed, 46 insertions(+), 32 deletions(-) diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 9952461b..6f506f29 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -385,22 +385,7 @@ class FairMQChannel void CheckSendCompatibility(FairMQMessagePtr& msg) { if (fTransportType != msg->GetType()) { - FairMQMessagePtr msgWrapper(NewMessage( - msg->GetData(), - msg->GetSize(), - [](void* /*data*/, void* _msg) { delete static_cast(_msg); }, - msg.get() - )); - msg.release(); - msg = move(msgWrapper); - } - } - - void CheckSendCompatibility(std::vector& msgVec) - { - for (auto& msg : msgVec) { - if (fTransportType != msg->GetType()) { - + if (msg->GetSize() > 0) { FairMQMessagePtr msgWrapper(NewMessage( msg->GetData(), msg->GetSize(), @@ -409,6 +394,30 @@ class FairMQChannel )); msg.release(); msg = move(msgWrapper); + } else { + FairMQMessagePtr newMsg(NewMessage()); + msg = move(newMsg); + } + } + } + + void CheckSendCompatibility(std::vector& msgVec) + { + for (auto& msg : msgVec) { + if (fTransportType != msg->GetType()) { + if (msg->GetSize() > 0) { + FairMQMessagePtr msgWrapper(NewMessage( + msg->GetData(), + msg->GetSize(), + [](void* /*data*/, void* _msg) { delete static_cast(_msg); }, + msg.get() + )); + msg.release(); + msg = move(msgWrapper); + } else { + FairMQMessagePtr newMsg(NewMessage()); + msg = move(newMsg); + } } } } @@ -425,7 +434,6 @@ class FairMQChannel { for (auto& msg : msgVec) { if (fTransportType != msg->GetType()) { - FairMQMessagePtr newMsg(NewMessage()); msg = move(newMsg); } diff --git a/test/helper/devices/TestRep.h b/test/helper/devices/TestRep.h index 7159fe1b..0cb95d46 100644 --- a/test/helper/devices/TestRep.h +++ b/test/helper/devices/TestRep.h @@ -27,15 +27,13 @@ class Rep : public FairMQDevice auto Run() -> void override { auto request1 = FairMQMessagePtr{NewMessage()}; - if (Receive(request1, "data") >= 0) - { + if (Receive(request1, "data") >= 0) { LOG(info) << "Received request 1"; auto reply = FairMQMessagePtr{NewMessage()}; Send(reply, "data"); } auto request2 = FairMQMessagePtr{NewMessage()}; - if (Receive(request2, "data") >= 0) - { + if (Receive(request2, "data") >= 0) { LOG(info) << "Received request 2"; auto reply = FairMQMessagePtr{NewMessage()}; Send(reply, "data"); diff --git a/test/helper/devices/TestReq.h b/test/helper/devices/TestReq.h index cf719f4f..210cac33 100644 --- a/test/helper/devices/TestReq.h +++ b/test/helper/devices/TestReq.h @@ -30,8 +30,7 @@ class Req : public FairMQDevice Send(request, "data"); auto reply = FairMQMessagePtr{NewMessage()}; - if (Receive(reply, "data") >= 0) - { + if (Receive(reply, "data") >= 0) { LOG(info) << "received reply"; } }; diff --git a/test/protocols/_req_rep.cxx b/test/protocols/_req_rep.cxx index a1a12dea..5ff7e87c 100644 --- a/test/protocols/_req_rep.cxx +++ b/test/protocols/_req_rep.cxx @@ -24,27 +24,36 @@ auto RunReqRep(string transport) -> void { size_t session{fair::mq::tools::UuidHash()}; - auto rep = execute_result{ "", 0 }; + auto rep = execute_result{"", 0}; thread rep_thread([&]() { stringstream cmd; - cmd << runTestDevice << " --id rep_" << transport << " --control static " - << "--session " << session << " --color false --mq-config \"" << mqConfig << "\""; + cmd << runTestDevice << " --id rep_" << transport + << " --control static" + << " --session " << session + << " --color false" + << " --mq-config \"" << mqConfig << "\""; rep = execute(cmd.str(), "[REP]"); }); - auto req1 = execute_result{ "", 0 }; + auto req1 = execute_result{"", 0}; thread req1_thread([&]() { stringstream cmd; - cmd << runTestDevice << " --id req_1" << transport << " --control static " - << "--session " << session << " --color false --mq-config \"" << mqConfig << "\""; + cmd << runTestDevice << " --id req_1" << transport + << " --control static" + << " --session " << session + << " --color false" + << " --mq-config \"" << mqConfig << "\""; req1 = execute(cmd.str(), "[REQ1]"); }); - auto req2 = execute_result{ "", 0 }; + auto req2 = execute_result{"", 0}; thread req2_thread([&]() { stringstream cmd; - cmd << runTestDevice << " --id req_2" << transport << " --control static " - << "--session " << session << " --color false --mq-config \"" << mqConfig << "\""; + cmd << runTestDevice << " --id req_2" << transport + << " --control static" + << " --session " << session + << " --color false" + << " --mq-config \"" << mqConfig << "\""; req2 = execute(cmd.str(), "[REQ2]"); });