From 3364da9541ed51609309c8f63f603abe74a43b8b Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 16 Jun 2020 14:36:56 +0200 Subject: [PATCH] Add linger setting for unmanaged region --- fairmq/FairMQUnmanagedRegion.h | 3 +++ fairmq/shmem/Region.h | 8 +++++++- fairmq/shmem/UnmanagedRegion.h | 2 ++ fairmq/zeromq/UnmanagedRegion.h | 2 ++ 4 files changed, 14 insertions(+), 1 deletion(-) diff --git a/fairmq/FairMQUnmanagedRegion.h b/fairmq/FairMQUnmanagedRegion.h index 080dab60..9f21f218 100644 --- a/fairmq/FairMQUnmanagedRegion.h +++ b/fairmq/FairMQUnmanagedRegion.h @@ -10,6 +10,7 @@ #define FAIRMQUNMANAGEDREGION_H_ #include // size_t +#include // uint32_t #include // std::unique_ptr #include // std::function #include // std::ostream @@ -72,6 +73,8 @@ class FairMQUnmanagedRegion virtual void* GetData() const = 0; virtual size_t GetSize() const = 0; virtual uint64_t GetId() const = 0; + virtual void SetLinger(uint32_t linger) = 0; + virtual uint32_t GetLinger() const = 0; FairMQTransportFactory* GetTransport() { return fTransport; } void SetTransport(FairMQTransportFactory* transport) { fTransport = transport; } diff --git a/fairmq/shmem/Region.h b/fairmq/shmem/Region.h index 16441a9f..105eb9f9 100644 --- a/fairmq/shmem/Region.h +++ b/fairmq/shmem/Region.h @@ -30,6 +30,7 @@ #include #include // min +#include #include #include #include @@ -49,6 +50,7 @@ struct Region { Region(const std::string& shmId, uint64_t id, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags) : fRemote(remote) + , fLinger(100) , fStop(false) , fName("fmq_" + shmId + "_rg_" + std::to_string(id)) , fQueueName("fmq_" + shmId + "_rgq_" + std::to_string(id)) @@ -208,6 +210,9 @@ struct Region } } + void SetLinger(uint32_t linger) { fLinger = linger; } + uint32_t GetLinger() const { return fLinger; } + ~Region() { fStop = true; @@ -244,7 +249,8 @@ struct Region } bool fRemote; - bool fStop; + uint32_t fLinger; + std::atomic fStop; std::string fName; std::string fQueueName; boost::interprocess::shared_memory_object fShmemObject; diff --git a/fairmq/shmem/UnmanagedRegion.h b/fairmq/shmem/UnmanagedRegion.h index cc6a589b..2b308b11 100644 --- a/fairmq/shmem/UnmanagedRegion.h +++ b/fairmq/shmem/UnmanagedRegion.h @@ -57,6 +57,8 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion void* GetData() const override { return fRegion->get_address(); } size_t GetSize() const override { return fRegion->get_size(); } uint64_t GetId() const override { return fRegionId; } + void SetLinger(uint32_t linger) override { fManager.GetRegion(fRegionId)->SetLinger(linger); } + uint32_t GetLinger() const override { return fManager.GetRegion(fRegionId)->GetLinger(); } ~UnmanagedRegion() override { fManager.RemoveRegion(fRegionId); } diff --git a/fairmq/zeromq/UnmanagedRegion.h b/fairmq/zeromq/UnmanagedRegion.h index e40bd971..110e67bd 100644 --- a/fairmq/zeromq/UnmanagedRegion.h +++ b/fairmq/zeromq/UnmanagedRegion.h @@ -52,6 +52,8 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion virtual size_t GetSize() const override { return fSize; } uint64_t GetId() const override { return fId; } int64_t GetUserFlags() const { return fUserFlags; } + void SetLinger(uint32_t /* linger */) override { LOG(debug) << "ZeroMQ UnmanagedRegion linger option not implemented. Acknowledgements are local."; } + uint32_t GetLinger() const override { LOG(debug) << "ZeroMQ UnmanagedRegion linger option not implemented. Acknowledgements are local."; return 0; } virtual ~UnmanagedRegion() {