From aa8d16ff9aee2e9cff4f6d57dee4be246045d98b Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 13 Oct 2017 13:15:12 +0200 Subject: [PATCH] Move shmem prototype out of MQ examples --- fairmq/CMakeLists.txt | 1 + fairmq/shmem/prototype/CMakeLists.txt | 76 ++++++ .../prototype/FairMQShmPrototypeSampler.cxx | 227 ++++++++++++++++++ .../prototype/FairMQShmPrototypeSampler.h | 45 ++++ .../prototype/FairMQShmPrototypeSink.cxx | 145 +++++++++++ .../shmem/prototype/FairMQShmPrototypeSink.h | 40 +++ fairmq/shmem/prototype/ShmChunk.h | 182 ++++++++++++++ .../prototype/runShmPrototypeSampler.cxx | 24 ++ .../shmem/prototype/runShmPrototypeSink.cxx | 21 ++ fairmq/shmem/prototype/shm-prototype.json | 34 +++ .../shmem/prototype/startShmPrototype.sh.in | 49 ++++ 11 files changed, 844 insertions(+) create mode 100644 fairmq/shmem/prototype/CMakeLists.txt create mode 100644 fairmq/shmem/prototype/FairMQShmPrototypeSampler.cxx create mode 100644 fairmq/shmem/prototype/FairMQShmPrototypeSampler.h create mode 100644 fairmq/shmem/prototype/FairMQShmPrototypeSink.cxx create mode 100644 fairmq/shmem/prototype/FairMQShmPrototypeSink.h create mode 100644 fairmq/shmem/prototype/ShmChunk.h create mode 100644 fairmq/shmem/prototype/runShmPrototypeSampler.cxx create mode 100644 fairmq/shmem/prototype/runShmPrototypeSink.cxx create mode 100644 fairmq/shmem/prototype/shm-prototype.json create mode 100755 fairmq/shmem/prototype/startShmPrototype.sh.in diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 409b8609..4922fb70 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -54,6 +54,7 @@ endif() if(BUILD_TESTING) add_subdirectory(test) endif() +add_subdirectory(shmem/prototype) ########################## # libFairMQ header files # diff --git a/fairmq/shmem/prototype/CMakeLists.txt b/fairmq/shmem/prototype/CMakeLists.txt new file mode 100644 index 00000000..e864ad16 --- /dev/null +++ b/fairmq/shmem/prototype/CMakeLists.txt @@ -0,0 +1,76 @@ + ################################################################################ + # Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # + # # + # This software is distributed under the terms of the # + # GNU Lesser General Public Licence version 3 (LGPL) version 3, # + # copied verbatim in the file "LICENSE" # + ################################################################################ + +configure_file(${CMAKE_SOURCE_DIR}/fairmq/shmem/prototype/shm-prototype.json + ${CMAKE_BINARY_DIR}/bin/config/shm-prototype.json) +configure_file(${CMAKE_SOURCE_DIR}/fairmq/shmem/prototype/startShmPrototype.sh.in + ${CMAKE_BINARY_DIR}/bin/prototype/shmem/startShmPrototype.sh) + +Set(INCLUDE_DIRECTORIES + ${CMAKE_SOURCE_DIR}/fairmq + ${CMAKE_SOURCE_DIR}/fairmq/zeromq + ${CMAKE_SOURCE_DIR}/fairmq/nanomsg + ${CMAKE_SOURCE_DIR}/fairmq/devices + ${CMAKE_SOURCE_DIR}/fairmq/tools + ${CMAKE_SOURCE_DIR}/fairmq/options + ${CMAKE_SOURCE_DIR}/fairmq/shmem/prototype + ${CMAKE_CURRENT_BINARY_DIR} +) + +Set(SYSTEM_INCLUDE_DIRECTORIES + ${Boost_INCLUDE_DIR} + ${ZeroMQ_INCLUDE_DIR} +) + +Include_Directories(${INCLUDE_DIRECTORIES}) +Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES}) + +Set(LINK_DIRECTORIES + ${Boost_LIBRARY_DIRS} +) + +Link_Directories(${LINK_DIRECTORIES}) + +Set(SRCS + "FairMQShmPrototypeSampler.cxx" + "FairMQShmPrototypeSink.cxx" +) + +Set(DEPENDENCIES + ${DEPENDENCIES} + ${Boost_INTERPROCESS_LIBRARY} + FairMQ +) + +Set(LIBRARY_NAME FairMQShmPrototype) + +GENERATE_LIBRARY() + +Set(Exe_Names + shm-prototype-sampler + shm-prototype-sink +) + +Set(Exe_Source + runShmPrototypeSampler.cxx + runShmPrototypeSink.cxx +) + +list(LENGTH Exe_Names _length) +math(EXPR _length ${_length}-1) + +set(EXECUTABLE_OUTPUT_PATH "${EXECUTABLE_OUTPUT_PATH}/prototype/shmem") + +ForEach(_file RANGE 0 ${_length}) + list(GET Exe_Names ${_file} _name) + list(GET Exe_Source ${_file} _src) + set(EXE_NAME ${_name}) + set(SRCS ${_src}) + set(DEPENDENCIES FairMQShmPrototype) + GENERATE_EXECUTABLE() +EndForEach(_file RANGE 0 ${_length}) diff --git a/fairmq/shmem/prototype/FairMQShmPrototypeSampler.cxx b/fairmq/shmem/prototype/FairMQShmPrototypeSampler.cxx new file mode 100644 index 00000000..213c959a --- /dev/null +++ b/fairmq/shmem/prototype/FairMQShmPrototypeSampler.cxx @@ -0,0 +1,227 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQShmPrototypeSampler.cpp + * + * @since 2016-04-08 + * @author A. Rybalchenko + */ + +#include +#include +#include +#include + +#include +#include + +#include "FairMQShmPrototypeSampler.h" +#include "FairMQProgOptions.h" +#include "FairMQLogger.h" + +#include "ShmChunk.h" + +using namespace std; +using namespace boost::interprocess; + +FairMQShmPrototypeSampler::FairMQShmPrototypeSampler() + : fMsgSize(10000) + , fMsgCounter(0) + , fMsgRate(1) + , fBytesOut(0) + , fMsgOut(0) + , fBytesOutNew(0) + , fMsgOutNew(0) +{ + if (shared_memory_object::remove("FairMQSharedMemoryPrototype")) + { + LOG(INFO) << "Successfully removed shared memory upon device start."; + } + else + { + LOG(INFO) << "Did not remove shared memory upon device start."; + } +} + +FairMQShmPrototypeSampler::~FairMQShmPrototypeSampler() +{ + if (shared_memory_object::remove("FairMQSharedMemoryPrototype")) + { + LOG(INFO) << "Successfully removed shared memory after the device has stopped."; + } + else + { + LOG(INFO) << "Did not remove shared memory after the device stopped. Still in use?"; + } +} + +void FairMQShmPrototypeSampler::Init() +{ + fMsgSize = fConfig->GetValue("msg-size"); + fMsgRate = fConfig->GetValue("msg-rate"); + + SegmentManager::Instance().InitializeSegment("open_or_create", "FairMQSharedMemoryPrototype", 2000000000); + LOG(INFO) << "Created/Opened shared memory segment of 2,000,000,000 bytes. Available are " + << SegmentManager::Instance().Segment()->get_free_memory() << " bytes."; +} + +void FairMQShmPrototypeSampler::Run() +{ + // count sent messages (also used in creating ShmChunk container ID) + static uint64_t numSentMsgs = 0; + + LOG(INFO) << "Starting the benchmark with message size of " << fMsgSize; + + // start rate logger and acknowledgement listener in separate threads + thread rateLogger(&FairMQShmPrototypeSampler::Log, this, 1000); + // thread resetMsgCounter(&FairMQShmPrototypeSampler::ResetMsgCounter, this); + + // int charnum = 97; + + while (CheckCurrentState(RUNNING)) + { + void* ptr = nullptr; + bipc::managed_shared_memory::handle_t handle; + + while (!ptr) + { + try + { + ptr = SegmentManager::Instance().Segment()->allocate(fMsgSize); + } + catch (bipc::bad_alloc& ba) + { + this_thread::sleep_for(chrono::milliseconds(50)); + if (CheckCurrentState(RUNNING)) + { + continue; + } + else + { + break; + } + } + } + + // // ShmChunk container ID + // string chunkID = "c" + to_string(numSentMsgs); + // // shared pointer ID + // string ownerID = "o" + to_string(numSentMsgs); + + // ShPtrOwner* owner = nullptr; + + // try + // { + // owner = SegmentManager::Instance().Segment()->construct(ownerID.c_str())( + // make_managed_shared_ptr(SegmentManager::Instance().Segment()->construct(chunkID.c_str())(fMsgSize), + // *(SegmentManager::Instance().Segment()))); + // } + // catch (bipc::bad_alloc& ba) + // { + // LOG(WARN) << "Shared memory full..."; + // this_thread::sleep_for(chrono::milliseconds(100)); + // continue; + // } + + // void* ptr = owner->fPtr->GetData(); + + // write something to memory, otherwise only (incomplete) allocation will be measured + // memset(ptr, 0, fMsgSize); + + // static_cast(ptr)[3] = charnum++; + // if (charnum == 123) + // { + // charnum = 97; + // } + + // LOG(DEBUG) << "chunk handle: " << owner->fPtr->GetHandle(); + // LOG(DEBUG) << "chunk size: " << owner->fPtr->GetSize(); + // LOG(DEBUG) << "owner (" << ownerID << ") use count: " << owner->fPtr.use_count(); + + // char* cptr = static_cast(ptr); + // LOG(DEBUG) << "check: " << cptr[3]; + + // FairMQMessagePtr msg(NewSimpleMessage(ownerID)); + + if (ptr) + { + handle = SegmentManager::Instance().Segment()->get_handle_from_address(ptr); + FairMQMessagePtr msg(NewMessage(sizeof(ExMetaHeader))); + ExMetaHeader* metaPtr = new(msg->GetData()) ExMetaHeader(); + metaPtr->fSize = fMsgSize; + metaPtr->fHandle = handle; + + // LOG(INFO) << metaPtr->fSize; + // LOG(INFO) << metaPtr->fHandle; + // LOG(WARN) << ptr; + + if (Send(msg, "meta", 0) > 0) + { + fBytesOutNew += fMsgSize; + ++fMsgOutNew; + ++numSentMsgs; + } + else + { + SegmentManager::Instance().Segment()->deallocate(ptr); + // SegmentManager::Instance().Segment()->destroy_ptr(owner); + } + } + + // --fMsgCounter; + // while (fMsgCounter == 0) + // { + // this_thread::sleep_for(chrono::milliseconds(1)); + // } + } + + LOG(INFO) << "Sent " << numSentMsgs << " messages, leaving RUNNING state."; + + rateLogger.join(); + // resetMsgCounter.join(); +} + +void FairMQShmPrototypeSampler::Log(const int intervalInMs) +{ + timestamp_t t0 = get_timestamp(); + timestamp_t t1; + timestamp_t msSinceLastLog; + + double mbPerSecOut = 0; + double msgPerSecOut = 0; + + while (CheckCurrentState(RUNNING)) + { + t1 = get_timestamp(); + + msSinceLastLog = (t1 - t0) / 1000.0L; + + mbPerSecOut = (static_cast(fBytesOutNew - fBytesOut) / (1024. * 1024.)) / static_cast(msSinceLastLog) * 1000.; + fBytesOut = fBytesOutNew; + + msgPerSecOut = static_cast(fMsgOutNew - fMsgOut) / static_cast(msSinceLastLog) * 1000.; + fMsgOut = fMsgOutNew; + + LOG(DEBUG) << fixed + << setprecision(0) << "out: " << msgPerSecOut << " msg (" + << setprecision(2) << mbPerSecOut << " MB)\t(" + << SegmentManager::Instance().Segment()->get_free_memory() / (1024. * 1024.) << " MB free)"; + + t0 = t1; + this_thread::sleep_for(chrono::milliseconds(intervalInMs)); + } +} + +void FairMQShmPrototypeSampler::ResetMsgCounter() +{ + while (CheckCurrentState(RUNNING)) + { + fMsgCounter = fMsgRate / 100; + this_thread::sleep_for(chrono::milliseconds(10)); + } +} diff --git a/fairmq/shmem/prototype/FairMQShmPrototypeSampler.h b/fairmq/shmem/prototype/FairMQShmPrototypeSampler.h new file mode 100644 index 00000000..e800b34f --- /dev/null +++ b/fairmq/shmem/prototype/FairMQShmPrototypeSampler.h @@ -0,0 +1,45 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQShmPrototypeSampler.h + * + * @since 2016-04-08 + * @author A. Rybalchenko + */ + +#ifndef FAIRMQSHMPROTOTYPESAMPLER_H_ +#define FAIRMQSHMPROTOTYPESAMPLER_H_ + +#include + +#include "FairMQDevice.h" + +class FairMQShmPrototypeSampler : public FairMQDevice +{ + public: + FairMQShmPrototypeSampler(); + virtual ~FairMQShmPrototypeSampler(); + + void Log(const int intervalInMs); + void ResetMsgCounter(); + + protected: + unsigned int fMsgSize; + unsigned int fMsgCounter; + unsigned int fMsgRate; + + unsigned long long fBytesOut; + unsigned long long fMsgOut; + std::atomic fBytesOutNew; + std::atomic fMsgOutNew; + + virtual void Init(); + virtual void Run(); +}; + +#endif /* FAIRMQSHMPROTOTYPESAMPLER_H_ */ diff --git a/fairmq/shmem/prototype/FairMQShmPrototypeSink.cxx b/fairmq/shmem/prototype/FairMQShmPrototypeSink.cxx new file mode 100644 index 00000000..bfef728b --- /dev/null +++ b/fairmq/shmem/prototype/FairMQShmPrototypeSink.cxx @@ -0,0 +1,145 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQShmPrototypeSink.cxx + * + * @since 2016-04-08 + * @author A. Rybalchenko + */ + +#include +#include +#include +#include + +#include +#include + +#include "FairMQShmPrototypeSink.h" +#include "FairMQProgOptions.h" +#include "FairMQLogger.h" + +#include "ShmChunk.h" + +using namespace std; +using namespace boost::interprocess; + +FairMQShmPrototypeSink::FairMQShmPrototypeSink() + : fBytesIn(0) + , fMsgIn(0) + , fBytesInNew(0) + , fMsgInNew(0) +{ +} + +FairMQShmPrototypeSink::~FairMQShmPrototypeSink() +{ +} + +void FairMQShmPrototypeSink::Init() +{ + SegmentManager::Instance().InitializeSegment("open_or_create", "FairMQSharedMemoryPrototype", 2000000000); + LOG(INFO) << "Created/Opened shared memory segment of 2,000,000,000 bytes. Available are " + << SegmentManager::Instance().Segment()->get_free_memory() << " bytes."; +} + +void FairMQShmPrototypeSink::Run() +{ + static uint64_t numReceivedMsgs = 0; + + thread rateLogger(&FairMQShmPrototypeSink::Log, this, 1000); + + while (CheckCurrentState(RUNNING)) + { + FairMQMessagePtr msg(NewMessage()); + + if (Receive(msg, "meta") > 0) + { + ExMetaHeader* hdr = static_cast(msg->GetData()); + size_t size = hdr->fSize; + bipc::managed_shared_memory::handle_t handle = hdr->fHandle; + void* ptr = SegmentManager::Instance().Segment()->get_address_from_handle(handle); + + // LOG(INFO) << size; + // LOG(INFO) << handle; + // LOG(WARN) << ptr; + + fBytesInNew += size; + ++fMsgInNew; + SegmentManager::Instance().Segment()->deallocate(ptr); + + // get the shared pointer ID from the received message + // string ownerID(static_cast(msg->GetData()), msg->GetSize()); + + // find the shared pointer in shared memory with its ID + // ShPtrOwner* owner = SegmentManager::Instance().Segment()->find(ownerID.c_str()).first; + // LOG(DEBUG) << "owner (" << ownerID << ") use count: " << owner->fPtr.use_count(); + + + // if (owner) + // { + // // void* ptr = owner->fPtr->GetData(); + + // // LOG(DEBUG) << "chunk handle: " << owner->fPtr->GetHandle(); + // // LOG(DEBUG) << "chunk size: " << owner->fPtr->GetSize(); + + // fBytesInNew += owner->fPtr->GetSize(); + // ++fMsgInNew; + + // // char* cptr = static_cast(ptr); + // // LOG(DEBUG) << "check: " << cptr[3]; + + // SegmentManager::Instance().Segment()->deallocate(ptr); + + // // SegmentManager::Instance().Segment()->destroy_ptr(owner); + // } + // else + // { + // LOG(WARN) << "Shared pointer is zero."; + // } + + + ++numReceivedMsgs; + } + } + + LOG(INFO) << "Received " << numReceivedMsgs << " messages, leaving RUNNING state."; + + rateLogger.join(); +} + +void FairMQShmPrototypeSink::Log(const int intervalInMs) +{ + timestamp_t t0 = get_timestamp(); + timestamp_t t1; + timestamp_t msSinceLastLog; + + double mbPerSecIn = 0; + double msgPerSecIn = 0; + + while (CheckCurrentState(RUNNING)) + { + t1 = get_timestamp(); + + msSinceLastLog = (t1 - t0) / 1000.0L; + + mbPerSecIn = (static_cast(fBytesInNew - fBytesIn) / (1024. * 1024.)) / static_cast(msSinceLastLog) * 1000.; + fBytesIn = fBytesInNew; + + msgPerSecIn = static_cast(fMsgInNew - fMsgIn) / static_cast(msSinceLastLog) * 1000.; + fMsgIn = fMsgInNew; + + LOG(DEBUG) << fixed + << setprecision(0) << "in: " << msgPerSecIn << " msg (" + << setprecision(2) << mbPerSecIn << " MB)\t(" + << SegmentManager::Instance().Segment()->get_free_memory() / (1024. * 1024.) << " MB free)"; + + t0 = t1; + this_thread::sleep_for(chrono::milliseconds(intervalInMs)); + } +} diff --git a/fairmq/shmem/prototype/FairMQShmPrototypeSink.h b/fairmq/shmem/prototype/FairMQShmPrototypeSink.h new file mode 100644 index 00000000..154db3f6 --- /dev/null +++ b/fairmq/shmem/prototype/FairMQShmPrototypeSink.h @@ -0,0 +1,40 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQShmPrototypeSink.h + * + * @since 2016-04-08 + * @author A. Rybalchenko + */ + +#ifndef FAIRMQSHMPROTOTYPESINK_H_ +#define FAIRMQSHMPROTOTYPESINK_H_ + +#include + +#include "FairMQDevice.h" + +class FairMQShmPrototypeSink : public FairMQDevice +{ + public: + FairMQShmPrototypeSink(); + virtual ~FairMQShmPrototypeSink(); + + void Log(const int intervalInMs); + + protected: + unsigned long long fBytesIn; + unsigned long long fMsgIn; + std::atomic fBytesInNew; + std::atomic fMsgInNew; + + virtual void Init(); + virtual void Run(); +}; + +#endif /* FAIRMQSHMPROTOTYPESINK_H_ */ diff --git a/fairmq/shmem/prototype/ShmChunk.h b/fairmq/shmem/prototype/ShmChunk.h new file mode 100644 index 00000000..2c9cd975 --- /dev/null +++ b/fairmq/shmem/prototype/ShmChunk.h @@ -0,0 +1,182 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * ShmChunk.h + * + * @since 2016-04-08 + * @author A. Rybalchenko + */ + +#ifndef SHMCHUNK_H_ +#define SHMCHUNK_H_ + +#include +#include + +#include +#include + +#include "FairMQLogger.h" + +namespace bipc = boost::interprocess; + +class SegmentManager +{ + public: + static SegmentManager& Instance() + { + static SegmentManager man; + return man; + } + + void InitializeSegment(const std::string& op, const std::string& name, const size_t size = 0) + { + if (!fSegment) + { + try + { + if (op == "open_or_create") + { + fSegment = new bipc::managed_shared_memory(bipc::open_or_create, name.c_str(), size); + } + else if (op == "create_only") + { + fSegment = new bipc::managed_shared_memory(bipc::create_only, name.c_str(), size); + } + else if (op == "open_only") + { + int numTries = 0; + bool success = false; + + do + { + try + { + fSegment = new bipc::managed_shared_memory(bipc::open_only, name.c_str()); + success = true; + } + catch (bipc::interprocess_exception& ie) + { + if (++numTries == 5) + { + LOG(ERROR) << "Could not open shared memory after " << numTries << " attempts, exiting!"; + exit(EXIT_FAILURE); + } + else + { + LOG(DEBUG) << "Could not open shared memory segment on try " << numTries << ". Retrying in 1 second..."; + LOG(DEBUG) << ie.what(); + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + } + } + while (!success); + } + else + { + LOG(ERROR) << "Unknown operation when initializing shared memory segment: " << op; + } + } + catch (std::exception& e) + { + LOG(ERROR) << "Exception during shared memory segment initialization: " << e.what() << ", application will now exit"; + exit(EXIT_FAILURE); + } + } + else + { + LOG(INFO) << "Segment already initialized"; + } + } + + bipc::managed_shared_memory* Segment() const + { + if (fSegment) + { + return fSegment; + } + else + { + LOG(ERROR) << "Segment not initialized"; + exit(EXIT_FAILURE); + } + } + + private: + SegmentManager() + : fSegment(nullptr) + {} + + bipc::managed_shared_memory* fSegment; +}; + +struct alignas(16) ExMetaHeader +{ + uint64_t fSize; + bipc::managed_shared_memory::handle_t fHandle; +}; + +// class ShmChunk +// { +// public: +// ShmChunk() +// : fHandle() +// , fSize(0) +// { +// } + +// ShmChunk(const size_t size) +// : fHandle() +// , fSize(size) +// { +// void* ptr = SegmentManager::Instance().Segment()->allocate(size); +// fHandle = SegmentManager::Instance().Segment()->get_handle_from_address(ptr); +// } + +// ~ShmChunk() +// { +// SegmentManager::Instance().Segment()->deallocate(SegmentManager::Instance().Segment()->get_address_from_handle(fHandle)); +// } + +// bipc::managed_shared_memory::handle_t GetHandle() const +// { +// return fHandle; +// } + +// void* GetData() const +// { +// return SegmentManager::Instance().Segment()->get_address_from_handle(fHandle); +// } + +// size_t GetSize() const +// { +// return fSize; +// } + +// private: +// bipc::managed_shared_memory::handle_t fHandle; +// size_t fSize; +// }; + +// typedef bipc::managed_shared_ptr::type ShPtrType; + +// struct ShPtrOwner +// { +// ShPtrOwner(const ShPtrType& other) +// : fPtr(other) +// {} + +// ShPtrOwner(const ShPtrOwner& other) +// : fPtr(other.fPtr) +// {} + +// ShPtrType fPtr; +// }; + +#endif /* SHMCHUNK_H_ */ diff --git a/fairmq/shmem/prototype/runShmPrototypeSampler.cxx b/fairmq/shmem/prototype/runShmPrototypeSampler.cxx new file mode 100644 index 00000000..1bb3ff3a --- /dev/null +++ b/fairmq/shmem/prototype/runShmPrototypeSampler.cxx @@ -0,0 +1,24 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "runFairMQDevice.h" +#include "FairMQShmPrototypeSampler.h" + +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& options) +{ + options.add_options() + ("msg-size", bpo::value()->default_value(1000), "Message size in bytes") + ("msg-rate", bpo::value()->default_value(0), "Msg rate limit in maximum number of messages per second"); +} + +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) +{ + return new FairMQShmPrototypeSampler(); +} diff --git a/fairmq/shmem/prototype/runShmPrototypeSink.cxx b/fairmq/shmem/prototype/runShmPrototypeSink.cxx new file mode 100644 index 00000000..5ff54b2a --- /dev/null +++ b/fairmq/shmem/prototype/runShmPrototypeSink.cxx @@ -0,0 +1,21 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "runFairMQDevice.h" +#include "FairMQShmPrototypeSink.h" + +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& /*options*/) +{ +} + +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) +{ + return new FairMQShmPrototypeSink(); +} diff --git a/fairmq/shmem/prototype/shm-prototype.json b/fairmq/shmem/prototype/shm-prototype.json new file mode 100644 index 00000000..43fd2dfe --- /dev/null +++ b/fairmq/shmem/prototype/shm-prototype.json @@ -0,0 +1,34 @@ +{ + "fairMQOptions": { + "devices": [ + { + "id": "sampler1", + "channels": [ + { + "name": "meta", + "type": "push", + "method": "bind", + "address": "tcp://127.0.0.1:5555", + "sndBufSize": 10, + "rcvBufSize": 10, + "rateLogging": 0 + } + ] + }, + { + "id": "sink1", + "channels": [ + { + "name": "meta", + "type": "pull", + "method": "connect", + "address": "tcp://127.0.0.1:5555", + "sndBufSize": 10, + "rcvBufSize": 10, + "rateLogging": 0 + } + ] + } + ] + } +} diff --git a/fairmq/shmem/prototype/startShmPrototype.sh.in b/fairmq/shmem/prototype/startShmPrototype.sh.in new file mode 100755 index 00000000..f7b28ba4 --- /dev/null +++ b/fairmq/shmem/prototype/startShmPrototype.sh.in @@ -0,0 +1,49 @@ +#!/bin/bash + +msgSize="1000000" +transport="zeromq" + +if [[ $1 =~ ^[0-9]+$ ]]; then + msgSize=$1 +fi + +echo "Starting shared memory example with message size of $msgSize bytes." +echo "" +echo "Usage: startShmPrototype [message size=1000000]" + +SAMPLER="shm-prototype-sampler" +SAMPLER+=" --id sampler1" +SAMPLER+=" --transport $transport" +# SAMPLER+=" --verbosity TRACE" +SAMPLER+=" --msg-size $msgSize" +# SAMPLER+=" --msg-rate 1000" +SAMPLER+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/shm-prototype.json" +xterm -geometry 80x32+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/prototype/shmem/$SAMPLER & + +SINK1="shm-prototype-sink" +SINK1+=" --id sink1" +SINK1+=" --transport $transport" +# SINK1+=" --verbose TRACE" +SINK1+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/shm-prototype.json" +xterm -geometry 80x32+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/prototype/shmem/$SINK1 & + +# SINK2="shm-prototype-sink" +# SINK2+=" --id sink2" +# SINK2+=" --transport $transport" +# # SINK2+=" --verbose TRACE" +# SINK2+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/shm-prototype.json" +# xterm -geometry 80x32+500+500 -hold -e @CMAKE_BINARY_DIR@/bin/prototype/shmem/$SINK2 & + +# SINK3="shm-prototype-sink" +# SINK3+=" --id sink3" +# SINK3+=" --transport $transport" +# # SINK3+=" --verbose TRACE" +# SINK3+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/shm-prototype.json" +# xterm -geometry 80x32+1000+0 -hold -e @CMAKE_BINARY_DIR@/bin/prototype/shmem/$SINK3 & + +# SINK4="shm-prototype-sink" +# SINK4+=" --id sink4" +# SINK4+=" --transport $transport" +# # SINK4+=" --verbose TRACE" +# SINK4+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/shm-prototype.json" +# xterm -geometry 80x32+1000+500 -hold -e @CMAKE_BINARY_DIR@/bin/prototype/shmem/$SINK4 &