Add new Send/Receive methods with smart pointers and no flag checks.

This commit is contained in:
Alexey Rybalchenko 2015-08-17 14:45:31 +02:00 committed by Mohammad Al-Turany
parent 105e734808
commit a7ab33a10e
22 changed files with 204 additions and 121 deletions

View File

@ -36,6 +36,8 @@ FairMQChannel::FairMQChannel()
, fPoller(nullptr) , fPoller(nullptr)
, fCmdSocket(nullptr) , fCmdSocket(nullptr)
, fTransportFactory(nullptr) , fTransportFactory(nullptr)
, fNoBlockFlag(0)
, fSndMoreFlag(0)
{ {
} }
@ -52,6 +54,8 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str
, fPoller(nullptr) , fPoller(nullptr)
, fCmdSocket(nullptr) , fCmdSocket(nullptr)
, fTransportFactory(nullptr) , fTransportFactory(nullptr)
, fNoBlockFlag(0)
, fSndMoreFlag(0)
{ {
} }
@ -347,10 +351,21 @@ bool FairMQChannel::InitCommandInterface(FairMQTransportFactory* factory)
fTransportFactory = factory; fTransportFactory = factory;
fCmdSocket = fTransportFactory->CreateSocket("sub", "device-commands", 1); fCmdSocket = fTransportFactory->CreateSocket("sub", "device-commands", 1);
fCmdSocket->Connect("inproc://commands"); if (fCmdSocket)
{
fCmdSocket->Connect("inproc://commands");
fPoller = fTransportFactory->CreatePoller(*fSocket, *fCmdSocket); fNoBlockFlag = fCmdSocket->NOBLOCK;
return true; fSndMoreFlag = fCmdSocket->SNDMORE;
fPoller = fTransportFactory->CreatePoller(*fSocket, *fCmdSocket);
return true;
}
else
{
return false;
}
} }
void FairMQChannel::ResetChannel() void FairMQChannel::ResetChannel()
@ -359,6 +374,57 @@ void FairMQChannel::ResetChannel()
// TODO: implement channel resetting // TODO: implement channel resetting
} }
int FairMQChannel::Send(const unique_ptr<FairMQMessage>& msg) const
{
fPoller->Poll(-1);
if (fPoller->CheckInput(0))
{
HandleCommand();
return -1;
}
if (fPoller->CheckOutput(1))
{
return fSocket->Send(msg.get(), 0);
}
return -1;
}
int FairMQChannel::SendAsync(const unique_ptr<FairMQMessage>& msg) const
{
return fSocket->Send(msg.get(), fNoBlockFlag);
}
int FairMQChannel::SendPart(const unique_ptr<FairMQMessage>& msg) const
{
return fSocket->Send(msg.get(), fSndMoreFlag);
}
int FairMQChannel::Receive(const unique_ptr<FairMQMessage>& msg) const
{
fPoller->Poll(-1);
if (fPoller->CheckInput(0))
{
HandleCommand();
return -1;
}
if (fPoller->CheckInput(1))
{
return fSocket->Receive(msg.get(), 0);
}
return -1;
}
int FairMQChannel::ReceiveAsync(const unique_ptr<FairMQMessage>& msg) const
{
return fSocket->Receive(msg.get(), fNoBlockFlag);
}
int FairMQChannel::Send(FairMQMessage* msg, const string& flag) const int FairMQChannel::Send(FairMQMessage* msg, const string& flag) const
{ {
if (flag == "") if (flag == "")

View File

@ -16,6 +16,7 @@
#define FAIRMQCHANNEL_H_ #define FAIRMQCHANNEL_H_
#include <string> #include <string>
#include <memory> // unique_ptr
#include <boost/thread/mutex.hpp> #include <boost/thread/mutex.hpp>
@ -59,6 +60,14 @@ class FairMQChannel
FairMQSocket* fSocket; FairMQSocket* fSocket;
// Wrappers for the socket methods to simplify the usage of channels // Wrappers for the socket methods to simplify the usage of channels
int Send(const std::unique_ptr<FairMQMessage>& msg) const;
int SendAsync(const std::unique_ptr<FairMQMessage>& msg) const;
int SendPart(const std::unique_ptr<FairMQMessage>& msg) const;
int Receive(const std::unique_ptr<FairMQMessage>& msg) const;
int ReceiveAsync(const std::unique_ptr<FairMQMessage>& msg) const;
// DEPRECATED socket method wrappers with raw pointers and flag checks
int Send(FairMQMessage* msg, const std::string& flag = "") const; int Send(FairMQMessage* msg, const std::string& flag = "") const;
int Send(FairMQMessage* msg, const int flags) const; int Send(FairMQMessage* msg, const int flags) const;
int Receive(FairMQMessage* msg, const std::string& flag = "") const; int Receive(FairMQMessage* msg, const std::string& flag = "") const;
@ -84,6 +93,9 @@ class FairMQChannel
FairMQTransportFactory* fTransportFactory; FairMQTransportFactory* fTransportFactory;
int fNoBlockFlag;
int fSndMoreFlag;
bool HandleCommand() const; bool HandleCommand() const;
// use static mutex to make the class easily copyable // use static mutex to make the class easily copyable

View File

@ -25,11 +25,12 @@ class FairMQConfigurable
Last = 1 Last = 1
}; };
FairMQConfigurable(); FairMQConfigurable();
virtual ~FairMQConfigurable();
virtual void SetProperty(const int key, const std::string& value); virtual void SetProperty(const int key, const std::string& value);
virtual std::string GetProperty(const int key, const std::string& default_ = ""); virtual std::string GetProperty(const int key, const std::string& default_ = "");
virtual void SetProperty(const int key, const int value); virtual void SetProperty(const int key, const int value);
virtual int GetProperty(const int key, const int default_ = 0); virtual int GetProperty(const int key, const int default_ = 0);
virtual ~FairMQConfigurable();
}; };
#endif /* FAIRMQCONFIGURABLE_H_ */ #endif /* FAIRMQCONFIGURABLE_H_ */

