Add configurable default snd/rcv timeout

This commit is contained in:
Alexey Rybalchenko
2021-11-11 11:16:18 +01:00
parent 856780f88a
commit c446dd05e6
6 changed files with 138 additions and 99 deletions

View File

@@ -166,6 +166,14 @@ class Channel
/// @return Returns socket kernel transmit receive buffer size (in bytes)
int GetRcvKernelSize() const { return fRcvKernelSize; }
/// Get socket default send timeout (in ms)
/// @return Returns socket default send timeout (in ms)
int GetSndTimeout() const { return fSndTimeoutMs; }
/// Get socket default receive timeout (in ms)
/// @return Returns socket default receive timeout (in ms)
int GetRcvTimeout() const { return fRcvTimeoutMs; }
/// Get linger duration (in milliseconds)
/// @return Returns linger duration (in milliseconds)
int GetLinger() const { return fLinger; }
@@ -230,6 +238,14 @@ class Channel
/// @param rcvKernelSize Socket receive buffer size (in bytes)
void UpdateRcvKernelSize(int rcvKernelSize) { fRcvKernelSize = rcvKernelSize; Invalidate(); }
/// Set socket default send timeout (in ms)
/// @param sndTimeoutMs Socket default send timeout (in ms)
void UpdateSndTimeout(int sndTimeoutMs) { fSndTimeoutMs = sndTimeoutMs; Invalidate(); }
/// Set socket default receive timeout (in ms)
/// @param rcvTimeoutMs Socket default receive timeout (in ms)
void UpdateRcvTimeout(int rcvTimeoutMs) { fRcvTimeoutMs = rcvTimeoutMs; Invalidate(); }
/// Set linger duration (in milliseconds)
/// @param duration linger duration (in milliseconds)
void UpdateLinger(int duration) { fLinger = duration; Invalidate(); }
@@ -267,62 +283,52 @@ class Channel
/// invalidates the channel (requires validation to be used again).
void Invalidate() { fValid = false; }
/// Sends a message to the socket queue.
/// @param msg Constant reference of unique_ptr to a Message
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Send(MessagePtr& msg, int sndTimeoutInMs = -1)
/// Send message(s) to the socket queue.
/// @param m reference to MessagePtr/Parts/vector<MessagePtr>
/// @param sndTimeoutMs send timeout in ms.
/// -1 will wait forever (or until interrupt (e.g. via state change)),
/// 0 will not wait (return immediately if cannot send).
/// If not provided, default timeout will be taken.
/// @return Number of bytes that have been queued,
/// TransferCode::timeout if timed out,
/// TransferCode::error if there was an error,
/// TransferCode::interrupted if interrupted (e.g. by requested state change)
template<typename M, typename... Timeout>
std::enable_if_t<is_transferrable<M>::value, int64_t>
Send(M& m, Timeout&&... sndTimeoutMs)
{
CheckSendCompatibility(msg);
return fSocket->Send(msg, sndTimeoutInMs);
static_assert(sizeof...(sndTimeoutMs) <= 1, "Send called with too many arguments");
CheckSendCompatibility(m);
int t = fSndTimeoutMs;
if constexpr (sizeof...(sndTimeoutMs) == 1) {
t = {sndTimeoutMs...};
}
return fSocket->Send(m, t);
}
/// Receives a message from the socket queue.
/// @param msg Constant reference of unique_ptr to a Message
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(MessagePtr& msg, int rcvTimeoutInMs = -1)
/// Receive message(s) from the socket queue.
/// @param m reference to MessagePtr/Parts/vector<MessagePtr>
/// @param rcvTimeoutMs receive timeout in ms.
/// -1 will wait forever (or until interrupt (e.g. via state change)),
/// 0 will not wait (return immediately if cannot receive).
/// If not provided, default timeout will be taken.
/// @return Number of bytes that have been received,
/// TransferCode::timeout if timed out,
/// TransferCode::error if there was an error,
/// TransferCode::interrupted if interrupted (e.g. by requested state change)
template<typename M, typename... Timeout>
std::enable_if_t<is_transferrable<M>::value, int64_t>
Receive(M& m, Timeout&&... rcvTimeoutMs)
{
CheckReceiveCompatibility(msg);
return fSocket->Receive(msg, rcvTimeoutInMs);
}
static_assert(sizeof...(rcvTimeoutMs) <= 1, "Receive called with too many arguments");
/// Send a vector of messages
/// @param msgVec message vector reference
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Send(std::vector<MessagePtr>& msgVec, int sndTimeoutInMs = -1)
{
CheckSendCompatibility(msgVec);
return fSocket->Send(msgVec, sndTimeoutInMs);
}
/// Receive a vector of messages
/// @param msgVec message vector reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(std::vector<MessagePtr>& msgVec, int rcvTimeoutInMs = -1)
{
CheckReceiveCompatibility(msgVec);
return fSocket->Receive(msgVec, rcvTimeoutInMs);
}
/// Send Parts
/// @param parts Parts reference
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Send(Parts& parts, int sndTimeoutInMs = -1)
{
return Send(parts.fParts, sndTimeoutInMs);
}
/// Receive Parts
/// @param parts Parts reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(Parts& parts, int rcvTimeoutInMs = -1)
{
return Receive(parts.fParts, rcvTimeoutInMs);
CheckReceiveCompatibility(m);
int t = fRcvTimeoutMs;
if constexpr (sizeof...(rcvTimeoutMs) == 1) {
t = {rcvTimeoutMs...};
}
return fSocket->Receive(m, t);
}
unsigned long GetBytesTx() const { return fSocket->GetBytesTx(); }
@@ -366,6 +372,8 @@ class Channel
static constexpr int DefaultRcvBufSize = 1000;
static constexpr int DefaultSndKernelSize = 0;
static constexpr int DefaultRcvKernelSize = 0;
static constexpr int DefaultSndTimeoutMs = -1;
static constexpr int DefaultRcvTimeoutMs = -1;
static constexpr int DefaultLinger = 500;
static constexpr int DefaultRateLogging = 1;
static constexpr int DefaultPortRangeMin = 22000;
@@ -385,6 +393,8 @@ class Channel
int fRcvBufSize;
int fSndKernelSize;
int fRcvKernelSize;
int fSndTimeoutMs;
int fRcvTimeoutMs;
int fLinger;
int fRateLogging;
int fPortRangeMin;
@@ -414,6 +424,7 @@ class Channel
}
}
void CheckSendCompatibility(Parts& parts) { CheckSendCompatibility(parts.fParts); }
void CheckSendCompatibility(std::vector<MessagePtr>& msgVec)
{
for (auto& msg : msgVec) {
@@ -443,6 +454,7 @@ class Channel
}
}
void CheckReceiveCompatibility(Parts& parts) { CheckReceiveCompatibility(parts.fParts); }
void CheckReceiveCompatibility(std::vector<MessagePtr>& msgVec)
{
for (auto& msg : msgVec) {