FairMQ: allow accumulation of parts on receive.

This commit is contained in:
Alexey Rybalchenko 2017-04-27 14:05:02 +02:00 committed by Mohammad Al-Turany
parent 1fa5c836a6
commit 0926a9a764
4 changed files with 26 additions and 27 deletions

View File

@ -823,27 +823,26 @@ bool FairMQChannel::CheckCompatibility(unique_ptr<FairMQMessage>& msg) const
bool FairMQChannel::CheckCompatibility(vector<unique_ptr<FairMQMessage>>& msgVec) const bool FairMQChannel::CheckCompatibility(vector<unique_ptr<FairMQMessage>>& msgVec) const
{ {
bool match = true;
if (msgVec.size() > 0) if (msgVec.size() > 0)
{ {
if (fTransportType == msgVec.at(0)->GetType()) for (unsigned int i = 0; i < msgVec.size(); ++i)
{ {
return true; if (fTransportType != msgVec.at(i)->GetType())
}
else
{
// LOG(WARN) << "Channel type does not match message type. Copying...";
vector<unique_ptr<FairMQMessage>> tempVec;
for (unsigned int i = 0; i < msgVec.size(); ++i)
{ {
tempVec.push_back(fTransportFactory->CreateMessage(msgVec[i]->GetSize())); // LOG(WARN) << "Channel type does not match message type. Copying...";
memcpy(tempVec[i]->GetData(), msgVec[i]->GetData(), msgVec[i]->GetSize()); FairMQMessagePtr newMsg(fTransportFactory->CreateMessage(msgVec[i]->GetSize()));
memcpy(newMsg->GetData(), msgVec[i]->GetData(), msgVec[i]->GetSize());
msgVec[i] = move(newMsg);
match = false;
} }
msgVec = move(tempVec);
return false;
} }
} }
else else
{ {
return true; return true;
} }
return match;
} }

View File

@ -282,11 +282,11 @@ int64_t FairMQSocketNN::Receive(vector<unique_ptr<FairMQMessage>>& msgVec, const
{ {
#ifdef MSGPACK_FOUND #ifdef MSGPACK_FOUND
// Warn if the vector is filled before Receive() and empty it. // Warn if the vector is filled before Receive() and empty it.
if (msgVec.size() > 0) // if (msgVec.size() > 0)
{ // {
LOG(WARN) << "Message vector contains elements before Receive(), they will be deleted!"; // LOG(WARN) << "Message vector contains elements before Receive(), they will be deleted!";
msgVec.clear(); // msgVec.clear();
} // }
while (true) while (true)
{ {

View File

@ -301,11 +301,11 @@ int64_t FairMQSocketSHM::Receive(vector<FairMQMessagePtr>& msgVec, const int fla
while (true) while (true)
{ {
// Warn if the vector is filled before Receive() and empty it. // Warn if the vector is filled before Receive() and empty it.
if (msgVec.size() > 0) // if (msgVec.size() > 0)
{ // {
LOG(WARN) << "Message vector contains elements before Receive(), they will be deleted!"; // LOG(WARN) << "Message vector contains elements before Receive(), they will be deleted!";
msgVec.clear(); // msgVec.clear();
} // }
totalSize = 0; totalSize = 0;
more = 0; more = 0;

View File

@ -269,11 +269,11 @@ int64_t FairMQSocketZMQ::Receive(vector<unique_ptr<FairMQMessage>>& msgVec, cons
while (true) while (true)
{ {
// Warn if the vector is filled before Receive() and empty it. // Warn if the vector is filled before Receive() and empty it.
if (msgVec.size() > 0) // if (msgVec.size() > 0)
{ // {
LOG(WARN) << "Message vector contains elements before Receive(), they will be deleted!"; // LOG(WARN) << "Message vector contains elements before Receive(), they will be deleted!";
msgVec.clear(); // msgVec.clear();
} // }
totalSize = 0; totalSize = 0;
more = 0; more = 0;