View File

@ -418,6 +418,11 @@ int FairMQDevice::GetProperty(const int key, const int default_ /*= 0*/)
} }
} }
void FairMQDevice::SetTransport(unique_ptr<FairMQTransportFactory>& factory)
{
fTransportFactory = factory.get();
}
void FairMQDevice::SetTransport(FairMQTransportFactory* factory) void FairMQDevice::SetTransport(FairMQTransportFactory* factory)
{ {
fTransportFactory = factory; fTransportFactory = factory;
@ -637,8 +642,6 @@ void FairMQDevice::ResetWrapper()
void FairMQDevice::Reset() void FairMQDevice::Reset()
{ {
LOG(DEBUG) << "Resetting Device...";
// iterate over the channels map // iterate over the channels map
for (auto mi = fChannels.begin(); mi != fChannels.end(); ++mi) for (auto mi = fChannels.begin(); mi != fChannels.end(); ++mi)
{ {
@ -657,8 +660,6 @@ void FairMQDevice::Reset()
vi->fCmdSocket = nullptr; vi->fCmdSocket = nullptr;
} }
} }
LOG(DEBUG) << "Device reset finished!";
} }
void FairMQDevice::Terminate() void FairMQDevice::Terminate()
@ -725,4 +726,6 @@ FairMQDevice::~FairMQDevice()
delete fCmdSocket; delete fCmdSocket;
fCmdSocket = nullptr; fCmdSocket = nullptr;
} }
delete fTransportFactory;
} }

View File

@ -16,6 +16,7 @@
#define FAIRMQDEVICE_H_ #define FAIRMQDEVICE_H_
#include <vector> #include <vector>
#include <memory> // unique_ptr
#include <string> #include <string>
#include <iostream> #include <iostream>
#include <unordered_map> #include <unordered_map>
@ -60,6 +61,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
virtual int GetProperty(const int key, const int default_ = 0); virtual int GetProperty(const int key, const int default_ = 0);
virtual void SetTransport(FairMQTransportFactory* factory); virtual void SetTransport(FairMQTransportFactory* factory);
virtual void SetTransport(std::unique_ptr<FairMQTransportFactory>& factory);
static bool SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs); static bool SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs);

View File

