mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-17 18:41:46 +00:00
Compare commits
9 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
7cacf471b9 | ||
|
7316b0e7f2 | ||
|
1fa82f5f22 | ||
|
1bb77bf47b | ||
|
07fe02a0a0 | ||
|
9cbccface7 | ||
|
7b773cde51 | ||
|
fd282fa950 | ||
|
008be36125 |
@@ -29,7 +29,7 @@ Set(configure_options "${configure_options};-DCMAKE_PREFIX_PATH=$ENV{SIMPATH}")
|
|||||||
Set(configure_options "${configure_options};-DBUILD_NANOMSG_TRANSPORT=ON")
|
Set(configure_options "${configure_options};-DBUILD_NANOMSG_TRANSPORT=ON")
|
||||||
# Set(configure_options "${configure_options};-DBUILD_OFI_TRANSPORT=ON")
|
# Set(configure_options "${configure_options};-DBUILD_OFI_TRANSPORT=ON")
|
||||||
Set(configure_options "${configure_options};-DBUILD_DDS_PLUGIN=ON")
|
Set(configure_options "${configure_options};-DBUILD_DDS_PLUGIN=ON")
|
||||||
Set(configure_options "${configure_options};-DBUILD_SDK=ON")
|
Set(configure_options "${configure_options};-DBUILD_SDK=OFF")
|
||||||
Set(configure_options "${configure_options};-DFAST_BUILD=ON")
|
Set(configure_options "${configure_options};-DFAST_BUILD=ON")
|
||||||
Set(configure_options "${configure_options};-DCOTIRE_MAXIMUM_NUMBER_OF_UNITY_INCLUDES=-j$ENV{number_of_processors}")
|
Set(configure_options "${configure_options};-DCOTIRE_MAXIMUM_NUMBER_OF_UNITY_INCLUDES=-j$ENV{number_of_processors}")
|
||||||
|
|
||||||
|
@@ -37,6 +37,12 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-ex-dds-env.sh ${CMAKE_CURRENT_
|
|||||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-dds.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-dds.sh @ONLY)
|
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-dds.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-dds.sh @ONLY)
|
||||||
|
|
||||||
# test
|
# test
|
||||||
|
add_test(NAME Example.DDS.localhost COMMAND ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-dds.sh localhost)
|
||||||
|
set_tests_properties(Example.DDS.localhost PROPERTIES
|
||||||
|
TIMEOUT 15
|
||||||
|
RUN_SERIAL true
|
||||||
|
PASS_REGULAR_EXPRESSION "Example successful"
|
||||||
|
)
|
||||||
|
|
||||||
# install
|
# install
|
||||||
|
|
||||||
|
@@ -8,8 +8,8 @@
|
|||||||
<declrequirement name="SinkWorker" type="wnname" value="sink"/>
|
<declrequirement name="SinkWorker" type="wnname" value="sink"/>
|
||||||
|
|
||||||
<decltask name="Sampler">
|
<decltask name="Sampler">
|
||||||
|
<exe>fairmq-ex-dds-sampler --color false --channel-config name=data1,type=push,method=bind --rate 100 -P dds</exe>
|
||||||
<env reachable="false">fairmq-ex-dds-env.sh</env>
|
<env reachable="false">fairmq-ex-dds-env.sh</env>
|
||||||
<exe>fairmq-ex-dds-sampler --id sampler --color false --channel-config name=data1,type=push,method=bind --rate 100 -P dds</exe>
|
|
||||||
<requirements>
|
<requirements>
|
||||||
<name>SamplerWorker</name>
|
<name>SamplerWorker</name>
|
||||||
</requirements>
|
</requirements>
|
||||||
@@ -19,10 +19,10 @@
|
|||||||
</decltask>
|
</decltask>
|
||||||
|
|
||||||
<decltask name="Processor">
|
<decltask name="Processor">
|
||||||
|
<exe>fairmq-ex-dds-processor --color false --channel-config name=data1,type=pull,method=connect name=data2,type=push,method=connect -P dds</exe>
|
||||||
<env reachable="false">fairmq-ex-dds-env.sh</env>
|
<env reachable="false">fairmq-ex-dds-env.sh</env>
|
||||||
<exe>fairmq-ex-dds-processor --id processor_%taskIndex% --color false --channel-config name=data1,type=pull,method=connect name=data2,type=push,method=connect -P dds</exe>
|
|
||||||
<requirements>
|
<requirements>
|
||||||
<id>ProcessorWorker</id>
|
<name>ProcessorWorker</name>
|
||||||
</requirements>
|
</requirements>
|
||||||
<properties>
|
<properties>
|
||||||
<name access="read">data1</name>
|
<name access="read">data1</name>
|
||||||
@@ -31,8 +31,8 @@
|
|||||||
</decltask>
|
</decltask>
|
||||||
|
|
||||||
<decltask name="Sink">
|
<decltask name="Sink">
|
||||||
|
<exe>fairmq-ex-dds-sink --color false --channel-config name=data2,type=pull,method=bind -P dds</exe>
|
||||||
<env reachable="false">fairmq-ex-dds-env.sh</env>
|
<env reachable="false">fairmq-ex-dds-env.sh</env>
|
||||||
<exe>fairmq-ex-dds-sink --id sink --color false --channel-config name=data2,type=pull,method=bind -P dds</exe>
|
|
||||||
<requirements>
|
<requirements>
|
||||||
<name>SinkWorker</name>
|
<name>SinkWorker</name>
|
||||||
</requirements>
|
</requirements>
|
||||||
|
@@ -8,8 +8,8 @@
|
|||||||
<declrequirement name="SinkWorker" type="wnname" value="sink"/>
|
<declrequirement name="SinkWorker" type="wnname" value="sink"/>
|
||||||
|
|
||||||
<decltask name="Sampler">
|
<decltask name="Sampler">
|
||||||
|
<exe>fairmq-ex-dds-sampler --color false --channel-config name=data1,type=push,method=bind -P dds --iterations 10</exe>
|
||||||
<env reachable="false">fairmq-ex-dds-env.sh</env>
|
<env reachable="false">fairmq-ex-dds-env.sh</env>
|
||||||
<exe>fairmq-ex-dds-sampler --id sampler --color false --channel-config name=data1,type=push,method=bind -P dds --iterations 10</exe>
|
|
||||||
<requirements>
|
<requirements>
|
||||||
<name>SamplerWorker</name>
|
<name>SamplerWorker</name>
|
||||||
</requirements>
|
</requirements>
|
||||||
@@ -19,10 +19,10 @@
|
|||||||
</decltask>
|
</decltask>
|
||||||
|
|
||||||
<decltask name="Processor">
|
<decltask name="Processor">
|
||||||
|
<exe>fairmq-ex-dds-processor --color false --channel-config name=data1,type=pull,method=connect name=data2,type=push,method=connect -P dds</exe>
|
||||||
<env reachable="false">fairmq-ex-dds-env.sh</env>
|
<env reachable="false">fairmq-ex-dds-env.sh</env>
|
||||||
<exe>fairmq-ex-dds-processor --id processor_%taskIndex% --color false --channel-config name=data1,type=pull,method=connect name=data2,type=push,method=connect -P dds</exe>
|
|
||||||
<requirements>
|
<requirements>
|
||||||
<id>ProcessorWorker</id>
|
<name>ProcessorWorker</name>
|
||||||
</requirements>
|
</requirements>
|
||||||
<properties>
|
<properties>
|
||||||
<name access="read">data1</name>
|
<name access="read">data1</name>
|
||||||
@@ -31,8 +31,8 @@
|
|||||||
</decltask>
|
</decltask>
|
||||||
|
|
||||||
<decltask name="Sink">
|
<decltask name="Sink">
|
||||||
|
<exe>fairmq-ex-dds-sink --color false --channel-config name=data2,type=pull,method=bind -P dds --iterations 10</exe>
|
||||||
<env reachable="false">fairmq-ex-dds-env.sh</env>
|
<env reachable="false">fairmq-ex-dds-env.sh</env>
|
||||||
<exe>fairmq-ex-dds-sink --id sink --color false --channel-config name=data2,type=pull,method=bind -P dds --iterations 10</exe>
|
|
||||||
<requirements>
|
<requirements>
|
||||||
<name>SinkWorker</name>
|
<name>SinkWorker</name>
|
||||||
</requirements>
|
</requirements>
|
||||||
|
@@ -46,7 +46,7 @@ echo "TOPOLOGY FILE: ${topologyFile}"
|
|||||||
|
|
||||||
# TODO Uncomment once DDS 2.6 is released
|
# TODO Uncomment once DDS 2.6 is released
|
||||||
# dds-info --active-topology
|
# dds-info --active-topology
|
||||||
dds-topology --disable-validation --activate ${topologyFile}
|
dds-topology --activate ${topologyFile}
|
||||||
# dds-info --active-topology
|
# dds-info --active-topology
|
||||||
# dds-info --wait-for-executing-agents ${requiredNofAgents}
|
# dds-info --wait-for-executing-agents ${requiredNofAgents}
|
||||||
sleep 1
|
sleep 1
|
||||||
@@ -82,4 +82,7 @@ logDir="${wrkDir}/logs"
|
|||||||
for file in $(find "${logDir}" -name "*.tar.gz"); do tar -xf ${file} -C "${logDir}" ; done
|
for file in $(find "${logDir}" -name "*.tar.gz"); do tar -xf ${file} -C "${logDir}" ; done
|
||||||
echo "AGENT LOG FILES IN: ${logDir}"
|
echo "AGENT LOG FILES IN: ${logDir}"
|
||||||
|
|
||||||
|
# This string is used by ctest to detect success
|
||||||
|
echo "Example successful :)"
|
||||||
|
|
||||||
# Cleanup function is called by EXIT trap
|
# Cleanup function is called by EXIT trap
|
||||||
|
@@ -63,8 +63,6 @@ bool DeviceRunner::HandleGeneralOptions(const fair::mq::ProgOptions& config, boo
|
|||||||
<< " / __/ / /_/ / / / _ / / / / /_/ / " << FAIRMQ_REPO_URL << endl
|
<< " / __/ / /_/ / / / _ / / / / /_/ / " << FAIRMQ_REPO_URL << endl
|
||||||
<< " /_/ \\__,_/_/_/ /_/ /_/ \\___\\_\\ " << FAIRMQ_LICENSE << " © " << FAIRMQ_COPYRIGHT << endl;
|
<< " /_/ \\__,_/_/_/ /_/ /_/ \\___\\_\\ " << FAIRMQ_LICENSE << " © " << FAIRMQ_COPYRIGHT << endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
config.PrintOptions();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
@@ -169,6 +167,9 @@ auto DeviceRunner::Run() -> int
|
|||||||
// Instantiate and run plugins
|
// Instantiate and run plugins
|
||||||
fPluginManager.InstantiatePlugins();
|
fPluginManager.InstantiatePlugins();
|
||||||
|
|
||||||
|
// Log IDLE configuration
|
||||||
|
fConfig.PrintOptions();
|
||||||
|
|
||||||
// Run the device
|
// Run the device
|
||||||
fDevice->RunStateMachine();
|
fDevice->RunStateMachine();
|
||||||
|
|
||||||
|
@@ -74,7 +74,8 @@ auto PluginServices::ReleaseDeviceControl(const string& controller) -> void
|
|||||||
if (fDeviceController == controller) {
|
if (fDeviceController == controller) {
|
||||||
fDeviceController = boost::none;
|
fDeviceController = boost::none;
|
||||||
} else {
|
} else {
|
||||||
throw DeviceControlError{tools::ToString("Plugin '", controller, "' cannot release control because it has not taken over control.")};
|
LOG(debug) << "Plugin '" << controller << "' cannot release control "
|
||||||
|
<< "because it has no control.";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -51,21 +51,22 @@ DDS::DDS(const string& name,
|
|||||||
, fLastState(DeviceState::Idle)
|
, fLastState(DeviceState::Idle)
|
||||||
, fDeviceTerminationRequested(false)
|
, fDeviceTerminationRequested(false)
|
||||||
, fHeartbeatInterval(100)
|
, fHeartbeatInterval(100)
|
||||||
|
, fUpdatesAllowed(false)
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
TakeDeviceControl();
|
TakeDeviceControl();
|
||||||
fControllerThread = thread(&DDS::HandleControl, this);
|
|
||||||
fHeartbeatThread = thread(&DDS::HeartbeatSender, this);
|
|
||||||
} catch (PluginServices::DeviceControlError& e) {
|
|
||||||
LOG(debug) << e.what();
|
|
||||||
} catch (exception& e) {
|
|
||||||
LOG(error) << "Error in plugin initialization: " << e.what();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
auto DDS::HandleControl() -> void
|
fHeartbeatThread = thread(&DDS::HeartbeatSender, this);
|
||||||
{
|
|
||||||
try {
|
std::string deviceId(GetProperty<std::string>("id"));
|
||||||
|
if (deviceId.empty()) {
|
||||||
|
SetProperty<std::string>("id", dds::env_prop<dds::task_path>());
|
||||||
|
}
|
||||||
|
std::string sessionId(GetProperty<std::string>("session"));
|
||||||
|
if (sessionId == "default") {
|
||||||
|
SetProperty<std::string>("session", dds::env_prop<dds::dds_session_id>());
|
||||||
|
}
|
||||||
|
|
||||||
auto control = GetProperty<string>("control");
|
auto control = GetProperty<string>("control");
|
||||||
bool staticMode(false);
|
bool staticMode(false);
|
||||||
if (control == "static") {
|
if (control == "static") {
|
||||||
@@ -85,22 +86,28 @@ auto DDS::HandleControl() -> void
|
|||||||
// subscribe to device state changes, pushing new state changes into the event queue
|
// subscribe to device state changes, pushing new state changes into the event queue
|
||||||
SubscribeToDeviceStateChange([&](DeviceState newState) {
|
SubscribeToDeviceStateChange([&](DeviceState newState) {
|
||||||
fStateQueue.Push(newState);
|
fStateQueue.Push(newState);
|
||||||
switch(newState) {
|
switch (newState) {
|
||||||
case DeviceState::Bound:
|
case DeviceState::Bound:
|
||||||
// Receive addresses of connecting channels from DDS
|
// Receive addresses of connecting channels from DDS
|
||||||
// and propagate addresses of bound channels to DDS.
|
// and propagate addresses of bound channels to DDS.
|
||||||
FillChannelContainers();
|
FillChannelContainers();
|
||||||
|
|
||||||
// publish bound addresses via DDS at keys corresponding to the channel prefixes, e.g. 'data' in data[i]
|
// publish bound addresses via DDS at keys corresponding to the channel
|
||||||
PublishBoundChannels();
|
// prefixes, e.g. 'data' in data[i]
|
||||||
break;
|
PublishBoundChannels();
|
||||||
case DeviceState::Exiting:
|
break;
|
||||||
fDeviceTerminationRequested = true;
|
case DeviceState::ResettingDevice: {
|
||||||
UnsubscribeFromDeviceStateChange();
|
std::lock_guard<std::mutex> lk(fUpdateMutex);
|
||||||
ReleaseDeviceControl();
|
fUpdatesAllowed = false;
|
||||||
break;
|
break;
|
||||||
default:
|
}
|
||||||
break;
|
case DeviceState::Exiting:
|
||||||
|
fDeviceTerminationRequested = true;
|
||||||
|
UnsubscribeFromDeviceStateChange();
|
||||||
|
ReleaseDeviceControl();
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||||
@@ -114,15 +121,26 @@ auto DDS::HandleControl() -> void
|
|||||||
});
|
});
|
||||||
|
|
||||||
if (staticMode) {
|
if (staticMode) {
|
||||||
TransitionDeviceStateTo(DeviceState::Running);
|
fControllerThread = thread(&DDS::StaticControl, this);
|
||||||
|
|
||||||
// wait until stop signal
|
|
||||||
unique_lock<mutex> lock(fStopMutex);
|
|
||||||
while (!fDeviceTerminationRequested) {
|
|
||||||
fStopCondition.wait_for(lock, chrono::seconds(1));
|
|
||||||
}
|
|
||||||
LOG(debug) << "Stopping DDS control plugin";
|
|
||||||
}
|
}
|
||||||
|
} catch (PluginServices::DeviceControlError& e) {
|
||||||
|
LOG(debug) << e.what();
|
||||||
|
} catch (exception& e) {
|
||||||
|
LOG(error) << "Error in plugin initialization: " << e.what();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
auto DDS::StaticControl() -> void
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
TransitionDeviceStateTo(DeviceState::Running);
|
||||||
|
|
||||||
|
// wait until stop signal
|
||||||
|
unique_lock<mutex> lock(fStopMutex);
|
||||||
|
while (!fDeviceTerminationRequested) {
|
||||||
|
fStopCondition.wait_for(lock, chrono::seconds(1));
|
||||||
|
}
|
||||||
|
LOG(debug) << "Stopping DDS plugin static controller";
|
||||||
} catch (DeviceErrorState&) {
|
} catch (DeviceErrorState&) {
|
||||||
ReleaseDeviceControl();
|
ReleaseDeviceControl();
|
||||||
} catch (exception& e) {
|
} catch (exception& e) {
|
||||||
@@ -197,6 +215,11 @@ auto DDS::FillChannelContainers() -> void
|
|||||||
LOG(debug) << "dds-i-n: adding " << chanName << " -> i: " << i << " n: " << n;
|
LOG(debug) << "dds-i-n: adding " << chanName << " -> i: " << i << " n: " << n;
|
||||||
fIofN.insert(make_pair(chanName, IofN(i, n)));
|
fIofN.insert(make_pair(chanName, IofN(i, n)));
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lk(fUpdateMutex);
|
||||||
|
fUpdatesAllowed = true;
|
||||||
|
}
|
||||||
|
fUpdateCondition.notify_one();
|
||||||
} catch (const exception& e) {
|
} catch (const exception& e) {
|
||||||
LOG(error) << "Error filling channel containers: " << e.what();
|
LOG(error) << "Error filling channel containers: " << e.what();
|
||||||
}
|
}
|
||||||
@@ -209,6 +232,10 @@ auto DDS::SubscribeForConnectingChannels() -> void
|
|||||||
fDDS.SubscribeKeyValue([&] (const string& propertyId, const string& value, uint64_t senderTaskID) {
|
fDDS.SubscribeKeyValue([&] (const string& propertyId, const string& value, uint64_t senderTaskID) {
|
||||||
try {
|
try {
|
||||||
LOG(debug) << "Received update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID;
|
LOG(debug) << "Received update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID;
|
||||||
|
|
||||||
|
std::unique_lock<std::mutex> lk(fUpdateMutex);
|
||||||
|
fUpdateCondition.wait(lk, [&]{ return fUpdatesAllowed; });
|
||||||
|
|
||||||
string val = value;
|
string val = value;
|
||||||
// check if it is to handle as one out of multiple values
|
// check if it is to handle as one out of multiple values
|
||||||
auto it = fIofN.find(propertyId);
|
auto it = fIofN.find(propertyId);
|
||||||
|
@@ -63,6 +63,9 @@ struct DDSSubscription
|
|||||||
LOG(error) << "DDS Error received: error code: " << errorCode << ", error message: " << errorMsg;
|
LOG(error) << "DDS Error received: error code: " << errorCode << ", error message: " << errorMsg;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// fDDSCustomCmd.subscribe([](const std::string& cmd, const std::string& cond, uint64_t senderId) {
|
||||||
|
// LOG(debug) << "cmd: " << cmd << ", cond: " << cond << ", senderId: " << senderId;
|
||||||
|
// });
|
||||||
assert(!dds_session_id.empty());
|
assert(!dds_session_id.empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -125,7 +128,7 @@ class DDS : public Plugin
|
|||||||
~DDS();
|
~DDS();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
auto HandleControl() -> void;
|
auto StaticControl() -> void;
|
||||||
|
|
||||||
auto FillChannelContainers() -> void;
|
auto FillChannelContainers() -> void;
|
||||||
auto SubscribeForConnectingChannels() -> void;
|
auto SubscribeForConnectingChannels() -> void;
|
||||||
@@ -160,6 +163,10 @@ class DDS : public Plugin
|
|||||||
|
|
||||||
std::thread fHeartbeatThread;
|
std::thread fHeartbeatThread;
|
||||||
std::chrono::milliseconds fHeartbeatInterval;
|
std::chrono::milliseconds fHeartbeatInterval;
|
||||||
|
|
||||||
|
bool fUpdatesAllowed;
|
||||||
|
std::mutex fUpdateMutex;
|
||||||
|
std::condition_variable fUpdateCondition;
|
||||||
};
|
};
|
||||||
|
|
||||||
Plugin::ProgOptions DDSProgramOptions()
|
Plugin::ProgOptions DDSProgramOptions()
|
||||||
|
@@ -273,7 +273,7 @@ int main(int argc, char* argv[])
|
|||||||
cerr << "state-changes-unsubscription failed with return code: " << parts[2];
|
cerr << "state-changes-unsubscription failed with return code: " << parts[2];
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// cout << "Received: " << msg << endl;
|
cout << "Received: " << msg << endl;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@@ -20,7 +20,9 @@
|
|||||||
|
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
#include <cstdlib>
|
#include <cstdlib>
|
||||||
|
#include <mutex>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
|
#include <unordered_map>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
|
||||||
namespace fair {
|
namespace fair {
|
||||||
@@ -134,6 +136,8 @@ struct DDSSession::Impl
|
|||||||
dds::intercom_api::CCustomCmd fDDSCustomCmd;
|
dds::intercom_api::CCustomCmd fDDSCustomCmd;
|
||||||
Id fId;
|
Id fId;
|
||||||
bool fStopOnDestruction;
|
bool fStopOnDestruction;
|
||||||
|
mutable std::mutex fMtx;
|
||||||
|
std::unordered_map<DDSChannel::Id, DDSTask::Id> fTaskIdByChannelIdMap;
|
||||||
};
|
};
|
||||||
|
|
||||||
DDSSession::DDSSession(DDSEnvironment env)
|
DDSSession::DDSSession(DDSEnvironment env)
|
||||||
@@ -316,6 +320,18 @@ void DDSSession::UnsubscribeFromCommands()
|
|||||||
|
|
||||||
void DDSSession::SendCommand(const std::string& cmd) { fImpl->fDDSCustomCmd.send(cmd, ""); }
|
void DDSSession::SendCommand(const std::string& cmd) { fImpl->fDDSCustomCmd.send(cmd, ""); }
|
||||||
|
|
||||||
|
auto DDSSession::UpdateChannelToTaskAssociation(DDSChannel::Id channelId, DDSTask::Id taskId) -> void
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lk(fImpl->fMtx);
|
||||||
|
fImpl->fTaskIdByChannelIdMap[channelId] = taskId;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto DDSSession::GetTaskId(DDSChannel::Id channelId) const -> DDSTask::Id
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lk(fImpl->fMtx);
|
||||||
|
return fImpl->fTaskIdByChannelIdMap.at(channelId);
|
||||||
|
}
|
||||||
|
|
||||||
auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&
|
auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&
|
||||||
{
|
{
|
||||||
return os << "$DDS_SESSION_ID: " << session.GetId();
|
return os << "$DDS_SESSION_ID: " << session.GetId();
|
||||||
|
@@ -43,6 +43,18 @@ auto operator>>(std::istream& is, DDSRMSPlugin& plugin) -> std::istream&;
|
|||||||
class DDSTopology;
|
class DDSTopology;
|
||||||
class DDSAgent;
|
class DDSAgent;
|
||||||
|
|
||||||
|
class DDSTask
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
using Id = std::uint64_t;
|
||||||
|
};
|
||||||
|
|
||||||
|
class DDSChannel
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
using Id = std::uint64_t;
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @class DDSSession DDSSession.h <fairmq/sdk/DDSSession.h>
|
* @class DDSSession DDSSession.h <fairmq/sdk/DDSSession.h>
|
||||||
* @brief Represents a DDS session
|
* @brief Represents a DDS session
|
||||||
@@ -95,6 +107,8 @@ class DDSSession
|
|||||||
void SubscribeToCommands(std::function<void(const std::string& msg, const std::string& condition, uint64_t senderId)>);
|
void SubscribeToCommands(std::function<void(const std::string& msg, const std::string& condition, uint64_t senderId)>);
|
||||||
void UnsubscribeFromCommands();
|
void UnsubscribeFromCommands();
|
||||||
void SendCommand(const std::string&);
|
void SendCommand(const std::string&);
|
||||||
|
auto UpdateChannelToTaskAssociation(DDSChannel::Id, DDSTask::Id) -> void;
|
||||||
|
auto GetTaskId(DDSChannel::Id) const -> DDSTask::Id;
|
||||||
|
|
||||||
friend auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&;
|
friend auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&;
|
||||||
|
|
||||||
|
@@ -75,7 +75,7 @@ Topology::Topology(DDSTopology topo, DDSSession session)
|
|||||||
// LOG(debug) << "Adding device " << d;
|
// LOG(debug) << "Adding device " << d;
|
||||||
fState.emplace(d, DeviceStatus{ false, DeviceState::Ok });
|
fState.emplace(d, DeviceStatus{ false, DeviceState::Ok });
|
||||||
}
|
}
|
||||||
fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& /* condition */, uint64_t senderId) {
|
fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& /* condition */, DDSChannel::Id senderId) {
|
||||||
// LOG(debug) << "Received from " << senderId << ": " << msg;
|
// LOG(debug) << "Received from " << senderId << ": " << msg;
|
||||||
std::vector<std::string> parts;
|
std::vector<std::string> parts;
|
||||||
boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,"));
|
boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,"));
|
||||||
@@ -85,7 +85,9 @@ Topology::Topology(DDSTopology topo, DDSSession session)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (parts[0] == "state-change") {
|
if (parts[0] == "state-change") {
|
||||||
AddNewStateEntry(std::stoull(parts[2]), parts[3]);
|
DDSTask::Id taskId(std::stoull(parts[2]));
|
||||||
|
fDDSSession.UpdateChannelToTaskAssociation(senderId, taskId);
|
||||||
|
AddNewStateEntry(taskId, parts[3]);
|
||||||
} else if (parts[0] == "state-changes-subscription") {
|
} else if (parts[0] == "state-changes-subscription") {
|
||||||
LOG(debug) << "Received from " << senderId << ": " << msg;
|
LOG(debug) << "Received from " << senderId << ": " << msg;
|
||||||
if (parts[2] != "OK") {
|
if (parts[2] != "OK") {
|
||||||
@@ -96,7 +98,16 @@ Topology::Topology(DDSTopology topo, DDSSession session)
|
|||||||
LOG(error) << "state-changes-unsubscription failed with return code: " << parts[2];
|
LOG(error) << "state-changes-unsubscription failed with return code: " << parts[2];
|
||||||
}
|
}
|
||||||
} else if (parts[1] == "could not queue") {
|
} else if (parts[1] == "could not queue") {
|
||||||
LOG(warn) << "Could not queue " << parts[2] << " transition on " << senderId;
|
std::unique_lock<std::mutex> lock(fMtx);
|
||||||
|
|
||||||
|
if (fStateChangeOngoing) {
|
||||||
|
if (fState.at(fDDSSession.GetTaskId(senderId)).state != fTargetState) {
|
||||||
|
fStateChangeError =
|
||||||
|
tools::ToString("Could not queue ", parts[2], " transition on ", senderId);
|
||||||
|
lock.unlock();
|
||||||
|
fCV.notify_one();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
fDDSSession.StartDDSService();
|
fDDSSession.StartDDSService();
|
||||||
@@ -122,13 +133,13 @@ auto Topology::ChangeState(TopologyTransition transition, ChangeStateCallback cb
|
|||||||
std::unique_lock<std::mutex> lock(fMtx);
|
std::unique_lock<std::mutex> lock(fMtx);
|
||||||
if (fStateChangeOngoing) {
|
if (fStateChangeOngoing) {
|
||||||
throw std::runtime_error("A state change request is already in progress, concurrent requests are currently not supported");
|
throw std::runtime_error("A state change request is already in progress, concurrent requests are currently not supported");
|
||||||
lock.unlock();
|
|
||||||
}
|
}
|
||||||
LOG(debug) << "Initiating ChangeState with " << transition << " to " << expectedState.at(transition);
|
LOG(debug) << "Initiating ChangeState with " << transition << " to " << expectedState.at(transition);
|
||||||
fStateChangeOngoing = true;
|
fStateChangeOngoing = true;
|
||||||
fChangeStateCallback = cb;
|
fChangeStateCallback = cb;
|
||||||
fStateChangeTimeout = timeout;
|
fStateChangeTimeout = timeout;
|
||||||
fTargetState = expectedState.at(transition);
|
fTargetState = expectedState.at(transition);
|
||||||
|
fStateChangeError.clear();
|
||||||
|
|
||||||
fDDSSession.SendCommand(GetTransitionName(transition));
|
fDDSSession.SendCommand(GetTransitionName(transition));
|
||||||
}
|
}
|
||||||
@@ -155,12 +166,15 @@ void Topology::WaitForState()
|
|||||||
while (!fShutdown) {
|
while (!fShutdown) {
|
||||||
if (fStateChangeOngoing) {
|
if (fStateChangeOngoing) {
|
||||||
try {
|
try {
|
||||||
|
std::unique_lock<std::mutex> lock(fMtx);
|
||||||
|
|
||||||
auto condition = [&] {
|
auto condition = [&] {
|
||||||
// LOG(info) << "checking condition";
|
// LOG(info) << "checking condition";
|
||||||
// LOG(info) << "fShutdown: " << fShutdown;
|
// LOG(info) << "fShutdown: " << fShutdown;
|
||||||
// LOG(info) << "condition: " << std::all_of(fState.cbegin(), fState.cend(),
|
// LOG(info) << "condition: " << std::all_of(fState.cbegin(), fState.cend(),
|
||||||
// [&](TopologyState::value_type i) { return i.second.state == fTargetState; });
|
// [&](TopologyState::value_type i) { return i.second.state == fTargetState; });
|
||||||
return fShutdown
|
return fShutdown
|
||||||
|
|| !fStateChangeError.empty()
|
||||||
|| std::all_of(
|
|| std::all_of(
|
||||||
fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) {
|
fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) {
|
||||||
// TODO Check, if we can make sure that EXITING state change event are not missed
|
// TODO Check, if we can make sure that EXITING state change event are not missed
|
||||||
@@ -170,8 +184,6 @@ void Topology::WaitForState()
|
|||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
std::unique_lock<std::mutex> lock(fMtx);
|
|
||||||
|
|
||||||
if (fStateChangeTimeout > std::chrono::milliseconds(0)) {
|
if (fStateChangeTimeout > std::chrono::milliseconds(0)) {
|
||||||
if (!fCV.wait_for(lock, fStateChangeTimeout, condition)) {
|
if (!fCV.wait_for(lock, fStateChangeTimeout, condition)) {
|
||||||
// LOG(debug) << "timeout";
|
// LOG(debug) << "timeout";
|
||||||
@@ -187,6 +199,15 @@ void Topology::WaitForState()
|
|||||||
}
|
}
|
||||||
|
|
||||||
fStateChangeOngoing = false;
|
fStateChangeOngoing = false;
|
||||||
|
|
||||||
|
if (!fStateChangeError.empty()) {
|
||||||
|
TopologyState state = fState;
|
||||||
|
lock.unlock();
|
||||||
|
fChangeStateCallback(
|
||||||
|
{{AsyncOpResultCode::Error, fStateChangeError}, std::move(state)});
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if (fShutdown) {
|
if (fShutdown) {
|
||||||
LOG(debug) << "Aborting because a shutdown was requested";
|
LOG(debug) << "Aborting because a shutdown was requested";
|
||||||
TopologyState state = fState;
|
TopologyState state = fState;
|
||||||
|
@@ -56,7 +56,7 @@ struct DeviceStatus
|
|||||||
DeviceState state;
|
DeviceState state;
|
||||||
};
|
};
|
||||||
|
|
||||||
using TopologyState = std::unordered_map<uint64_t, DeviceStatus>;
|
using TopologyState = std::unordered_map<DDSTask::Id, DeviceStatus>;
|
||||||
using TopologyTransition = fair::mq::Transition;
|
using TopologyTransition = fair::mq::Transition;
|
||||||
|
|
||||||
struct MixedState : std::runtime_error { using std::runtime_error::runtime_error; };
|
struct MixedState : std::runtime_error { using std::runtime_error::runtime_error; };
|
||||||
@@ -150,6 +150,7 @@ class Topology
|
|||||||
ChangeStateCallback fChangeStateCallback;
|
ChangeStateCallback fChangeStateCallback;
|
||||||
std::chrono::milliseconds fStateChangeTimeout;
|
std::chrono::milliseconds fStateChangeTimeout;
|
||||||
bool fShutdown;
|
bool fShutdown;
|
||||||
|
std::string fStateChangeError;
|
||||||
|
|
||||||
void WaitForState();
|
void WaitForState();
|
||||||
void AddNewStateEntry(uint64_t senderId, const std::string& state);
|
void AddNewStateEntry(uint64_t senderId, const std::string& state);
|
||||||
|
@@ -26,10 +26,7 @@ TEST_F(PluginServices, OnlySingleController)
|
|||||||
mServices.ChangeDeviceState("bar", DeviceStateTransition::InitDevice),
|
mServices.ChangeDeviceState("bar", DeviceStateTransition::InitDevice),
|
||||||
fair::mq::PluginServices::DeviceControlError
|
fair::mq::PluginServices::DeviceControlError
|
||||||
);
|
);
|
||||||
ASSERT_THROW( // no control for bar
|
ASSERT_NO_THROW(mServices.ReleaseDeviceControl("bar"));
|
||||||
mServices.ReleaseDeviceControl("bar"),
|
|
||||||
fair::mq::PluginServices::DeviceControlError
|
|
||||||
);
|
|
||||||
|
|
||||||
ASSERT_NO_THROW(mServices.ReleaseDeviceControl("foo"));
|
ASSERT_NO_THROW(mServices.ReleaseDeviceControl("foo"));
|
||||||
ASSERT_FALSE(mServices.GetDeviceController());
|
ASSERT_FALSE(mServices.GetDeviceController());
|
||||||
|
@@ -105,7 +105,7 @@ TEST_F(Topology, ChangeStateTimeout)
|
|||||||
blocker.Wait();
|
blocker.Wait();
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(Topology, ChangeStateFullDeviceLifetime)
|
TEST_F(Topology, ChangeStateFullDeviceLifecycle)
|
||||||
{
|
{
|
||||||
using fair::mq::sdk::Topology;
|
using fair::mq::sdk::Topology;
|
||||||
using fair::mq::sdk::TopologyTransition;
|
using fair::mq::sdk::TopologyTransition;
|
||||||
@@ -125,4 +125,27 @@ TEST_F(Topology, ChangeStateFullDeviceLifetime)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(Topology, ChangeStateFullDeviceLifecycle2)
|
||||||
|
{
|
||||||
|
using fair::mq::sdk::Topology;
|
||||||
|
using fair::mq::sdk::TopologyTransition;
|
||||||
|
|
||||||
|
Topology topo(mDDSTopo, mDDSSession);
|
||||||
|
for (int i(0); i < 10; ++i) {
|
||||||
|
for (auto transition : {TopologyTransition::InitDevice,
|
||||||
|
TopologyTransition::CompleteInit,
|
||||||
|
TopologyTransition::Bind,
|
||||||
|
TopologyTransition::Connect,
|
||||||
|
TopologyTransition::InitTask,
|
||||||
|
TopologyTransition::Run}) {
|
||||||
|
ASSERT_EQ(topo.ChangeState(transition).rc, fair::mq::AsyncOpResultCode::Ok);
|
||||||
|
}
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||||
|
for (auto transition : {TopologyTransition::Stop,
|
||||||
|
TopologyTransition::ResetTask,
|
||||||
|
TopologyTransition::ResetDevice}) {
|
||||||
|
ASSERT_EQ(topo.ChangeState(transition).rc, fair::mq::AsyncOpResultCode::Ok);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
} // namespace
|
} // namespace
|
||||||
|
@@ -6,7 +6,7 @@
|
|||||||
<declrequirement name="SinkWorker" type="wnname" value="sink"/>
|
<declrequirement name="SinkWorker" type="wnname" value="sink"/>
|
||||||
|
|
||||||
<decltask name="Sampler">
|
<decltask name="Sampler">
|
||||||
<exe reachable="true">fairmq-bsampler --id sampler --color false --channel-config name=data,type=push,method=bind -P dds --msg-rate 10</exe>
|
<exe reachable="true">fairmq-bsampler --color false --channel-config name=data,type=push,method=bind -P dds --msg-rate 10</exe>
|
||||||
<requirements>
|
<requirements>
|
||||||
<name>SamplerWorker</name>
|
<name>SamplerWorker</name>
|
||||||
</requirements>
|
</requirements>
|
||||||
@@ -16,7 +16,7 @@
|
|||||||
</decltask>
|
</decltask>
|
||||||
|
|
||||||
<decltask name="Sink">
|
<decltask name="Sink">
|
||||||
<exe reachable="true">fairmq-sink --id sink_%taskIndex% --color false --channel-config name=data,type=pull,method=connect -P dds</exe>
|
<exe reachable="true">fairmq-sink --color false --channel-config name=data,type=pull,method=connect -P dds</exe>
|
||||||
<requirements>
|
<requirements>
|
||||||
<name>SinkWorker</name>
|
<name>SinkWorker</name>
|
||||||
</requirements>
|
</requirements>
|
||||||
|
Reference in New Issue
Block a user