From ed0ad135fc28254fe480bc1483819d972a96956b Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 6 Jun 2018 18:10:42 +0200 Subject: [PATCH] Avoid copy (where possible) when switching transports --- fairmq/FairMQChannel.cxx | 51 ++++++++++------------- fairmq/FairMQChannel.h | 4 +- fairmq/FairMQDevice.cxx | 19 +++++++-- fairmq/FairMQDevice.h | 2 +- fairmq/FairMQStateMachine.cxx | 15 +++---- fairmq/devices/FairMQBenchmarkSampler.cxx | 6 +-- fairmq/devices/FairMQSink.h | 2 +- fairmq/run/runBenchmarkSampler.cxx | 4 +- fairmq/run/startMQBenchmark.sh.in | 4 +- fairmq/shmem/FairMQSocketSHM.cxx | 18 +++++--- 10 files changed, 66 insertions(+), 59 deletions(-) diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index 6bdc0b9f..84466001 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -748,44 +748,35 @@ unsigned long FairMQChannel::GetMessagesRx() const return fSocket->GetMessagesRx(); } -bool FairMQChannel::CheckCompatibility(unique_ptr& msg) const +void FairMQChannel::CheckCompatibility(unique_ptr& msg) const { - if (fTransportType == msg->GetType()) + if (fTransportType != msg->GetType()) { - return true; - } - else - { - // LOG(warn) << "Channel type does not match message type. Copying..."; - FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage(msg->GetSize())); - memcpy(msgCopy->GetData(), msg->GetData(), msg->GetSize()); - msg = move(msgCopy); - return false; + // LOG(debug) << "Channel type does not match message type. Creating wrapper"; + FairMQMessagePtr msgWrapper(fTransportFactory->CreateMessage(msg->GetData(), + msg->GetSize(), + [](void* /*data*/, void* msg) { delete static_cast(msg); }, + msg.get() + )); + msg.release(); + msg = move(msgWrapper); } } -bool FairMQChannel::CheckCompatibility(vector>& msgVec) const +void FairMQChannel::CheckCompatibility(vector& msgVec) const { - bool match = true; - - if (msgVec.size() > 0) + for (auto& part : msgVec) { - for (unsigned int i = 0; i < msgVec.size(); ++i) + if (fTransportType != part->GetType()) { - if (fTransportType != msgVec.at(i)->GetType()) - { - // LOG(warn) << "Channel type does not match message type. Copying..."; - FairMQMessagePtr newMsg(fTransportFactory->CreateMessage(msgVec[i]->GetSize())); - memcpy(newMsg->GetData(), msgVec[i]->GetData(), msgVec[i]->GetSize()); - msgVec[i] = move(newMsg); - match = false; - } + // LOG(debug) << "Channel type does not match message type. Creating wrapper"; + FairMQMessagePtr partWrapper(fTransportFactory->CreateMessage(part->GetData(), + part->GetSize(), + [](void* /*data*/, void* part) { delete static_cast(part); }, + part.get() + )); + part.release(); + part = move(partWrapper); } } - else - { - return true; - } - - return match; } diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 66f77b18..f0d9aa24 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -313,8 +313,8 @@ class FairMQChannel std::shared_ptr fTransportFactory; - bool CheckCompatibility(std::unique_ptr& msg) const; - bool CheckCompatibility(std::vector>& msgVec) const; + void CheckCompatibility(std::unique_ptr& msg) const; + void CheckCompatibility(std::vector>& msgVec) const; void InitTransport(std::shared_ptr factory); diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 42d8c489..bba62ab9 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -148,7 +148,8 @@ void FairMQDevice::InitWrapper() else { LOG(error) << "Cannot update configuration. Socket method (bind/connect) not specified."; - throw runtime_error("Cannot update configuration. Socket method (bind/connect) not specified."); + ChangeState(ERROR_FOUND); + // throw runtime_error("Cannot update configuration. Socket method (bind/connect) not specified."); } // } } @@ -161,7 +162,8 @@ void FairMQDevice::InitWrapper() if (uninitializedBindingChannels.size() > 0) { LOG(error) << uninitializedBindingChannels.size() << " of the binding channels could not initialize. Initial configuration incomplete."; - throw runtime_error(fair::mq::tools::ToString(uninitializedBindingChannels.size(), " of the binding channels could not initialize. Initial configuration incomplete.")); + ChangeState(ERROR_FOUND); + // throw runtime_error(fair::mq::tools::ToString(uninitializedBindingChannels.size(), " of the binding channels could not initialize. Initial configuration incomplete.")); } CallStateChangeCallbacks(INITIALIZING_DEVICE); @@ -200,7 +202,8 @@ void FairMQDevice::InitWrapper() if (numAttempts++ > maxAttempts) { LOG(error) << "could not connect all channels after " << fInitializationTimeoutInS << " attempts"; - throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", fInitializationTimeoutInS, " attempts")); + ChangeState(ERROR_FOUND); + // throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", fInitializationTimeoutInS, " attempts")); } AttachChannels(uninitializedConnectingChannels); @@ -271,7 +274,15 @@ bool FairMQDevice::AttachChannel(FairMQChannel& ch) //(re-)init socket if (!ch.fSocket) { - ch.fSocket = ch.fTransportFactory->CreateSocket(ch.fType, ch.fName); + try + { + ch.fSocket = ch.fTransportFactory->CreateSocket(ch.fType, ch.fName); + } + catch (fair::mq::SocketError& se) + { + LOG(ERROR) << se.what(); + return false; + } } // set high water marks diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 0579423e..071cab02 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -208,7 +208,7 @@ class FairMQDevice : public FairMQStateMachine template FairMQMessagePtr NewMessageFor(const std::string& channel, int index, Args&&... args) const { - return fChannels.at(channel).at(index).Transport()->CreateMessage(std::forward(args)...); + return fChannels.at(channel).at(index).NewMessage(std::forward(args)...); } template diff --git a/fairmq/FairMQStateMachine.cxx b/fairmq/FairMQStateMachine.cxx index 6979b149..c1f16a3b 100644 --- a/fairmq/FairMQStateMachine.cxx +++ b/fairmq/FairMQStateMachine.cxx @@ -24,7 +24,7 @@ #include #include #include - +#include #include // signal/slot for onStateChange callbacks #include @@ -386,21 +386,18 @@ struct Machine_ : public msmf::state_machine_def boost::mpl::for_each>(boost::msm::back::get_state_name(stateName, state)); - stateName = stateName.substr(24); - size_t pos = stateName.find("_FSME"); - stateName.erase(pos); - - if (stateName == "1RUNNING" || stateName == "6DEVICE_READY" || stateName == "0PAUSED" || stateName == "8RESETTING_TASK" || stateName == "0RESETTING_DEVICE") + stateName = boost::core::demangle(stateName.c_str()); + size_t pos = stateName.rfind(":"); + if (pos != string::npos) { - stateName = stateName.substr(1); + stateName = stateName.substr(pos + 1); + stateName = stateName.substr(0, stateName.size() - 4); } if (stateName != "OK") { LOG(state) << "No transition from state " << stateName << " on event " << e.name(); } - - // LOG(state) << "no transition from state " << GetStateName(state) << " (" << state << ") on event " << e.name(); } static string GetStateName(const int state) diff --git a/fairmq/devices/FairMQBenchmarkSampler.cxx b/fairmq/devices/FairMQBenchmarkSampler.cxx index b2e13a84..74a3610b 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.cxx +++ b/fairmq/devices/FairMQBenchmarkSampler.cxx @@ -57,7 +57,7 @@ void FairMQBenchmarkSampler::Run() // store the channel reference to avoid traversing the map on every loop iteration FairMQChannel& dataOutChannel = fChannels.at(fOutChannelName).at(0); - FairMQMessagePtr baseMsg(dataOutChannel.Transport()->CreateMessage(fMsgSize)); + FairMQMessagePtr baseMsg(dataOutChannel.NewMessage(fMsgSize)); LOG(info) << "Starting the benchmark with message size of " << fMsgSize << " and " << fMaxIterations << " iterations."; auto tStart = chrono::high_resolution_clock::now(); @@ -66,7 +66,7 @@ void FairMQBenchmarkSampler::Run() { if (fSameMessage) { - FairMQMessagePtr msg(dataOutChannel.Transport()->CreateMessage()); + FairMQMessagePtr msg(dataOutChannel.NewMessage()); msg->Copy(*baseMsg); if (dataOutChannel.Send(msg) >= 0) @@ -83,7 +83,7 @@ void FairMQBenchmarkSampler::Run() } else { - FairMQMessagePtr msg(dataOutChannel.Transport()->CreateMessage(fMsgSize)); + FairMQMessagePtr msg(dataOutChannel.NewMessage(fMsgSize)); if (dataOutChannel.Send(msg) >= 0) { diff --git a/fairmq/devices/FairMQSink.h b/fairmq/devices/FairMQSink.h index 6746bf07..f35c466a 100644 --- a/fairmq/devices/FairMQSink.h +++ b/fairmq/devices/FairMQSink.h @@ -56,7 +56,7 @@ class FairMQSink : public FairMQDevice//, public OutputPolicy while (CheckCurrentState(RUNNING)) { - FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage()); + FairMQMessagePtr msg(dataInChannel.NewMessage()); if (dataInChannel.Receive(msg) >= 0) { diff --git a/fairmq/run/runBenchmarkSampler.cxx b/fairmq/run/runBenchmarkSampler.cxx index 56f15773..8703aff0 100644 --- a/fairmq/run/runBenchmarkSampler.cxx +++ b/fairmq/run/runBenchmarkSampler.cxx @@ -15,8 +15,8 @@ void addCustomOptions(bpo::options_description& options) { options.add_options() ("out-channel", bpo::value()->default_value("data"), "Name of the output channel") - ("same-msg", bpo::value()->default_value(true), "Re-send the same message (default), or recreate for each iteration") - ("msg-size", bpo::value()->default_value(1000), "Message size in bytes") + ("same-msg", bpo::value()->default_value(false), "Re-send the same message, or recreate for each iteration") + ("msg-size", bpo::value()->default_value(1000000), "Message size in bytes") ("max-iterations", bpo::value()->default_value(0), "Number of run iterations (0 - infinite)") ("msg-rate", bpo::value()->default_value(0), "Msg rate limit in maximum number of messages per second"); } diff --git a/fairmq/run/startMQBenchmark.sh.in b/fairmq/run/startMQBenchmark.sh.in index a21f625c..86609931 100755 --- a/fairmq/run/startMQBenchmark.sh.in +++ b/fairmq/run/startMQBenchmark.sh.in @@ -72,7 +72,7 @@ SAMPLER+=" --msg-size $msgSize" SAMPLER+=" --same-msg $sameMsg" # SAMPLER+=" --msg-rate 1000" SAMPLER+=" --max-iterations $maxIterations" -SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.01:5555" +SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:5555" xterm -geometry 90x50+0+0 -hold -e $affinitySamp @CMAKE_CURRENT_BINARY_DIR@/$SAMPLER & echo "" echo "started: xterm -geometry 90x50+0+0 -hold -e $affinitySamp @CMAKE_CURRENT_BINARY_DIR@/$SAMPLER" @@ -85,7 +85,7 @@ SINK+=" --id sink1" SINK+=" --transport $transport" SINK+=" --severity debug" SINK+=" --max-iterations $maxIterations" -SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.01:5555" +SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:5555" xterm -geometry 90x50+550+0 -hold -e $affinitySink @CMAKE_CURRENT_BINARY_DIR@/$SINK & echo "" echo "started: xterm -geometry 90x50+550+0 -hold -e $affinitySink @CMAKE_CURRENT_BINARY_DIR@/$SINK" diff --git a/fairmq/shmem/FairMQSocketSHM.cxx b/fairmq/shmem/FairMQSocketSHM.cxx index d016a06a..fd7d449d 100644 --- a/fairmq/shmem/FairMQSocketSHM.cxx +++ b/fairmq/shmem/FairMQSocketSHM.cxx @@ -14,6 +14,8 @@ #include +#include + using namespace std; using namespace fair::mq::shmem; @@ -62,12 +64,18 @@ FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const str LOG(error) << "Failed setting ZMQ_RCVTIMEO socket option, reason: " << zmq_strerror(errno); } - if (type == "sub") + // if (type == "sub") + // { + // if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nullptr, 0) != 0) + // { + // LOG(error) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno); + // } + // } + + if (type == "sub" || type == "pub") { - if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nullptr, 0) != 0) - { - LOG(error) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno); - } + LOG(error) << "PUB/SUB socket type is not supported for shared memory transport"; + throw fair::mq::SocketError("PUB/SUB socket type is not supported for shared memory transport"); } // LOG(info) << "created socket " << fId;