@ -16,6 +16,7 @@
#define FAIRMQMESSAGE_H_ #define FAIRMQMESSAGE_H_
#include <cstddef> // for size_t #include <cstddef> // for size_t
#include <memory> // unique_ptr
typedef void (fairmq_free_fn) (void *data, void *hint); typedef void (fairmq_free_fn) (void *data, void *hint);
@ -33,6 +34,7 @@ class FairMQMessage
virtual void CloseMessage() = 0; virtual void CloseMessage() = 0;
virtual void Copy(FairMQMessage* msg) = 0; virtual void Copy(FairMQMessage* msg) = 0;
virtual void Copy(const std::unique_ptr<FairMQMessage>& msg) = 0;
virtual ~FairMQMessage() {}; virtual ~FairMQMessage() {};
}; };

View File

@ -38,14 +38,15 @@ void FairMQBenchmarkSampler::Run()
boost::thread resetEventCounter(boost::bind(&FairMQBenchmarkSampler::ResetEventCounter, this)); boost::thread resetEventCounter(boost::bind(&FairMQBenchmarkSampler::ResetEventCounter, this));
void* buffer = operator new[](fEventSize); void* buffer = operator new[](fEventSize);
FairMQMessage* baseMsg = fTransportFactory->CreateMessage(buffer, fEventSize);
unique_ptr<FairMQMessage> baseMsg(fTransportFactory->CreateMessage(buffer, fEventSize));
// store the channel reference to avoid traversing the map on every loop iteration // store the channel reference to avoid traversing the map on every loop iteration
const FairMQChannel& dataChannel = fChannels.at("data-out").at(0); const FairMQChannel& dataChannel = fChannels.at("data-out").at(0);
while (CheckCurrentState(RUNNING)) while (CheckCurrentState(RUNNING))
{ {
FairMQMessage* msg = fTransportFactory->CreateMessage(); unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
msg->Copy(baseMsg); msg->Copy(baseMsg);
dataChannel.Send(msg); dataChannel.Send(msg);
@ -56,12 +57,8 @@ void FairMQBenchmarkSampler::Run()
{ {
boost::this_thread::sleep(boost::posix_time::milliseconds(1)); boost::this_thread::sleep(boost::posix_time::milliseconds(1));
} }
delete msg;
} }
delete baseMsg;
try { try {
resetEventCounter.interrupt(); resetEventCounter.interrupt();
resetEventCounter.join(); resetEventCounter.join();

View File

@ -28,8 +28,7 @@ class FairMQBenchmarkSampler : public FairMQDevice
public: public:
enum enum
{ {
InputFile = FairMQDevice::Last, EventSize = FairMQDevice::Last,
EventSize,
EventRate, EventRate,
Last Last
}; };

View File

@ -32,14 +32,12 @@ void FairMQBuffer::Run()
while (CheckCurrentState(RUNNING)) while (CheckCurrentState(RUNNING))
{ {
FairMQMessage* msg = fTransportFactory->CreateMessage(); std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
if (dataInChannel.Receive(msg) > 0) if (dataInChannel.Receive(msg) > 0)
{ {
dataOutChannel.Send(msg); dataOutChannel.Send(msg);
} }
delete msg;
} }
} }

View File

@ -29,28 +29,32 @@ FairMQMerger::~FairMQMerger()
void FairMQMerger::Run() void FairMQMerger::Run()
{ {
FairMQPoller* poller = fTransportFactory->CreatePoller(fChannels.at("data-in")); std::unique_ptr<FairMQPoller> poller(fTransportFactory->CreatePoller(fChannels.at("data-in")));
// store the channel references to avoid traversing the map on every loop iteration // store the channel references to avoid traversing the map on every loop iteration
const FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0); const FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0);
FairMQChannel* dataInChannels[fChannels.at("data-in").size()]; std::vector<FairMQChannel*> dataInChannels(fChannels.at("data-in").size());
for (int i = 0; i < fChannels.at("data-in").size(); ++i) for (int i = 0; i < fChannels.at("data-in").size(); ++i)
{ {
dataInChannels[i] = &(fChannels.at("data-in").at(i)); dataInChannels.at(i) = &(fChannels.at("data-in").at(i));
} }
while (CheckCurrentState(RUNNING)) while (CheckCurrentState(RUNNING))
{ {
FairMQMessage* msg = fTransportFactory->CreateMessage(); std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
poller->Poll(100); poller->Poll(100);
// Loop over the data input channels.
for (int i = 0; i < fChannels.at("data-in").size(); ++i) for (int i = 0; i < fChannels.at("data-in").size(); ++i)
{ {
// Check if the channel has data ready to be received.
if (poller->CheckInput(i)) if (poller->CheckInput(i))
{ {
// Try receiving the data.
if (dataInChannels[i]->Receive(msg) > 0) if (dataInChannels[i]->Receive(msg) > 0)
{ {
// If data was received, send it to output.
if (dataOutChannel.Send(msg) < 0) if (dataOutChannel.Send(msg) < 0)
{ {
LOG(DEBUG) << "Blocking send interrupted by a command"; LOG(DEBUG) << "Blocking send interrupted by a command";
@ -64,9 +68,5 @@ void FairMQMerger::Run()
} }
} }
} }
delete msg;
} }
delete poller;
} }

