replace cmd socket with transport specific unblock mechanism

This commit is contained in:
Alexey Rybalchenko
2018-02-20 10:44:56 +01:00
committed by Mohammad Al-Turany
parent 209e521046
commit fed7601be4
21 changed files with 402 additions and 474 deletions

View File

@@ -27,6 +27,8 @@ FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const str
, fBytesRx(0)
, fMessagesTx(0)
, fMessagesRx(0)
, fSndTimeout(100)
, fRcvTimeout(100)
{
assert(context);
fSocket = zmq_socket(context, GetConstant(type));
@@ -50,14 +52,12 @@ FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const str
LOG(error) << "Failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno);
}
int sndTimeout = 700;
if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &sndTimeout, sizeof(sndTimeout)) != 0)
if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0)
{
LOG(error) << "Failed setting ZMQ_SNDTIMEO socket option, reason: " << zmq_strerror(errno);
}
int rcvTimeout = 700;
if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &rcvTimeout, sizeof(rcvTimeout)) != 0)
if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0)
{
LOG(error) << "Failed setting ZMQ_RCVTIMEO socket option, reason: " << zmq_strerror(errno);
}
@@ -101,19 +101,21 @@ void FairMQSocketSHM::Connect(const string& address)
}
}
int FairMQSocketSHM::Send(FairMQMessagePtr& msg) { return Send(msg, 0); }
int FairMQSocketSHM::SendAsync(FairMQMessagePtr& msg) { return Send(msg, ZMQ_DONTWAIT); }
int FairMQSocketSHM::Receive(FairMQMessagePtr& msg) { return Receive(msg, 0); }
int FairMQSocketSHM::ReceiveAsync(FairMQMessagePtr& msg) { return Receive(msg, ZMQ_DONTWAIT); }
int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int timeout) { return SendImpl(msg, 0, timeout); }
int FairMQSocketSHM::Receive(FairMQMessagePtr& msg, const int timeout) { return ReceiveImpl(msg, 0, timeout); }
int64_t FairMQSocketSHM::Send(vector<unique_ptr<FairMQMessage>>& msgVec, const int timeout) { return SendImpl(msgVec, 0, timeout); }
int64_t FairMQSocketSHM::Receive(vector<unique_ptr<FairMQMessage>>& msgVec, const int timeout) { return ReceiveImpl(msgVec, 0, timeout); }
int64_t FairMQSocketSHM::Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) { return Send(msgVec, 0); }
int64_t FairMQSocketSHM::SendAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) { return Send(msgVec, ZMQ_DONTWAIT); }
int64_t FairMQSocketSHM::Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) { return Receive(msgVec, 0); }
int64_t FairMQSocketSHM::ReceiveAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) { return Receive(msgVec, ZMQ_DONTWAIT); }
int FairMQSocketSHM::TrySend(FairMQMessagePtr& msg) { return SendImpl(msg, ZMQ_DONTWAIT, 0); }
int FairMQSocketSHM::TryReceive(FairMQMessagePtr& msg) { return ReceiveImpl(msg, ZMQ_DONTWAIT, 0); }
int64_t FairMQSocketSHM::TrySend(vector<unique_ptr<FairMQMessage>>& msgVec) { return SendImpl(msgVec, ZMQ_DONTWAIT, 0); }
int64_t FairMQSocketSHM::TryReceive(vector<unique_ptr<FairMQMessage>>& msgVec) { return ReceiveImpl(msgVec, ZMQ_DONTWAIT, 0); }
int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int flags)
int FairMQSocketSHM::SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout)
{
int nbytes = -1;
int elapsed = 0;
while (true && !fInterrupted)
{
nbytes = zmq_msg_send(static_cast<FairMQMessageSHM*>(msg.get())->GetMessage(), fSocket, flags);
@@ -135,6 +137,14 @@ int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int flags)
{
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0))
{
if (timeout)
{
elapsed += fSndTimeout;
if (elapsed >= timeout)
{
return -2;
}
}
continue;
}
else
@@ -157,7 +167,7 @@ int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int flags)
return -1;
}
int FairMQSocketSHM::Receive(FairMQMessagePtr& msg, const int flags)
int FairMQSocketSHM::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout)
{
int nbytes = -1;
@@ -199,6 +209,14 @@ int FairMQSocketSHM::Receive(FairMQMessagePtr& msg, const int flags)
{
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0))
{
if (timeout)
{
elapsed += fRcvTimeout;
if (elapsed >= timeout)
{
return -2;
}
}
continue;
}
else
@@ -219,7 +237,7 @@ int FairMQSocketSHM::Receive(FairMQMessagePtr& msg, const int flags)
}
}
int64_t FairMQSocketSHM::Send(vector<FairMQMessagePtr>& msgVec, const int flags)
int64_t FairMQSocketSHM::SendImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int timeout)
{
const unsigned int vecSize = msgVec.size();
int64_t totalSize = 0;
@@ -392,14 +410,14 @@ void FairMQSocketSHM::Close()
void FairMQSocketSHM::Interrupt()
{
fManager.Interrupt();
Manager::Interrupt();
FairMQMessageSHM::fInterrupted = true;
fInterrupted = true;
}
void FairMQSocketSHM::Resume()
{
fManager.Resume();
Manager::Resume();
FairMQMessageSHM::fInterrupted = false;
fInterrupted = false;
}
@@ -453,6 +471,7 @@ unsigned long FairMQSocketSHM::GetMessagesRx() const
bool FairMQSocketSHM::SetSendTimeout(const int timeout, const string& address, const string& method)
{
fSndTimeout = timeout;
if (method == "bind")
{
if (zmq_unbind(fSocket, address.c_str()) != 0)
@@ -460,7 +479,7 @@ bool FairMQSocketSHM::SetSendTimeout(const int timeout, const string& address, c
LOG(error) << "Failed unbinding socket " << fId << ", reason: " << zmq_strerror(errno);
return false;
}
if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &timeout, sizeof(int)) != 0)
if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0)
{
LOG(error) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno);
return false;
@@ -478,7 +497,7 @@ bool FairMQSocketSHM::SetSendTimeout(const int timeout, const string& address, c
LOG(error) << "Failed disconnecting socket " << fId << ", reason: " << zmq_strerror(errno);
return false;
}
if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &timeout, sizeof(int)) != 0)
if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0)
{
LOG(error) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno);
return false;
@@ -500,19 +519,12 @@ bool FairMQSocketSHM::SetSendTimeout(const int timeout, const string& address, c
int FairMQSocketSHM::GetSendTimeout() const
{
int timeout = -1;
size_t size = sizeof(timeout);
if (zmq_getsockopt(fSocket, ZMQ_SNDTIMEO, &timeout, &size) != 0)
{
LOG(error) << "Failed getting option 'receive timeout' on socket " << fId << ", reason: " << zmq_strerror(errno);
}
return timeout;
return fSndTimeout;
}
bool FairMQSocketSHM::SetReceiveTimeout(const int timeout, const string& address, const string& method)
{
fRcvTimeout = timeout;
if (method == "bind")
{
if (zmq_unbind(fSocket, address.c_str()) != 0)
@@ -520,7 +532,7 @@ bool FairMQSocketSHM::SetReceiveTimeout(const int timeout, const string& address
LOG(error) << "Failed unbinding socket " << fId << ", reason: " << zmq_strerror(errno);
return false;
}
if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &timeout, sizeof(int)) != 0)
if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0)
{
LOG(error) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno);
return false;
@@ -538,7 +550,7 @@ bool FairMQSocketSHM::SetReceiveTimeout(const int timeout, const string& address
LOG(error) << "Failed disconnecting socket " << fId << ", reason: " << zmq_strerror(errno);
return false;
}
if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &timeout, sizeof(int)) != 0)
if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0)
{
LOG(error) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno);
return false;
@@ -560,63 +572,34 @@ bool FairMQSocketSHM::SetReceiveTimeout(const int timeout, const string& address
int FairMQSocketSHM::GetReceiveTimeout() const
{
int timeout = -1;
size_t size = sizeof(timeout);
if (zmq_getsockopt(fSocket, ZMQ_RCVTIMEO, &timeout, &size) != 0)
{
LOG(error) << "Failed getting option 'receive timeout' on socket " << fId << ", reason: " << zmq_strerror(errno);
}
return timeout;
return fRcvTimeout;
}
int FairMQSocketSHM::GetConstant(const string& constant)
{
if (constant == "")
return 0;
if (constant == "sub")
return ZMQ_SUB;
if (constant == "pub")
return ZMQ_PUB;
if (constant == "xsub")
return ZMQ_XSUB;
if (constant == "xpub")
return ZMQ_XPUB;
if (constant == "push")
return ZMQ_PUSH;
if (constant == "pull")
return ZMQ_PULL;
if (constant == "req")
return ZMQ_REQ;
if (constant == "rep")
return ZMQ_REP;
if (constant == "dealer")
return ZMQ_DEALER;
if (constant == "router")
return ZMQ_ROUTER;
if (constant == "pair")
return ZMQ_PAIR;
if (constant == "") return 0;
if (constant == "sub") return ZMQ_SUB;
if (constant == "pub") return ZMQ_PUB;
if (constant == "xsub") return ZMQ_XSUB;
if (constant == "xpub") return ZMQ_XPUB;
if (constant == "push") return ZMQ_PUSH;
if (constant == "pull") return ZMQ_PULL;
if (constant == "req") return ZMQ_REQ;
if (constant == "rep") return ZMQ_REP;
if (constant == "dealer") return ZMQ_DEALER;
if (constant == "router") return ZMQ_ROUTER;
if (constant == "pair") return ZMQ_PAIR;
if (constant == "snd-hwm")
return ZMQ_SNDHWM;
if (constant == "rcv-hwm")
return ZMQ_RCVHWM;
if (constant == "snd-size")
return ZMQ_SNDBUF;
if (constant == "rcv-size")
return ZMQ_RCVBUF;
if (constant == "snd-more")
return ZMQ_SNDMORE;
if (constant == "rcv-more")
return ZMQ_RCVMORE;
if (constant == "snd-hwm") return ZMQ_SNDHWM;
if (constant == "rcv-hwm") return ZMQ_RCVHWM;
if (constant == "snd-size") return ZMQ_SNDBUF;
if (constant == "rcv-size") return ZMQ_RCVBUF;
if (constant == "snd-more") return ZMQ_SNDMORE;
if (constant == "rcv-more") return ZMQ_RCVMORE;
if (constant == "linger")
return ZMQ_LINGER;
if (constant == "no-block")
return ZMQ_DONTWAIT;
if (constant == "snd-more no-block")
return ZMQ_DONTWAIT|ZMQ_SNDMORE;
if (constant == "linger") return ZMQ_LINGER;
if (constant == "no-block") return ZMQ_DONTWAIT;
if (constant == "snd-more no-block") return ZMQ_DONTWAIT|ZMQ_SNDMORE;
return -1;
}

View File

@@ -28,22 +28,23 @@ class FairMQSocketSHM : public FairMQSocket
bool Bind(const std::string& address) override;
void Connect(const std::string& address) override;
int Send(FairMQMessagePtr& msg) override;
int SendAsync(FairMQMessagePtr& msg) override;
int Receive(FairMQMessagePtr& msg) override;
int ReceiveAsync(FairMQMessagePtr& msg) override;
int Send(FairMQMessagePtr& msg, const int timeout = 0) override;
int Receive(FairMQMessagePtr& msg, const int timeout = 0) override;
int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int timeout = 0) override;
int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int timeout = 0) override;
int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override;
int64_t SendAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override;
int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override;
int64_t ReceiveAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override;
int TrySend(FairMQMessagePtr& msg) override;
int TryReceive(FairMQMessagePtr& msg) override;
int64_t TrySend(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override;
int64_t TryReceive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override;
void* GetSocket() const override;
int GetSocket(int nothing) const override;
void Close() override;
void Interrupt() override;
void Resume() override;
static void Interrupt();
static void Resume();
void SetOption(const std::string& option, const void* value, size_t valueSize) override;
void GetOption(const std::string& option, void* value, size_t* valueSize) override;
@@ -73,10 +74,14 @@ class FairMQSocketSHM : public FairMQSocket
static std::atomic<bool> fInterrupted;
int Send(FairMQMessagePtr& msg, const int flags);
int Receive(FairMQMessagePtr& msg, const int flags);
int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags);
int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags);
int fSndTimeout;
int fRcvTimeout;
int SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout);
int ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout);
int64_t SendImpl(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags, const int timeout);
int64_t ReceiveImpl(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags, const int timeout);
};
#endif /* FAIRMQSOCKETSHM_H_ */

