Avoid copy (where possible) when switching transports

This commit is contained in:
Alexey Rybalchenko 2018-06-06 18:10:42 +02:00 committed by Mohammad Al-Turany
parent 96e2076300
commit 653e82cab4
10 changed files with 66 additions and 59 deletions

View File

@ -748,44 +748,35 @@ unsigned long FairMQChannel::GetMessagesRx() const
return fSocket->GetMessagesRx();
}
bool FairMQChannel::CheckCompatibility(unique_ptr<FairMQMessage>& msg) const
void FairMQChannel::CheckCompatibility(unique_ptr<FairMQMessage>& msg) const
{
if (fTransportType == msg->GetType())
if (fTransportType != msg->GetType())
{
return true;
}
else
{
// LOG(warn) << "Channel type does not match message type. Copying...";
FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage(msg->GetSize()));
memcpy(msgCopy->GetData(), msg->GetData(), msg->GetSize());
msg = move(msgCopy);
return false;
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr msgWrapper(fTransportFactory->CreateMessage(msg->GetData(),
msg->GetSize(),
[](void* /*data*/, void* msg) { delete static_cast<FairMQMessage*>(msg); },
msg.get()
));
msg.release();
msg = move(msgWrapper);
}
}
bool FairMQChannel::CheckCompatibility(vector<unique_ptr<FairMQMessage>>& msgVec) const
void FairMQChannel::CheckCompatibility(vector<FairMQMessagePtr>& msgVec) const
{
bool match = true;
if (msgVec.size() > 0)
for (auto& part : msgVec)
{
for (unsigned int i = 0; i < msgVec.size(); ++i)
if (fTransportType != part->GetType())
{
if (fTransportType != msgVec.at(i)->GetType())
{
// LOG(warn) << "Channel type does not match message type. Copying...";
FairMQMessagePtr newMsg(fTransportFactory->CreateMessage(msgVec[i]->GetSize()));
memcpy(newMsg->GetData(), msgVec[i]->GetData(), msgVec[i]->GetSize());
msgVec[i] = move(newMsg);
match = false;
}
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr partWrapper(fTransportFactory->CreateMessage(part->GetData(),
part->GetSize(),
[](void* /*data*/, void* part) { delete static_cast<FairMQMessage*>(part); },
part.get()
));
part.release();
part = move(partWrapper);
}
}
else
{
return true;
}
return match;
}

View File

@ -313,8 +313,8 @@ class FairMQChannel
std::shared_ptr<FairMQTransportFactory> fTransportFactory;
bool CheckCompatibility(std::unique_ptr<FairMQMessage>& msg) const;
bool CheckCompatibility(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const;
void CheckCompatibility(std::unique_ptr<FairMQMessage>& msg) const;
void CheckCompatibility(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const;
void InitTransport(std::shared_ptr<FairMQTransportFactory> factory);

View File

@ -148,7 +148,8 @@ void FairMQDevice::InitWrapper()
else
{
LOG(error) << "Cannot update configuration. Socket method (bind/connect) not specified.";
throw runtime_error("Cannot update configuration. Socket method (bind/connect) not specified.");
ChangeState(ERROR_FOUND);
// throw runtime_error("Cannot update configuration. Socket method (bind/connect) not specified.");
}
// }
}
@ -161,7 +162,8 @@ void FairMQDevice::InitWrapper()
if (uninitializedBindingChannels.size() > 0)
{
LOG(error) << uninitializedBindingChannels.size() << " of the binding channels could not initialize. Initial configuration incomplete.";
throw runtime_error(fair::mq::tools::ToString(uninitializedBindingChannels.size(), " of the binding channels could not initialize. Initial configuration incomplete."));
ChangeState(ERROR_FOUND);
// throw runtime_error(fair::mq::tools::ToString(uninitializedBindingChannels.size(), " of the binding channels could not initialize. Initial configuration incomplete."));
}
CallStateChangeCallbacks(INITIALIZING_DEVICE);
@ -200,7 +202,8 @@ void FairMQDevice::InitWrapper()
if (numAttempts++ > maxAttempts)
{
LOG(error) << "could not connect all channels after " << fInitializationTimeoutInS << " attempts";
throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", fInitializationTimeoutInS, " attempts"));
ChangeState(ERROR_FOUND);
// throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", fInitializationTimeoutInS, " attempts"));
}
AttachChannels(uninitializedConnectingChannels);
@ -271,7 +274,15 @@ bool FairMQDevice::AttachChannel(FairMQChannel& ch)
//(re-)init socket
if (!ch.fSocket)
{
ch.fSocket = ch.fTransportFactory->CreateSocket(ch.fType, ch.fName);
try
{
ch.fSocket = ch.fTransportFactory->CreateSocket(ch.fType, ch.fName);
}
catch (fair::mq::SocketError& se)
{
LOG(ERROR) << se.what();
return false;
}
}
// set high water marks

View File

@ -208,7 +208,7 @@ class FairMQDevice : public FairMQStateMachine
template<typename... Args>
FairMQMessagePtr NewMessageFor(const std::string& channel, int index, Args&&... args) const
{
return fChannels.at(channel).at(index).Transport()->CreateMessage(std::forward<Args>(args)...);
return fChannels.at(channel).at(index).NewMessage(std::forward<Args>(args)...);
}
template<typename T>

View File

@ -24,7 +24,7 @@
#include <boost/msm/back/metafunctions.hpp>
#include <boost/msm/front/state_machine_def.hpp>
#include <boost/msm/front/functor_row.hpp>
#include <boost/core/demangle.hpp>
#include <boost/signals2.hpp> // signal/slot for onStateChange callbacks
#include <atomic>
@ -386,21 +386,18 @@ struct Machine_ : public msmf::state_machine_def<Machine_>
boost::mpl::for_each<all_states, boost::msm::wrap<boost::mpl::placeholders::_1>>(boost::msm::back::get_state_name<recursive_stt>(stateName, state));
stateName = stateName.substr(24);
size_t pos = stateName.find("_FSME");
stateName.erase(pos);
if (stateName == "1RUNNING" || stateName == "6DEVICE_READY" || stateName == "0PAUSED" || stateName == "8RESETTING_TASK" || stateName == "0RESETTING_DEVICE")
stateName = boost::core::demangle(stateName.c_str());
size_t pos = stateName.rfind(":");
if (pos != string::npos)
{
stateName = stateName.substr(1);
stateName = stateName.substr(pos + 1);
stateName = stateName.substr(0, stateName.size() - 4);
}
if (stateName != "OK")
{
LOG(state) << "No transition from state " << stateName << " on event " << e.name();
}
// LOG(state) << "no transition from state " << GetStateName(state) << " (" << state << ") on event " << e.name();
}
static string GetStateName(const int state)