View File

@ -28,7 +28,7 @@ FairMQProxy::~FairMQProxy()
void FairMQProxy::Run() void FairMQProxy::Run()
{ {
FairMQMessage* msg = fTransportFactory->CreateMessage(); std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
// store the channel references to avoid traversing the map on every loop iteration // store the channel references to avoid traversing the map on every loop iteration
const FairMQChannel& dataInChannel = fChannels.at("data-in").at(0); const FairMQChannel& dataInChannel = fChannels.at("data-in").at(0);
@ -41,6 +41,4 @@ void FairMQProxy::Run()
dataOutChannel.Send(msg); dataOutChannel.Send(msg);
} }
} }
delete msg;
} }

View File

@ -29,11 +29,9 @@ void FairMQSink::Run()
while (CheckCurrentState(RUNNING)) while (CheckCurrentState(RUNNING))
{ {
FairMQMessage* msg = fTransportFactory->CreateMessage(); std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
dataChannel.Receive(msg); dataChannel.Receive(msg);
delete msg;
} }
} }

View File

@ -41,7 +41,7 @@ void FairMQSplitter::Run()
while (CheckCurrentState(RUNNING)) while (CheckCurrentState(RUNNING))
{ {
FairMQMessage* msg = fTransportFactory->CreateMessage(); std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
if (dataInChannel.Receive(msg) > 0) if (dataInChannel.Receive(msg) > 0)
{ {
@ -52,7 +52,5 @@ void FairMQSplitter::Run()
direction = 0; direction = 0;
} }
} }
delete msg;
} }
} }

View File