View File

@@ -49,6 +49,9 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory
FairMQ::Transport GetType() const override;
void Interrupt() override { FairMQSocketSHM::Interrupt(); }
void Resume() override { FairMQSocketSHM::Resume(); }
~FairMQTransportFactorySHM() override;
private:

View File

@@ -19,13 +19,14 @@ namespace shmem
using namespace std;
namespace bipc = boost::interprocess;
std::unordered_map<uint64_t, Region> Manager::fRegions;
Manager::Manager(const string& name, size_t size)
: fSessionName(name)
, fSegmentName("fmq_shm_" + fSessionName + "_main")
, fManagementSegmentName("fmq_shm_" + fSessionName + "_management")
, fSegment(bipc::open_or_create, fSegmentName.c_str(), size)
, fManagementSegment(bipc::open_or_create, fManagementSegmentName.c_str(), 65536)
, fRegions()
{}
bipc::managed_shared_memory& Manager::Segment()

View File

@@ -49,8 +49,8 @@ class Manager
boost::interprocess::managed_shared_memory& Segment();
void Interrupt();
void Resume();
static void Interrupt();
static void Resume();
boost::interprocess::mapped_region* CreateRegion(const size_t size, const uint64_t id, FairMQRegionCallback callback);
Region* GetRemoteRegion(const uint64_t id);
@@ -66,7 +66,7 @@ class Manager
std::string fManagementSegmentName;
boost::interprocess::managed_shared_memory fSegment;
boost::interprocess::managed_shared_memory fManagementSegment;
std::unordered_map<uint64_t, Region> fRegions;
static std::unordered_map<uint64_t, Region> fRegions;
};
} // namespace shmem