Move Bind/Connect/Attach to FairMQChannel

This commit is contained in:
Alexey Rybalchenko
2018-11-05 15:11:41 +01:00
committed by Dennis Klein
parent 3ca0d7236a
commit 25fcf13985
20 changed files with 603 additions and 696 deletions

View File

@@ -18,90 +18,65 @@
#include <boost/algorithm/string.hpp> // join/split
#include <set>
#include <utility> // std::move
#include <random>
using namespace std;
mutex FairMQChannel::fChannelMutex;
FairMQChannel::FairMQChannel()
: fSocket(nullptr)
, fType("unspecified")
, fMethod("unspecified")
, fAddress("unspecified")
, fTransportType(fair::mq::Transport::DEFAULT)
, fSndBufSize(1000)
, fRcvBufSize(1000)
, fSndKernelSize(0)
, fRcvKernelSize(0)
, fLinger(500)
, fRateLogging(1)
, fName("")
, fIsValid(false)
, fTransportFactory(nullptr)
, fMultipart(false)
, fModified(true)
, fReset(false)
{
}
: FairMQChannel("", "unspecified", "unspecified", "unspecified", nullptr)
{}
FairMQChannel::FairMQChannel(const string& type, const string& method, const string& address)
: fSocket(nullptr)
: FairMQChannel("", type, method, address, nullptr)
{}
FairMQChannel::FairMQChannel(const string& name, const string& type, shared_ptr<FairMQTransportFactory> factory)
: FairMQChannel(name, type, "unspecified", "unspecified", factory)
{}
FairMQChannel::FairMQChannel(const string& name, const string& type, const string& method, const string& address, shared_ptr<FairMQTransportFactory> factory)
: fTransportFactory(factory)
, fTransportType(factory ? factory->GetType() : fair::mq::Transport::DEFAULT)
, fSocket(factory ? factory->CreateSocket(type, name) : nullptr)
, fType(type)
, fMethod(method)
, fAddress(address)
, fTransportType(fair::mq::Transport::DEFAULT)
, fSndBufSize(1000)
, fRcvBufSize(1000)
, fSndKernelSize(0)
, fRcvKernelSize(0)
, fLinger(500)
, fRateLogging(1)
, fName("")
, fIsValid(false)
, fTransportFactory(nullptr)
, fMultipart(false)
, fModified(true)
, fReset(false)
{
}
FairMQChannel::FairMQChannel(const string& name, const string& type, shared_ptr<FairMQTransportFactory> factory)
: fSocket(factory->CreateSocket(type, name))
, fType(type)
, fMethod("unspecified")
, fAddress("unspecified")
, fTransportType(factory->GetType())
, fSndBufSize(1000)
, fRcvBufSize(1000)
, fSndKernelSize(0)
, fRcvKernelSize(0)
, fLinger(500)
, fRateLogging(1)
, fPortRangeMin(22000)
, fPortRangeMax(23000)
, fAutoBind(true)
, fName(name)
, fIsValid(false)
, fTransportFactory(factory)
, fMultipart(false)
, fModified(true)
, fReset(false)
{
}
{}
FairMQChannel::FairMQChannel(const FairMQChannel& chan)
: fSocket(nullptr)
: fTransportFactory(nullptr)
, fTransportType(chan.fTransportType)
, fSocket(nullptr)
, fType(chan.fType)
, fMethod(chan.fMethod)
, fAddress(chan.fAddress)
, fTransportType(chan.fTransportType)
, fSndBufSize(chan.fSndBufSize)
, fRcvBufSize(chan.fRcvBufSize)
, fSndKernelSize(chan.fSndKernelSize)
, fRcvKernelSize(chan.fRcvKernelSize)
, fLinger(chan.fLinger)
, fRateLogging(chan.fRateLogging)
, fPortRangeMin(chan.fPortRangeMin)
, fPortRangeMax(chan.fPortRangeMax)
, fAutoBind(chan.fAutoBind)
, fName(chan.fName)
, fIsValid(false)
, fTransportFactory(nullptr)
, fMultipart(chan.fMultipart)
, fModified(chan.fModified)
, fReset(false)
@@ -109,20 +84,23 @@ FairMQChannel::FairMQChannel(const FairMQChannel& chan)
FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan)
{
fTransportFactory = nullptr;
fTransportType = chan.fTransportType;
fSocket = nullptr;
fType = chan.fType;
fMethod = chan.fMethod;
fAddress = chan.fAddress;
fTransportType = chan.fTransportType;
fSndBufSize = chan.fSndBufSize;
fRcvBufSize = chan.fRcvBufSize;
fSndKernelSize = chan.fSndKernelSize;
fRcvKernelSize = chan.fRcvKernelSize;
fLinger = chan.fLinger;
fRateLogging = chan.fRateLogging;
fPortRangeMin = chan.fPortRangeMin;
fPortRangeMax = chan.fPortRangeMax;
fAutoBind = chan.fAutoBind;
fName = chan.fName;
fIsValid = false;
fTransportFactory = nullptr;
fMultipart = chan.fMultipart;
fModified = chan.fModified;
fReset = false;
@@ -136,20 +114,23 @@ FairMQSocket & FairMQChannel::GetSocket() const
return *fSocket;
}
string FairMQChannel::GetChannelName() const
string FairMQChannel::GetName() const
{
lock_guard<mutex> lock(fChannelMutex);
return fName;
}
string FairMQChannel::GetChannelPrefix() const
string FairMQChannel::GetPrefix() const
{
lock_guard<mutex> lock(fChannelMutex);
string prefix = fName;
prefix = prefix.erase(fName.rfind('['));
return prefix;
}
string FairMQChannel::GetChannelIndex() const
string FairMQChannel::GetIndex() const
{
lock_guard<mutex> lock(fChannelMutex);
string indexStr = fName;
indexStr.erase(indexStr.rfind(']'));
indexStr.erase(0, indexStr.rfind('[') + 1);
@@ -246,6 +227,33 @@ try {
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
int FairMQChannel::GetPortRangeMin() const
try {
lock_guard<mutex> lock(fChannelMutex);
return fPortRangeMin;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetPortRangeMin: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
int FairMQChannel::GetPortRangeMax() const
try {
lock_guard<mutex> lock(fChannelMutex);
return fPortRangeMax;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetPortRangeMax: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
bool FairMQChannel::GetAutoBind() const
try {
lock_guard<mutex> lock(fChannelMutex);
return fAutoBind;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetAutoBind: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateType(const string& type)
try {
lock_guard<mutex> lock(fChannelMutex);
@@ -356,6 +364,39 @@ try {
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdatePortRangeMin(const int minPort)
try {
lock_guard<mutex> lock(fChannelMutex);
fIsValid = false;
fPortRangeMin = minPort;
fModified = true;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdatePortRangeMin: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdatePortRangeMax(const int maxPort)
try {
lock_guard<mutex> lock(fChannelMutex);
fIsValid = false;
fPortRangeMax = maxPort;
fModified = true;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdatePortRangeMax: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateAutoBind(const bool autobind)
try {
lock_guard<mutex> lock(fChannelMutex);
fIsValid = false;
fAutoBind = autobind;
fModified = true;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateAutoBind: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
auto FairMQChannel::SetModified(const bool modified) -> void
try {
lock_guard<mutex> lock(fChannelMutex);
@@ -365,14 +406,14 @@ try {
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateChannelName(const string& name)
void FairMQChannel::UpdateName(const string& name)
try {
lock_guard<mutex> lock(fChannelMutex);
fIsValid = false;
fName = name;
fModified = true;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateChannelName: " << e.what();
LOG(error) << "Exception caught in FairMQChannel::UpdateName: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
@@ -385,14 +426,13 @@ try {
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
bool FairMQChannel::ValidateChannel()
bool FairMQChannel::Validate()
try {
lock_guard<mutex> lock(fChannelMutex);
stringstream ss;
ss << "Validating channel '" << fName << "'... ";
if (fIsValid)
{
if (fIsValid) {
ss << "ALREADY VALID";
LOG(debug) << ss.str();
return true;
@@ -400,8 +440,7 @@ try {
// validate socket type
const set<string> socketTypes{ "sub", "pub", "pull", "push", "req", "rep", "xsub", "xpub", "dealer", "router", "pair" };
if (socketTypes.find(fType) == socketTypes.end())
{
if (socketTypes.find(fType) == socketTypes.end()) {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "Invalid channel type: '" << fType << "'";
@@ -409,30 +448,22 @@ try {
}
// validate socket address
if (fAddress == "unspecified" || fAddress == "")
{
if (fAddress == "unspecified" || fAddress == "") {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(debug) << "invalid channel address: '" << fAddress << "'";
return false;
}
else
{
} else {
vector<string> endpoints;
boost::algorithm::split(endpoints, fAddress, boost::algorithm::is_any_of(";"));
for (const auto endpoint : endpoints)
{
for (const auto endpoint : endpoints) {
string address;
if (endpoint[0] == '@' || endpoint[0] == '+' || endpoint[0] == '>')
{
if (endpoint[0] == '@' || endpoint[0] == '+' || endpoint[0] == '>') {
address = endpoint.substr(1);
}
else
{
} else {
// we don't have a method modifier, check if the default method is set
const set<string> socketMethods{ "bind", "connect" };
if (socketMethods.find(fMethod) == socketMethods.end())
{
if (socketMethods.find(fMethod) == socketMethods.end()) {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "Invalid endpoint connection method: '" << fMethod << "' for " << endpoint;
@@ -441,56 +472,43 @@ try {
address = endpoint;
}
// check if address is a tcp or ipc address
if (address.compare(0, 6, "tcp://") == 0)
{
if (address.compare(0, 6, "tcp://") == 0) {
// check if TCP address contains port delimiter
string addressString = address.substr(6);
if (addressString.find(':') == string::npos)
{
if (addressString.find(':') == string::npos) {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel address: '" << address << "' (missing port?)";
return false;
}
}
else if (address.compare(0, 6, "ipc://") == 0)
{
} else if (address.compare(0, 6, "ipc://") == 0) {
// check if IPC address is not empty
string addressString = address.substr(6);
if (addressString == "")
{
if (addressString == "") {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel address: '" << address << "' (empty IPC address?)";
return false;
}
}
else if (address.compare(0, 9, "inproc://") == 0)
{
} else if (address.compare(0, 9, "inproc://") == 0) {
// check if IPC address is not empty
string addressString = address.substr(9);
if (addressString == "")
{
if (addressString == "") {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel address: '" << address << "' (empty inproc address?)";
return false;
}
}
else if (address.compare(0, 8, "verbs://") == 0)
{
} else if (address.compare(0, 8, "verbs://") == 0) {
// check if IPC address is not empty
string addressString = address.substr(8);
if (addressString == "")
{
if (addressString == "") {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel address: '" << address << "' (empty verbs address?)";
return false;
}
}
else
{
} else {
// if neither TCP or IPC is specified, return invalid
ss << "INVALID";
LOG(debug) << ss.str();
@@ -501,8 +519,7 @@ try {
}
// validate socket buffer size for sending
if (fSndBufSize < 0)
{
if (fSndBufSize < 0) {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel send buffer size (cannot be negative): '" << fSndBufSize << "'";
@@ -510,8 +527,7 @@ try {
}
// validate socket buffer size for receiving
if (fRcvBufSize < 0)
{
if (fRcvBufSize < 0) {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel receive buffer size (cannot be negative): '" << fRcvBufSize << "'";
@@ -519,8 +535,7 @@ try {
}
// validate socket kernel transmit size for sending
if (fSndKernelSize < 0)
{
if (fSndKernelSize < 0) {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel send kernel transmit size (cannot be negative): '" << fSndKernelSize << "'";
@@ -528,8 +543,7 @@ try {
}
// validate socket kernel transmit size for receiving
if (fRcvKernelSize < 0)
{
if (fRcvKernelSize < 0) {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel receive kernel transmit size (cannot be negative): '" << fRcvKernelSize << "'";
@@ -537,8 +551,7 @@ try {
}
// validate socket rate logging interval
if (fRateLogging < 0)
{
if (fRateLogging < 0) {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid socket rate logging interval (cannot be negative): '" << fRateLogging << "'";
@@ -554,10 +567,71 @@ try {
throw ChannelConfigurationError(fair::mq::tools::ToString(e.what()));
}
void FairMQChannel::InitTransport(shared_ptr<FairMQTransportFactory> factory)
void FairMQChannel::Init()
{
fTransportFactory = factory;
fTransportType = factory->GetType();
lock_guard<mutex> lock(fChannelMutex);
fSocket = fTransportFactory->CreateSocket(fType, fName);
// set linger duration (how long socket should wait for outstanding transfers before shutdown)
fSocket->SetLinger(fLinger);
// set high water marks
fSocket->SetSndBufSize(fSndBufSize);
fSocket->SetRcvBufSize(fRcvBufSize);
// set kernel transmit size (set it only if value is not the default value)
if (fSndKernelSize != 0) {
fSocket->SetSndKernelSize(fSndKernelSize);
}
if (fRcvKernelSize != 0) {
fSocket->SetRcvKernelSize(fRcvKernelSize);
}
}
bool FairMQChannel::ConnectEndpoint(const string& endpoint)
{
lock_guard<mutex> lock(fChannelMutex);
return fSocket->Connect(endpoint);
}
bool FairMQChannel::BindEndpoint(string& endpoint)
{
lock_guard<mutex> lock(fChannelMutex);
// try to bind to the configured port. If it fails, try random one (if AutoBind is on).
if (fSocket->Bind(endpoint)) {
return true;
} else {
if (fAutoBind) {
// number of attempts when choosing a random port
int numAttempts = 0;
int maxAttempts = 1000;
// initialize random generator
default_random_engine generator(chrono::system_clock::now().time_since_epoch().count());
uniform_int_distribution<int> randomPort(fPortRangeMin, fPortRangeMax);
do {
LOG(debug) << "Could not bind to configured (TCP) port, trying random port in range " << fPortRangeMin << "-" << fPortRangeMax;
++numAttempts;
if (numAttempts > maxAttempts) {
LOG(error) << "could not bind to any (TCP) port in the given range after " << maxAttempts << " attempts";
return false;
}
size_t pos = endpoint.rfind(':');
endpoint = endpoint.substr(0, pos + 1) + fair::mq::tools::ToString(static_cast<int>(randomPort(generator)));
} while (fSocket->Bind(endpoint));
return true;
} else {
return false;
}
}
}
void FairMQChannel::ResetChannel()
@@ -566,131 +640,3 @@ void FairMQChannel::ResetChannel()
fIsValid = false;
// TODO: implement channel resetting
}
int FairMQChannel::Send(unique_ptr<FairMQMessage>& msg, int sndTimeoutInMs)
{
CheckSendCompatibility(msg);
return fSocket->Send(msg, sndTimeoutInMs);
}
int FairMQChannel::Receive(unique_ptr<FairMQMessage>& msg, int rcvTimeoutInMs)
{
CheckReceiveCompatibility(msg);
return fSocket->Receive(msg, rcvTimeoutInMs);
}
int FairMQChannel::SendAsync(unique_ptr<FairMQMessage>& msg)
{
CheckSendCompatibility(msg);
return fSocket->Send(msg, 0);
}
int FairMQChannel::ReceiveAsync(unique_ptr<FairMQMessage>& msg)
{
CheckReceiveCompatibility(msg);
return fSocket->Receive(msg, 0);
}
int64_t FairMQChannel::Send(vector<unique_ptr<FairMQMessage>>& msgVec, int sndTimeoutInMs)
{
CheckSendCompatibility(msgVec);
return fSocket->Send(msgVec, sndTimeoutInMs);
}
int64_t FairMQChannel::Receive(vector<unique_ptr<FairMQMessage>>& msgVec, int rcvTimeoutInMs)
{
CheckReceiveCompatibility(msgVec);
return fSocket->Receive(msgVec, rcvTimeoutInMs);
}
int64_t FairMQChannel::SendAsync(vector<unique_ptr<FairMQMessage>>& msgVec)
{
CheckSendCompatibility(msgVec);
return fSocket->Send(msgVec, 0);
}
int64_t FairMQChannel::ReceiveAsync(vector<unique_ptr<FairMQMessage>>& msgVec)
{
CheckReceiveCompatibility(msgVec);
return fSocket->Receive(msgVec, 0);
}
FairMQChannel::~FairMQChannel()
{
}
unsigned long FairMQChannel::GetBytesTx() const
{
return fSocket->GetBytesTx();
}
unsigned long FairMQChannel::GetBytesRx() const
{
return fSocket->GetBytesRx();
}
unsigned long FairMQChannel::GetMessagesTx() const
{
return fSocket->GetMessagesTx();
}
unsigned long FairMQChannel::GetMessagesRx() const
{
return fSocket->GetMessagesRx();
}
void FairMQChannel::CheckSendCompatibility(FairMQMessagePtr& msg)
{
if (fTransportType != msg->GetType())
{
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr msgWrapper(NewMessage(msg->GetData(),
msg->GetSize(),
[](void* /*data*/, void* _msg) { delete static_cast<FairMQMessage*>(_msg); },
msg.get()
));
msg.release();
msg = move(msgWrapper);
}
}
void FairMQChannel::CheckSendCompatibility(vector<FairMQMessagePtr>& msgVec)
{
for (auto& msg : msgVec)
{
if (fTransportType != msg->GetType())
{
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr msgWrapper(NewMessage(msg->GetData(),
msg->GetSize(),
[](void* /*data*/, void* _msg) { delete static_cast<FairMQMessage*>(_msg); },
msg.get()
));
msg.release();
msg = move(msgWrapper);
}
}
}
void FairMQChannel::CheckReceiveCompatibility(FairMQMessagePtr& msg)
{
if (fTransportType != msg->GetType())
{
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr newMsg(NewMessage());
msg = move(newMsg);
}
}
void FairMQChannel::CheckReceiveCompatibility(vector<FairMQMessagePtr>& msgVec)
{
for (auto& msg : msgVec)
{
if (fTransportType != msg->GetType())
{
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr newMsg(NewMessage());
msg = move(newMsg);
}
}
}