@ -71,17 +71,18 @@ class GenericFileSink : public FairMQDevice, public InputPolicy, public OutputPo
{ {
int receivedMsg = 0; int receivedMsg = 0;
while (GetCurrentState() == RUNNING) // store the channel reference to avoid traversing the map on every loop iteration
{ const FairMQChannel& inputChannel = fChannels["data-in"].at(0);
FairMQMessage* msg = fTransportFactory->CreateMessage();
if (fChannels["data-in"].at(0).Receive(msg) > 0) while (CheckCurrentState(RUNNING))
{
std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
if (inputChannel.Receive(msg) > 0)
{ {
OutputPolicy::AddToFile(InputPolicy::DeSerializeMsg(msg)); OutputPolicy::AddToFile(InputPolicy::DeSerializeMsg(msg.get()));
receivedMsg++; receivedMsg++;
} }
delete msg;
} }
MQLOG(INFO) << "Received " << receivedMsg << " messages!"; MQLOG(INFO) << "Received " << receivedMsg << " messages!";

View File

@ -44,15 +44,18 @@ void GenericFileSink<InputPolicy, OutputPolicy>::Run()
{ {
int receivedMsg = 0; int receivedMsg = 0;
// store the channel reference to avoid traversing the map on every loop iteration
const FairMQChannel& inputChannel = fChannels["data-in"].at(0);
while (CheckCurrentState(RUNNING)) while (CheckCurrentState(RUNNING))
{ {
FairMQMessage* msg = fTransportFactory->CreateMessage(); std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
if (fChannels.at("data-in").at(0).Receive(msg) > 0)
if (inputChannel.Receive(msg) > 0)
{ {
OutputPolicy::AddToFile(InputPolicy::DeSerializeMsg(msg)); OutputPolicy::AddToFile(InputPolicy::DeSerializeMsg(msg.get()));
receivedMsg++; receivedMsg++;
} }
delete msg;
} }
MQLOG(INFO) << "Received " << receivedMsg << " messages!"; MQLOG(INFO) << "Received " << receivedMsg << " messages!";

View File

@ -38,22 +38,25 @@ class GenericMerger : public FairMQDevice, public MergerPolicy, public InputPoli
virtual void Run() virtual void Run()
{ {
FairMQPoller* poller = fTransportFactory->CreatePoller(fChannels["data-in"]); std::unique_ptr<FairMQPoller> poller(fTransportFactory->CreatePoller(fChannels["data-in"]));
int received = 0; int received = 0;
while (GetCurrentState() == RUNNING) while (GetCurrentState() == RUNNING)
{ {
FairMQMessage* msg = fTransportFactory->CreateMessage(); std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
// MergerPolicy:: // MergerPolicy::
poller->Poll(fBlockingTime); poller->Poll(fBlockingTime);
for (int i = 0; i < fChannels["datain"].size(); i++) for (int i = 0; i < fChannels.at("data-in").size(); i++)
{ {
if (poller->CheckInput(i)) if (poller->CheckInput(i))
{ {
received = fChannels["data-in"].at(i).Receive(msg) received = fChannels.at("data-in").at(i).Receive(msg)
MergerPolicy::Merge(InputPolicy::DeSerializeMsg(msg)); if (received > 0)
{
MergerPolicy::Merge(InputPolicy::DeSerializeMsg(msg));
}
} }
OutputPolicy::SetMessage(msg); OutputPolicy::SetMessage(msg);
@ -64,11 +67,7 @@ class GenericMerger : public FairMQDevice, public MergerPolicy, public InputPoli
received = 0; received = 0;
} }
} }
delete msg;
} }
delete poller;
} }
}; };

View File

@ -120,31 +120,29 @@ class GenericProcessor : public FairMQDevice, public InputPolicy, public OutputP
int receivedMsgs = 0; int receivedMsgs = 0;
int sentMsgs = 0; int sentMsgs = 0;
const FairMQChannel& inputChannel = fChannels["data-in"].at(0);
const FairMQChannel& outputChannel = fChannels["data-out"].at(0);
while (CheckCurrentState(RUNNING)) while (CheckCurrentState(RUNNING))
{ {
FairMQMessage* msg = fTransportFactory->CreateMessage(); std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
++receivedMsgs; ++receivedMsgs;
if (fChannels["data-in"].at(0).Receive(msg) > 0) if (inputChannel.Receive(msg) > 0)
{ {
// InputPolicy::DeSerializeMsg(msg) --> deserialize data of msg and fill output container // InputPolicy::DeSerializeMsg(msg) --> deserialize data of msg and fill output container
// TaskPolicy::ExecuteTask( ... ) --> process output container // TaskPolicy::ExecuteTask( ... ) --> process output container
TaskPolicy::ExecuteTask(InputPolicy::DeSerializeMsg(msg)); TaskPolicy::ExecuteTask(InputPolicy::DeSerializeMsg(msg.get()));
// OutputPolicy::fMessage point to msg // OutputPolicy::fMessage point to msg
OutputPolicy::SetMessage(msg); OutputPolicy::SetMessage(msg.get());
// TaskPolicy::GetOutputData() --> Get processed output container // TaskPolicy::GetOutputData() --> Get processed output container
// OutputPolicy::message(...) --> Serialize output container and fill fMessage // OutputPolicy::message(...) --> Serialize output container and fill fMessage
fChannels["data-out"].at(0).Send(OutputPolicy::SerializeMsg(TaskPolicy::GetOutputData())); outputChannel.Send(OutputPolicy::SerializeMsg(TaskPolicy::GetOutputData()));
sentMsgs++; sentMsgs++;
} }
if (msg)
{
msg->CloseMessage();
}
} }
MQLOG(INFO) << "Received " << receivedMsgs << " and sent " << sentMsgs << " messages!"; MQLOG(INFO) << "Received " << receivedMsgs << " and sent " << sentMsgs << " messages!";