View File

@ -57,7 +57,7 @@ void FairMQBenchmarkSampler::Run()
// store the channel reference to avoid traversing the map on every loop iteration
FairMQChannel& dataOutChannel = fChannels.at(fOutChannelName).at(0);
FairMQMessagePtr baseMsg(dataOutChannel.Transport()->CreateMessage(fMsgSize));
FairMQMessagePtr baseMsg(dataOutChannel.NewMessage(fMsgSize));
LOG(info) << "Starting the benchmark with message size of " << fMsgSize << " and " << fMaxIterations << " iterations.";
auto tStart = chrono::high_resolution_clock::now();
@ -66,7 +66,7 @@ void FairMQBenchmarkSampler::Run()
{
if (fSameMessage)
{
FairMQMessagePtr msg(dataOutChannel.Transport()->CreateMessage());
FairMQMessagePtr msg(dataOutChannel.NewMessage());
msg->Copy(*baseMsg);
if (dataOutChannel.Send(msg) >= 0)
@ -83,7 +83,7 @@ void FairMQBenchmarkSampler::Run()
}
else
{
FairMQMessagePtr msg(dataOutChannel.Transport()->CreateMessage(fMsgSize));
FairMQMessagePtr msg(dataOutChannel.NewMessage(fMsgSize));
if (dataOutChannel.Send(msg) >= 0)
{

View File

@ -56,7 +56,7 @@ class FairMQSink : public FairMQDevice//, public OutputPolicy
while (CheckCurrentState(RUNNING))
{
FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage());
FairMQMessagePtr msg(dataInChannel.NewMessage());
if (dataInChannel.Receive(msg) >= 0)
{

View File

@ -15,8 +15,8 @@ void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("out-channel", bpo::value<std::string>()->default_value("data"), "Name of the output channel")
("same-msg", bpo::value<bool>()->default_value(true), "Re-send the same message (default), or recreate for each iteration")
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
("same-msg", bpo::value<bool>()->default_value(false), "Re-send the same message, or recreate for each iteration")
("msg-size", bpo::value<int>()->default_value(1000000), "Message size in bytes")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Number of run iterations (0 - infinite)")
("msg-rate", bpo::value<int>()->default_value(0), "Msg rate limit in maximum number of messages per second");
}

View File

@ -72,7 +72,7 @@ SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --same-msg $sameMsg"
# SAMPLER+=" --msg-rate 1000"
SAMPLER+=" --max-iterations $maxIterations"
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.01:5555"
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:5555"
xterm -geometry 90x50+0+0 -hold -e $affinitySamp @CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
echo ""
echo "started: xterm -geometry 90x50+0+0 -hold -e $affinitySamp @CMAKE_CURRENT_BINARY_DIR@/$SAMPLER"
@ -85,7 +85,7 @@ SINK+=" --id sink1"
SINK+=" --transport $transport"
SINK+=" --severity debug"
SINK+=" --max-iterations $maxIterations"
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.01:5555"
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:5555"
xterm -geometry 90x50+550+0 -hold -e $affinitySink @CMAKE_CURRENT_BINARY_DIR@/$SINK &
echo ""
echo "started: xterm -geometry 90x50+550+0 -hold -e $affinitySink @CMAKE_CURRENT_BINARY_DIR@/$SINK"

View File

@ -14,6 +14,8 @@
#include <zmq.h>
#include <stdexcept>
using namespace std;
using namespace fair::mq::shmem;
@ -62,12 +64,18 @@ FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const str
LOG(error) << "Failed setting ZMQ_RCVTIMEO socket option, reason: " << zmq_strerror(errno);
}
if (type == "sub")
// if (type == "sub")
// {
// if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nullptr, 0) != 0)
// {
// LOG(error) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno);
// }
// }
if (type == "sub" || type == "pub")
{
if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nullptr, 0) != 0)
{
LOG(error) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno);
}
LOG(error) << "PUB/SUB socket type is not supported for shared memory transport";
throw fair::mq::SocketError("PUB/SUB socket type is not supported for shared memory transport");
}
// LOG(info) << "created socket " << fId;