Avoid copy (where possible) when switching transports

This commit is contained in:
Alexey Rybalchenko 2018-06-06 18:10:42 +02:00
parent 2894af803b
commit ed0ad135fc
10 changed files with 66 additions and 59 deletions

View File

@ -748,44 +748,35 @@ unsigned long FairMQChannel::GetMessagesRx() const
return fSocket->GetMessagesRx(); return fSocket->GetMessagesRx();
} }
bool FairMQChannel::CheckCompatibility(unique_ptr<FairMQMessage>& msg) const void FairMQChannel::CheckCompatibility(unique_ptr<FairMQMessage>& msg) const
{ {
if (fTransportType == msg->GetType()) if (fTransportType != msg->GetType())
{ {
return true; // LOG(debug) << "Channel type does not match message type. Creating wrapper";
} FairMQMessagePtr msgWrapper(fTransportFactory->CreateMessage(msg->GetData(),
else msg->GetSize(),
{ [](void* /*data*/, void* msg) { delete static_cast<FairMQMessage*>(msg); },
// LOG(warn) << "Channel type does not match message type. Copying..."; msg.get()
FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage(msg->GetSize())); ));
memcpy(msgCopy->GetData(), msg->GetData(), msg->GetSize()); msg.release();
msg = move(msgCopy); msg = move(msgWrapper);
return false;
} }
} }
bool FairMQChannel::CheckCompatibility(vector<unique_ptr<FairMQMessage>>& msgVec) const void FairMQChannel::CheckCompatibility(vector<FairMQMessagePtr>& msgVec) const
{ {
bool match = true; for (auto& part : msgVec)
if (msgVec.size() > 0)
{ {
for (unsigned int i = 0; i < msgVec.size(); ++i) if (fTransportType != part->GetType())
{ {
if (fTransportType != msgVec.at(i)->GetType()) // LOG(debug) << "Channel type does not match message type. Creating wrapper";
{ FairMQMessagePtr partWrapper(fTransportFactory->CreateMessage(part->GetData(),
// LOG(warn) << "Channel type does not match message type. Copying..."; part->GetSize(),
FairMQMessagePtr newMsg(fTransportFactory->CreateMessage(msgVec[i]->GetSize())); [](void* /*data*/, void* part) { delete static_cast<FairMQMessage*>(part); },
memcpy(newMsg->GetData(), msgVec[i]->GetData(), msgVec[i]->GetSize()); part.get()
msgVec[i] = move(newMsg); ));
match = false; part.release();
} part = move(partWrapper);
} }
} }
else
{
return true;
}
return match;
} }

View File

@ -313,8 +313,8 @@ class FairMQChannel
std::shared_ptr<FairMQTransportFactory> fTransportFactory; std::shared_ptr<FairMQTransportFactory> fTransportFactory;
bool CheckCompatibility(std::unique_ptr<FairMQMessage>& msg) const; void CheckCompatibility(std::unique_ptr<FairMQMessage>& msg) const;
bool CheckCompatibility(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const; void CheckCompatibility(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const;
void InitTransport(std::shared_ptr<FairMQTransportFactory> factory); void InitTransport(std::shared_ptr<FairMQTransportFactory> factory);

View File

@ -148,7 +148,8 @@ void FairMQDevice::InitWrapper()
else else
{ {
LOG(error) << "Cannot update configuration. Socket method (bind/connect) not specified."; LOG(error) << "Cannot update configuration. Socket method (bind/connect) not specified.";
throw runtime_error("Cannot update configuration. Socket method (bind/connect) not specified."); ChangeState(ERROR_FOUND);
// throw runtime_error("Cannot update configuration. Socket method (bind/connect) not specified.");
} }
// } // }
} }
@ -161,7 +162,8 @@ void FairMQDevice::InitWrapper()
if (uninitializedBindingChannels.size() > 0) if (uninitializedBindingChannels.size() > 0)
{ {
LOG(error) << uninitializedBindingChannels.size() << " of the binding channels could not initialize. Initial configuration incomplete."; LOG(error) << uninitializedBindingChannels.size() << " of the binding channels could not initialize. Initial configuration incomplete.";
throw runtime_error(fair::mq::tools::ToString(uninitializedBindingChannels.size(), " of the binding channels could not initialize. Initial configuration incomplete.")); ChangeState(ERROR_FOUND);
// throw runtime_error(fair::mq::tools::ToString(uninitializedBindingChannels.size(), " of the binding channels could not initialize. Initial configuration incomplete."));
} }
CallStateChangeCallbacks(INITIALIZING_DEVICE); CallStateChangeCallbacks(INITIALIZING_DEVICE);
@ -200,7 +202,8 @@ void FairMQDevice::InitWrapper()
if (numAttempts++ > maxAttempts) if (numAttempts++ > maxAttempts)
{ {
LOG(error) << "could not connect all channels after " << fInitializationTimeoutInS << " attempts"; LOG(error) << "could not connect all channels after " << fInitializationTimeoutInS << " attempts";
throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", fInitializationTimeoutInS, " attempts")); ChangeState(ERROR_FOUND);
// throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", fInitializationTimeoutInS, " attempts"));
} }
AttachChannels(uninitializedConnectingChannels); AttachChannels(uninitializedConnectingChannels);
@ -271,7 +274,15 @@ bool FairMQDevice::AttachChannel(FairMQChannel& ch)
//(re-)init socket //(re-)init socket
if (!ch.fSocket) if (!ch.fSocket)
{ {
ch.fSocket = ch.fTransportFactory->CreateSocket(ch.fType, ch.fName); try
{
ch.fSocket = ch.fTransportFactory->CreateSocket(ch.fType, ch.fName);
}
catch (fair::mq::SocketError& se)
{
LOG(ERROR) << se.what();
return false;
}
} }
// set high water marks // set high water marks