View File

@ -13,7 +13,7 @@
*/ */
#ifndef GENERICSAMPLER_H #ifndef GENERICSAMPLER_H
#define GENERICSAMPLER_H #define GENERICSAMPLER_H
#include <vector> #include <vector>
#include <iostream> #include <iostream>
@ -82,7 +82,6 @@ class base_GenericSampler : public FairMQDevice, public T, public U
typedef source_type source_type; typedef source_type source_type;
typedef serialization_type serialization_type; typedef serialization_type serialization_type;
}; };
*/ */
virtual void SetTransport(FairMQTransportFactory* factory); virtual void SetTransport(FairMQTransportFactory* factory);
@ -104,8 +103,6 @@ class base_GenericSampler : public FairMQDevice, public T, public U
int GetCurrentIndex() const; int GetCurrentIndex() const;
void SetContinuous(bool flag); void SetContinuous(bool flag);
/// ///////////////////////////////////////////////////////////////////////////////////////
/* /*
register the tasks you want to process and, which will be register the tasks you want to process and, which will be
called by ExecuteTasks() member function. The registration is done by filling called by ExecuteTasks() member function. The registration is done by filling
@ -130,25 +127,23 @@ class base_GenericSampler : public FairMQDevice, public T, public U
To communicate with the Host derived class via callback, three methods from the host class are callable (only To communicate with the Host derived class via callback, three methods from the host class are callable (only
after binding these methods in the GenericSampler<I,O>::InitTask() ) after binding these methods in the GenericSampler<I,O>::InitTask() )
*/ */
template<typename RegistrationManager> template<typename RegistrationManager>
void RegisterTask(RegistrationManager manage) void RegisterTask(RegistrationManager manage)
{ {
manage(this,fTaskList); manage(this, fTaskList);
LOG(DEBUG)<<"Current Number of registered tasks = "<<fTaskList.size(); LOG(DEBUG) << "Current Number of registered tasks = " << fTaskList.size();
} }
/// ///////////////////////////////////////////////////////////////////////////////////////
void ExecuteTasks() void ExecuteTasks()
{ {
for(const auto& p : fTaskList) for(const auto& p : fTaskList)
{ {
LOG(DEBUG)<<"Execute Task "<< p.first; LOG(DEBUG) << "Execute Task " << p.first;
p.second(); p.second();
} }
} }
protected: protected:
virtual void InitTask(); virtual void InitTask();
virtual void Run(); virtual void Run();
@ -160,30 +155,29 @@ class base_GenericSampler : public FairMQDevice, public T, public U
int fEventRate; int fEventRate;
int fEventCounter; int fEventCounter;
bool fContinuous; bool fContinuous;
std::map<key_type, task_type > fTaskList; // to handle Task list std::map<key_type, task_type> fTaskList; // to handle Task list
// automatically enable or disable the call of policy function members for binding of host functions. // automatically enable or disable the call of policy function members for binding of host functions.
// this template functions use SFINAE to detect the existence of the policy function signature. // this template functions use SFINAE to detect the existence of the policy function signature.
template<typename S = source_type ,FairMQ::tools::enable_if_hasNot_BindSendPart<S> = 0 > template<typename S = source_type,FairMQ::tools::enable_if_hasNot_BindSendPart<S> = 0>
void BindingSendPart(){} void BindingSendPart() {}
template<typename S = source_type ,FairMQ::tools::enable_if_has_BindSendPart<S> = 0 > template<typename S = source_type,FairMQ::tools::enable_if_has_BindSendPart<S> = 0>
void BindingSendPart() void BindingSendPart()
{ {
source_type::BindSendPart(std::bind(&base_GenericSampler::SendPart,this,std::placeholders::_1) ); source_type::BindSendPart(std::bind(&base_GenericSampler::SendPart,this,std::placeholders::_1) );
} }
template<typename S = source_type ,FairMQ::tools::enable_if_hasNot_BindGetSocketNumber<S> = 0 > template<typename S = source_type,FairMQ::tools::enable_if_hasNot_BindGetSocketNumber<S> = 0>
void BindingGetSocketNumber(){} void BindingGetSocketNumber() {}
template<typename S = source_type ,FairMQ::tools::enable_if_has_BindGetSocketNumber<S> = 0 > template<typename S = source_type,FairMQ::tools::enable_if_has_BindGetSocketNumber<S> = 0>
void BindingGetSocketNumber() void BindingGetSocketNumber()
{ {
source_type::BindGetSocketNumber(std::bind(&base_GenericSampler::GetSocketNumber,this) ); source_type::BindGetSocketNumber(std::bind(&base_GenericSampler::GetSocketNumber,this) );
} }
template<typename S = source_type ,FairMQ::tools::enable_if_hasNot_BindGetCurrentIndex<S> = 0 > template<typename S = source_type,FairMQ::tools::enable_if_hasNot_BindGetCurrentIndex<S> = 0>
void BindingGetCurrentIndex(){} void BindingGetCurrentIndex() {}
template<typename S = source_type ,FairMQ::tools::enable_if_has_BindGetCurrentIndex<S> = 0 > template<typename S = source_type,FairMQ::tools::enable_if_has_BindGetCurrentIndex<S> = 0>
void BindingGetCurrentIndex() void BindingGetCurrentIndex()
{ {
source_type::BindGetCurrentIndex(std::bind(&base_GenericSampler::GetCurrentIndex,this) ); source_type::BindGetCurrentIndex(std::bind(&base_GenericSampler::GetCurrentIndex,this) );
@ -193,4 +187,3 @@ class base_GenericSampler : public FairMQDevice, public T, public U
#include "GenericSampler.tpl" #include "GenericSampler.tpl"
#endif /* GENERICSAMPLER_H */ #endif /* GENERICSAMPLER_H */

View File

@ -38,9 +38,6 @@ void base_GenericSampler<T,U,K,L>::InitTask()
fNumEvents = source_type::GetNumberOfEvent(); fNumEvents = source_type::GetNumberOfEvent();
} }
template <typename T, typename U, typename K, typename L> template <typename T, typename U, typename K, typename L>
void base_GenericSampler<T,U,K,L>::Run() void base_GenericSampler<T,U,K,L>::Run()
{ {
@ -56,19 +53,19 @@ void base_GenericSampler<T,U,K,L>::Run()
{ {
for (fCurrentIdx = 0; fCurrentIdx < fNumEvents; fCurrentIdx++) for (fCurrentIdx = 0; fCurrentIdx < fNumEvents; fCurrentIdx++)
{ {
for(auto& p : fChannels[fOutChanName]) for (auto& p : fChannels[fOutChanName])
{ {
FairMQMessage* msg = fTransportFactory->CreateMessage(); std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
serialization_type::SetMessage(msg); serialization_type::SetMessage(msg.get());
source_type::SetIndex(fCurrentIdx); source_type::SetIndex(fCurrentIdx);
ExecuteTasks(); ExecuteTasks();
p.Send(serialization_type::SerializeMsg(source_type::GetOutData())); p.Send(serialization_type::SerializeMsg(source_type::GetOutData()));
if (msg)
msg->CloseMessage();
sentMsgs++; sentMsgs++;
if(fChannels[fOutChanName].size()>1) if (fChannels[fOutChanName].size() > 1)
{
fCurrentIdx++; fCurrentIdx++;
}
// Optional event rate limiting // Optional event rate limiting
// --fEventCounter; // --fEventCounter;
@ -77,11 +74,15 @@ void base_GenericSampler<T,U,K,L>::Run()
// } // }
if (!CheckCurrentState(RUNNING)) if (!CheckCurrentState(RUNNING))
{
break; break;
}
} }
// if more than one socket, remove the last incrementation // if more than one socket, remove the last incrementation
if(fChannels[fOutChanName].size()>1) if (fChannels[fOutChanName].size() > 1)
{
fCurrentIdx--; fCurrentIdx--;
}
} }
} }
while (CheckCurrentState(RUNNING) && fContinuous); while (CheckCurrentState(RUNNING) && fContinuous);
@ -96,14 +97,12 @@ template <typename T, typename U, typename K, typename L>
void base_GenericSampler<T,U,K,L>::SendPart(int socketIdx) void base_GenericSampler<T,U,K,L>::SendPart(int socketIdx)
{ {
fCurrentIdx++; fCurrentIdx++;
if(fCurrentIdx<fNumEvents) if (fCurrentIdx < fNumEvents)
{ {
FairMQMessage* msg = fTransportFactory->CreateMessage(); std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
serialization_type::SetMessage(msg); serialization_type::SetMessage(msg.get());
source_type::SetIndex(fCurrentIdx); source_type::SetIndex(fCurrentIdx);
fChannels[fOutChanName].at(socketIdx).Send(serialization_type::SerializeMsg(source_type::GetOutData()), "snd-more"); fChannels[fOutChanName].at(socketIdx).Send(serialization_type::SerializeMsg(source_type::GetOutData()), "snd-more");
if (msg)
msg->CloseMessage();
} }
} }
@ -129,7 +128,7 @@ void base_GenericSampler<T,U,K,L>::SetContinuous(bool flag)
template <typename T, typename U, typename K, typename L> template <typename T, typename U, typename K, typename L>
void base_GenericSampler<T,U,K,L>::ResetEventCounter() void base_GenericSampler<T,U,K,L>::ResetEventCounter()
{ {
while (GetCurrentState() == RUNNING) while (CheckCurrentState(RUNNING))
{ {
try try
{ {
@ -197,8 +196,6 @@ std::string base_GenericSampler<T,U,K,L>::GetProperty(const int key, const std::
} }
} }
template<typename T, typename U> template<typename T, typename U>
using GenericSampler = base_GenericSampler<T,U,int,std::function<void()> >; using GenericSampler = base_GenericSampler<T,U,int,std::function<void()> >;
typedef std::map<int, std::function<void()> > SamplerTasksMap; typedef std::map<int, std::function<void()> > SamplerTasksMap;

View File

@ -67,12 +67,11 @@ int main(int argc, char** argv)
LOG(INFO) << "PID: " << getpid(); LOG(INFO) << "PID: " << getpid();
#ifdef NANOMSG #ifdef NANOMSG
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); sampler.SetTransport(new FairMQTransportFactoryNN());
#else #else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); sampler.SetTransport(new FairMQTransportFactoryZMQ());
#endif #endif
sampler.SetTransport(transportFactory);
sampler.SetProperty(FairMQBenchmarkSampler::Id, id); sampler.SetProperty(FairMQBenchmarkSampler::Id, id);
sampler.SetProperty(FairMQBenchmarkSampler::EventSize, eventSize); sampler.SetProperty(FairMQBenchmarkSampler::EventSize, eventSize);

View File

@ -95,6 +95,24 @@ void FairMQMessageZMQ::SetMessage(void* data, size_t size)
} }
void FairMQMessageZMQ::Copy(FairMQMessage* msg) void FairMQMessageZMQ::Copy(FairMQMessage* msg)
{
// DEPRECATED: Use Copy(const unique_ptr<FairMQMessage>&)
// Shares the message buffer between msg and this fMessage.
if (zmq_msg_copy(&fMessage, (zmq_msg_t*)msg->GetMessage()) != 0)
{
LOG(ERROR) << "failed copying message, reason: " << zmq_strerror(errno);
}
// Alternatively, following code does a hard copy of the message, which allows to modify the original after making a copy, without affecting the new msg.
// CloseMessage();
// size_t size = msg->GetSize();
// zmq_msg_init_size(&fMessage, size);
// memcpy(zmq_msg_data(&fMessage), msg->GetData(), size);
}
void FairMQMessageZMQ::Copy(const unique_ptr<FairMQMessage>& msg)
{ {
// Shares the message buffer between msg and this fMessage. // Shares the message buffer between msg and this fMessage.
if (zmq_msg_copy(&fMessage, (zmq_msg_t*)msg->GetMessage()) != 0) if (zmq_msg_copy(&fMessage, (zmq_msg_t*)msg->GetMessage()) != 0)

View File

@ -40,6 +40,7 @@ class FairMQMessageZMQ : public FairMQMessage
virtual void CloseMessage(); virtual void CloseMessage();
virtual void Copy(FairMQMessage* msg); virtual void Copy(FairMQMessage* msg);
virtual void Copy(const std::unique_ptr<FairMQMessage>& msg);
static void CleanUp(void* data, void* hint); static void CleanUp(void* data, void* hint);