diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 93098f39..d1585d10 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -79,25 +79,6 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable return fChannels.at(chan).at(i).Send(msg); } - template - inline int Send(std::unique_ptr& msg, DataType&& data, const std::string& chan, const int i = 0) const - { - - Serializer().serialize_impl(msg,std::forward(data)); - auto nbytes = fChannels.at(chan).at(i).Send(msg); - return nbytes; - } - - template - inline int Send(DataType&& data, const std::string& chan, const int i = 0) const - { - std::unique_ptr msg(NewMessage()); - Serializer().serialize_impl(msg,std::forward(data)); - auto nbytes = fChannels.at(chan).at(i).Send(msg); - return nbytes; - } - -//* template void Serialize(FairMQMessage& msg, DataType&& data, Args&&... args) const { @@ -110,24 +91,6 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable Deserializer().Deserialize(msg,std::forward(data),std::forward(args)...); } - -/* -// temporary overload to handle the case of a return ref to FairMQMessage - template - void Serialize(MessageType&& msg, DataType&& data) const - { - Serializer().Serialize(std::forward(msg),std::forward(data)); - } - - template - void Deserialize(MessageType&& msg, DataType&& data) const - { - Deserializer().Deserialize(std::forward(msg), std::forward(data)); - } - -// */ - - /// Shorthand method to receive `msg` on `chan` at index `i` /// @param msg message reference /// @param chan channel name @@ -138,25 +101,6 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable return fChannels.at(chan).at(i).Receive(msg); } - - template - inline int Receive(const std::unique_ptr& msg, DataType&& data, const std::string& chan, const int i = 0) const - { - auto nbytes = fChannels.at(chan).at(i).Receive(msg); - Deserializer().deserialize_impl(msg,std::forward(data)); - return nbytes; - } - - // using rvalue ref as universal reference - template - inline int Receive(DataType&& data, const std::string& chan, const int i = 0) const - { - std::unique_ptr msg(NewMessage()); - auto nbytes = fChannels.at(chan).at(i).Receive(msg); - Deserializer().deserialize_impl(msg,std::forward(data)); - return nbytes; - } - /// Shorthand method to send a vector of messages on `chan` at index `i` /// @param msgVec message vector reference /// @param chan channel name diff --git a/fairmq/FairMQParts.h b/fairmq/FairMQParts.h index 453427a1..94b379a6 100644 --- a/fairmq/FairMQParts.h +++ b/fairmq/FairMQParts.h @@ -14,18 +14,6 @@ #include "FairMQTransportFactory.h" #include "FairMQMessage.h" -#include "FairMQLogger.h" -#include "zeromq/FairMQMessageZMQ.h" -//class FairMQMessageZMQ; -class FairMQMessageNN; -namespace fairmq -{ - namespace transport - { - struct ZMQ{}; - struct NN{}; - } -} /// FairMQParts is a lightweight convenience wrapper around a vector of unique pointers to FairMQMessage, used for sending multi-part messages @@ -51,24 +39,18 @@ class FairMQParts } + /// Adds part (std::unique_ptr&) to the container (move) + /// @param msg unique pointer to FairMQMessage + /// lvalue ref (move not required when passing argument) inline void AddPart(std::unique_ptr& msg) { fParts.push_back(std::move(msg)); } - template - inline void AddPart(DataType&& data) - { - std::unique_ptr msg(new FairMQMessageZMQ()); - Serializer().Serialize(msg, std::forward(data)); - fParts.push_back(std::move(msg)); - } - - - - /// Adds part (std::unique_ptr) to the container (move) + /// Adds part (std::unique_ptr&) to the container (move) /// @param msg unique pointer to FairMQMessage - inline void AddPart(std::unique_ptr msg) + /// rvalue ref (move required when passing argument) + inline void AddPart(std::unique_ptr&& msg) { fParts.push_back(std::move(msg)); } @@ -81,16 +63,9 @@ class FairMQParts /// @param index container index inline std::unique_ptr& At(const int index) { return fParts.at(index); } - + // ref version inline FairMQMessage& At_ref(const int index) { return *(fParts.at(index)); } - template - inline void At(DataType&& data, const int index) - { - Deserializer().Deserialize(fParts.at(index), std::forward(data)); - } - - inline std::unique_ptr& At_ptr(const int index) { return fParts.at(index); } /// Get number of parts in the container /// @return number of parts in the container inline int Size() const { return fParts.size(); } diff --git a/fairmq/devices/GenericFileSink.h b/fairmq/devices/GenericFileSink.h index d9be195c..a87ad007 100644 --- a/fairmq/devices/GenericFileSink.h +++ b/fairmq/devices/GenericFileSink.h @@ -75,8 +75,10 @@ class GenericFileSink : public FairMQDevice, public T, public U int receivedMsg = 0; while (CheckCurrentState(RUNNING)) { - if (Receive(fInput, "data-in") > 0) + std::unique_ptr msg(NewMessage()); + if (Receive(msg,"data-in") > 0) { + Deserialize(*msg,fInput); U::Serialize(fInput);// add fInput to file receivedMsg++; } diff --git a/fairmq/devices/GenericProcessor.h b/fairmq/devices/GenericProcessor.h index 8909e3ac..29313e60 100644 --- a/fairmq/devices/GenericProcessor.h +++ b/fairmq/devices/GenericProcessor.h @@ -75,11 +75,15 @@ class GenericProcessor : public FairMQDevice, public T, public U, while (CheckCurrentState(RUNNING)) { - if (Receive(fInput, "data-in") > 0) + std::unique_ptr msg(NewMessage()); + if (Receive(fInput, "data-in") > 0) { + Deserialize(*msg,fInput); receivedMsgs++; task_type::Exec(fInput,fOutput); - Send(fOutput, "data-out"); + + Serialize(*msg,fOutput); + Send(fOutput, "data-out"); sentMsgs++; } }