View File

@ -208,7 +208,7 @@ class FairMQDevice : public FairMQStateMachine
template<typename... Args> template<typename... Args>
FairMQMessagePtr NewMessageFor(const std::string& channel, int index, Args&&... args) const FairMQMessagePtr NewMessageFor(const std::string& channel, int index, Args&&... args) const
{ {
return fChannels.at(channel).at(index).Transport()->CreateMessage(std::forward<Args>(args)...); return fChannels.at(channel).at(index).NewMessage(std::forward<Args>(args)...);
} }
template<typename T> template<typename T>

View File

@ -24,7 +24,7 @@
#include <boost/msm/back/metafunctions.hpp> #include <boost/msm/back/metafunctions.hpp>
#include <boost/msm/front/state_machine_def.hpp> #include <boost/msm/front/state_machine_def.hpp>
#include <boost/msm/front/functor_row.hpp> #include <boost/msm/front/functor_row.hpp>
#include <boost/core/demangle.hpp>
#include <boost/signals2.hpp> // signal/slot for onStateChange callbacks #include <boost/signals2.hpp> // signal/slot for onStateChange callbacks
#include <atomic> #include <atomic>
@ -386,21 +386,18 @@ struct Machine_ : public msmf::state_machine_def<Machine_>
boost::mpl::for_each<all_states, boost::msm::wrap<boost::mpl::placeholders::_1>>(boost::msm::back::get_state_name<recursive_stt>(stateName, state)); boost::mpl::for_each<all_states, boost::msm::wrap<boost::mpl::placeholders::_1>>(boost::msm::back::get_state_name<recursive_stt>(stateName, state));
stateName = stateName.substr(24); stateName = boost::core::demangle(stateName.c_str());
size_t pos = stateName.find("_FSME"); size_t pos = stateName.rfind(":");
stateName.erase(pos); if (pos != string::npos)
if (stateName == "1RUNNING" || stateName == "6DEVICE_READY" || stateName == "0PAUSED" || stateName == "8RESETTING_TASK" || stateName == "0RESETTING_DEVICE")
{ {
stateName = stateName.substr(1); stateName = stateName.substr(pos + 1);
stateName = stateName.substr(0, stateName.size() - 4);
} }
if (stateName != "OK") if (stateName != "OK")
{ {
LOG(state) << "No transition from state " << stateName << " on event " << e.name(); LOG(state) << "No transition from state " << stateName << " on event " << e.name();
} }
// LOG(state) << "no transition from state " << GetStateName(state) << " (" << state << ") on event " << e.name();
} }
static string GetStateName(const int state) static string GetStateName(const int state)

View File

