diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx index a845682c..288bc3da 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx @@ -68,12 +68,12 @@ FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const FairMQSocket& cmdSo void FairMQTransportFactoryNN::Shutdown() { - // nothing to do for nanomsg, transport is ready to be terminated any time. + nn_term(); } void FairMQTransportFactoryNN::Terminate() { - nn_term(); + // nothing to do for nanomsg } FairMQ::Transport FairMQTransportFactoryNN::GetType() const diff --git a/fairmq/run/startMQBenchmark.sh.in b/fairmq/run/startMQBenchmark.sh.in index f765007e..7459486a 100755 --- a/fairmq/run/startMQBenchmark.sh.in +++ b/fairmq/run/startMQBenchmark.sh.in @@ -71,6 +71,9 @@ SAMPLER+=" --same-msg $sameMsg" SAMPLER+=" --num-msgs $numMsgs" SAMPLER+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/benchmark.json" xterm -geometry 90x23+0+0 -hold -e $affinitySamp @CMAKE_BINARY_DIR@/bin/$SAMPLER & +echo "" +echo "started: xterm -geometry 90x23+0+0 -hold -e $affinitySamp @CMAKE_BINARY_DIR@/bin/$SAMPLER" +echo "pid: $!" SINK="sink" SINK+=" --id sink1" @@ -80,3 +83,7 @@ SINK+=" --transport $transport" SINK+=" --num-msgs $numMsgs" SINK+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/benchmark.json" xterm -geometry 90x23+550+0 -hold -e $affinitySink @CMAKE_BINARY_DIR@/bin/$SINK & +echo "" +echo "started: xterm -geometry 90x23+550+0 -hold -e $affinitySink @CMAKE_BINARY_DIR@/bin/$SINK" +echo "pid: $!" +echo "" diff --git a/fairmq/shmem/FairMQShmDeviceCounter.h b/fairmq/shmem/FairMQShmDeviceCounter.h new file mode 100644 index 00000000..1f4e3e89 --- /dev/null +++ b/fairmq/shmem/FairMQShmDeviceCounter.h @@ -0,0 +1,29 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +#ifndef FAIRMQSHMDEVICECOUNTER_H_ +#define FAIRMQSHMDEVICECOUNTER_H_ + +#include + +namespace FairMQ +{ +namespace shmem +{ + +struct FairMQShmDeviceCounter +{ + FairMQShmDeviceCounter(unsigned int c) + : count(c) + {} + std::atomic count; +}; + +} // namespace shmem +} // namespace FairMQ + +#endif /* FAIRMQSHMDEVICECOUNTER_H_ */ diff --git a/fairmq/shmem/FairMQTransportFactorySHM.cxx b/fairmq/shmem/FairMQTransportFactorySHM.cxx index c7dbeebc..7ec7e63a 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.cxx +++ b/fairmq/shmem/FairMQTransportFactorySHM.cxx @@ -5,10 +5,18 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include "zmq.h" +#include + #include #include +#include +#include + +#include + +#include + #include "FairMQLogger.h" #include "FairMQShmManager.h" #include "FairMQTransportFactorySHM.h" @@ -16,11 +24,17 @@ using namespace std; using namespace FairMQ::shmem; +namespace bipc = boost::interprocess; FairMQ::Transport FairMQTransportFactorySHM::fTransportType = FairMQ::Transport::SHM; FairMQTransportFactorySHM::FairMQTransportFactorySHM() : fContext(nullptr) + , fHeartbeatSocket(nullptr) + , fHeartbeatThread() + , fSendHeartbeats(true) + , fShMutex(bipc::open_or_create, "fairmq_shmem_mutex") + , fDeviceCounter(nullptr) { int major, minor, patch; zmq_version(&major, &minor, &patch); @@ -51,17 +65,65 @@ void FairMQTransportFactorySHM::Initialize(const FairMQProgOptions* config) if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0) { - LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno); + LOG(ERROR) << "shmem: failed configuring context, reason: " << zmq_strerror(errno); } // Set the maximum number of allowed sockets on the context. if (zmq_ctx_set(fContext, ZMQ_MAX_SOCKETS, 10000) != 0) { - LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno); + LOG(ERROR) << "shmem: failed configuring context, reason: " << zmq_strerror(errno); } + fSendHeartbeats = true; + fHeartbeatThread = thread(&FairMQTransportFactorySHM::SendHeartbeats, this); + Manager::Instance().InitializeSegment("open_or_create", "FairMQSharedMemory", segmentSize); LOG(DEBUG) << "shmem: created/opened shared memory segment of " << segmentSize << " bytes. Available are " << Manager::Instance().Segment()->get_free_memory() << " bytes."; + + { // mutex scope + bipc::scoped_lock lock(fShMutex); + + pair result = Manager::Instance().Segment()->find(bipc::unique_instance); + if (result.first != nullptr) + { + fDeviceCounter = result.first; + LOG(DEBUG) << "shmem: device counter found, with value of " << fDeviceCounter->count << ". incrementing."; + (fDeviceCounter->count)++; + LOG(DEBUG) << "shmem: incremented device counter, now: " << fDeviceCounter->count; + } + else + { + LOG(DEBUG) << "shmem: no device counter found, creating one and initializing with 1"; + fDeviceCounter = Manager::Instance().Segment()->construct(bipc::unique_instance)(1); + LOG(DEBUG) << "shmem: initialized device counter with: " << fDeviceCounter->count; + } + } +} + +void FairMQTransportFactorySHM::SendHeartbeats() +{ + while (fSendHeartbeats) + { + try + { + bipc::message_queue mq(bipc::open_only, "fairmq_shmem_control_queue"); + bool heartbeat = true; + boost::posix_time::ptime sndTill = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(100); + if (mq.timed_send(&heartbeat, sizeof(heartbeat), 0, sndTill)) + { + this_thread::sleep_for(chrono::milliseconds(100)); + } + else + { + LOG(DEBUG) << "control queue timeout"; + } + } + catch (bipc::interprocess_exception& ie) + { + this_thread::sleep_for(chrono::milliseconds(500)); + // LOG(WARN) << "no fairmq_shmem_control_queue found"; + } + } } FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage() const @@ -106,6 +168,9 @@ void FairMQTransportFactorySHM::Shutdown() { LOG(ERROR) << "shmem: failed shutting down context, reason: " << zmq_strerror(errno); } + + fSendHeartbeats = false; + fHeartbeatThread.join(); } void FairMQTransportFactorySHM::Terminate() @@ -120,23 +185,38 @@ void FairMQTransportFactorySHM::Terminate() } else { - fContext = NULL; + fContext = nullptr; return; } } } else { - LOG(ERROR) << "shmem: Terminate(): context now available for shutdown"; + LOG(ERROR) << "shmem: Terminate(): context not available for shutdown"; } - if (boost::interprocess::shared_memory_object::remove("FairMQSharedMemory")) - { - LOG(DEBUG) << "shmem: successfully removed shared memory segment after the device has stopped."; - } - else - { - LOG(DEBUG) << "shmem: did not remove shared memory segment after the device stopped. Already removed?"; + { // mutex scope + bipc::scoped_lock lock(fShMutex); + + (fDeviceCounter->count)--; + + if (fDeviceCounter->count == 0) + { + LOG(DEBUG) << "shmem: last FairMQSharedMemory user, removing segment."; + + if (bipc::shared_memory_object::remove("FairMQSharedMemory")) + { + LOG(DEBUG) << "shmem: successfully removed shared memory segment after the device has stopped."; + } + else + { + LOG(DEBUG) << "shmem: did not remove shared memory segment after the device stopped. Already removed?"; + } + } + else + { + LOG(DEBUG) << "shmem: other FairMQSharedMemory users present (" << fDeviceCounter->count << "), not removing it."; + } } } diff --git a/fairmq/shmem/FairMQTransportFactorySHM.h b/fairmq/shmem/FairMQTransportFactorySHM.h index 8219ce80..3a32166a 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.h +++ b/fairmq/shmem/FairMQTransportFactorySHM.h @@ -10,11 +10,16 @@ #include #include +#include +#include + +#include #include "FairMQTransportFactory.h" #include "FairMQMessageSHM.h" #include "FairMQSocketSHM.h" #include "FairMQPollerSHM.h" +#include "FairMQShmDeviceCounter.h" class FairMQTransportFactorySHM : public FairMQTransportFactory { @@ -38,11 +43,18 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory void Shutdown() override; void Terminate() override; + void SendHeartbeats(); + ~FairMQTransportFactorySHM() override {}; private: static FairMQ::Transport fTransportType; void* fContext; + void* fHeartbeatSocket; + std::thread fHeartbeatThread; + std::atomic fSendHeartbeats; + boost::interprocess::named_mutex fShMutex; + FairMQ::shmem::FairMQShmDeviceCounter* fDeviceCounter; }; #endif /* FAIRMQTRANSPORTFACTORYSHM_H_ */