@ -57,7 +57,7 @@ void FairMQBenchmarkSampler::Run()
// store the channel reference to avoid traversing the map on every loop iteration // store the channel reference to avoid traversing the map on every loop iteration
FairMQChannel& dataOutChannel = fChannels.at(fOutChannelName).at(0); FairMQChannel& dataOutChannel = fChannels.at(fOutChannelName).at(0);
FairMQMessagePtr baseMsg(dataOutChannel.Transport()->CreateMessage(fMsgSize)); FairMQMessagePtr baseMsg(dataOutChannel.NewMessage(fMsgSize));
LOG(info) << "Starting the benchmark with message size of " << fMsgSize << " and " << fMaxIterations << " iterations."; LOG(info) << "Starting the benchmark with message size of " << fMsgSize << " and " << fMaxIterations << " iterations.";
auto tStart = chrono::high_resolution_clock::now(); auto tStart = chrono::high_resolution_clock::now();
@ -66,7 +66,7 @@ void FairMQBenchmarkSampler::Run()
{ {
if (fSameMessage) if (fSameMessage)
{ {
FairMQMessagePtr msg(dataOutChannel.Transport()->CreateMessage()); FairMQMessagePtr msg(dataOutChannel.NewMessage());
msg->Copy(*baseMsg); msg->Copy(*baseMsg);
if (dataOutChannel.Send(msg) >= 0) if (dataOutChannel.Send(msg) >= 0)
@ -83,7 +83,7 @@ void FairMQBenchmarkSampler::Run()
} }
else else
{ {
FairMQMessagePtr msg(dataOutChannel.Transport()->CreateMessage(fMsgSize)); FairMQMessagePtr msg(dataOutChannel.NewMessage(fMsgSize));
if (dataOutChannel.Send(msg) >= 0) if (dataOutChannel.Send(msg) >= 0)
{ {

View File

@ -56,7 +56,7 @@ class FairMQSink : public FairMQDevice//, public OutputPolicy
while (CheckCurrentState(RUNNING)) while (CheckCurrentState(RUNNING))
{ {
FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage()); FairMQMessagePtr msg(dataInChannel.NewMessage());
if (dataInChannel.Receive(msg) >= 0) if (dataInChannel.Receive(msg) >= 0)
{ {

View File

@ -15,8 +15,8 @@ void addCustomOptions(bpo::options_description& options)
{ {
options.add_options() options.add_options()
("out-channel", bpo::value<std::string>()->default_value("data"), "Name of the output channel") ("out-channel", bpo::value<std::string>()->default_value("data"), "Name of the output channel")
("same-msg", bpo::value<bool>()->default_value(true), "Re-send the same message (default), or recreate for each iteration") ("same-msg", bpo::value<bool>()->default_value(false), "Re-send the same message, or recreate for each iteration")
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes") ("msg-size", bpo::value<int>()->default_value(1000000), "Message size in bytes")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Number of run iterations (0 - infinite)") ("max-iterations", bpo::value<uint64_t>()->default_value(0), "Number of run iterations (0 - infinite)")
("msg-rate", bpo::value<int>()->default_value(0), "Msg rate limit in maximum number of messages per second"); ("msg-rate", bpo::value<int>()->default_value(0), "Msg rate limit in maximum number of messages per second");
} }

View File

@ -72,7 +72,7 @@ SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --same-msg $sameMsg" SAMPLER+=" --same-msg $sameMsg"
# SAMPLER+=" --msg-rate 1000" # SAMPLER+=" --msg-rate 1000"
SAMPLER+=" --max-iterations $maxIterations" SAMPLER+=" --max-iterations $maxIterations"
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.01:5555" SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:5555"
xterm -geometry 90x50+0+0 -hold -e $affinitySamp @CMAKE_CURRENT_BINARY_DIR@/$SAMPLER & xterm -geometry 90x50+0+0 -hold -e $affinitySamp @CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
echo "" echo ""
echo "started: xterm -geometry 90x50+0+0 -hold -e $affinitySamp @CMAKE_CURRENT_BINARY_DIR@/$SAMPLER" echo "started: xterm -geometry 90x50+0+0 -hold -e $affinitySamp @CMAKE_CURRENT_BINARY_DIR@/$SAMPLER"
@ -85,7 +85,7 @@ SINK+=" --id sink1"
SINK+=" --transport $transport" SINK+=" --transport $transport"
SINK+=" --severity debug" SINK+=" --severity debug"
SINK+=" --max-iterations $maxIterations" SINK+=" --max-iterations $maxIterations"
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.01:5555" SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:5555"
xterm -geometry 90x50+550+0 -hold -e $affinitySink @CMAKE_CURRENT_BINARY_DIR@/$SINK & xterm -geometry 90x50+550+0 -hold -e $affinitySink @CMAKE_CURRENT_BINARY_DIR@/$SINK &
echo "" echo ""
echo "started: xterm -geometry 90x50+550+0 -hold -e $affinitySink @CMAKE_CURRENT_BINARY_DIR@/$SINK" echo "started: xterm -geometry 90x50+550+0 -hold -e $affinitySink @CMAKE_CURRENT_BINARY_DIR@/$SINK"

View File

@ -14,6 +14,8 @@
#include <zmq.h> #include <zmq.h>
#include <stdexcept>
using namespace std; using namespace std;
using namespace fair::mq::shmem; using namespace fair::mq::shmem;
@ -62,12 +64,18 @@ FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const str
LOG(error) << "Failed setting ZMQ_RCVTIMEO socket option, reason: " << zmq_strerror(errno); LOG(error) << "Failed setting ZMQ_RCVTIMEO socket option, reason: " << zmq_strerror(errno);
} }
if (type == "sub") // if (type == "sub")
// {
// if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nullptr, 0) != 0)
// {
// LOG(error) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno);
// }
// }
if (type == "sub" || type == "pub")
{ {
if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nullptr, 0) != 0) LOG(error) << "PUB/SUB socket type is not supported for shared memory transport";
{ throw fair::mq::SocketError("PUB/SUB socket type is not supported for shared memory transport");
LOG(error) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno);
}
} }
// LOG(info) << "created socket " << fId; // LOG(info) << "created socket " << fId;