Compare commits

..

9 Commits

Author SHA1 Message Date
Dennis Klein
7cacf471b9 CI: Disable sdk until DDS 2.6 2019-07-29 09:22:02 +02:00
Dennis Klein
7316b0e7f2 Example.DDS: Run example as unit test
Part of #185
2019-07-29 09:22:02 +02:00
Dennis Klein
1fa82f5f22 Example.DDS: Make example topologies pass xml validation 2019-07-29 09:22:02 +02:00
Dennis Klein
1bb77bf47b DDS plugin: Automatically set session and device id if not provided
Resolves #187
2019-07-29 09:22:02 +02:00
Dennis Klein
07fe02a0a0 Tests.SDK: Add another test 2019-07-29 09:22:02 +02:00
Dennis Klein
9cbccface7 DDS plugin: Synchronize FillChannelContainers and DDSKeyValue updates
This was a regression after introducing external control mode in f7cdf5e.
2019-07-29 09:22:02 +02:00
Dennis Klein
7b773cde51 SDK: Improve error handling in case state-change fails on a device
Replace the log message with

1. Nothing, if the device is already in the target state
2. Abort and call the completion callback with error otherwise
2019-07-29 09:22:02 +02:00
Dennis Klein
fd282fa950 SDK: Track channel to task id association 2019-07-29 09:22:02 +02:00
Dennis Klein
008be36125 PluginServices: Do not throw if device control cannot be released 2019-07-29 09:22:02 +02:00
17 changed files with 180 additions and 63 deletions

View File

@@ -29,7 +29,7 @@ Set(configure_options "${configure_options};-DCMAKE_PREFIX_PATH=$ENV{SIMPATH}")
Set(configure_options "${configure_options};-DBUILD_NANOMSG_TRANSPORT=ON")
# Set(configure_options "${configure_options};-DBUILD_OFI_TRANSPORT=ON")
Set(configure_options "${configure_options};-DBUILD_DDS_PLUGIN=ON")
Set(configure_options "${configure_options};-DBUILD_SDK=ON")
Set(configure_options "${configure_options};-DBUILD_SDK=OFF")
Set(configure_options "${configure_options};-DFAST_BUILD=ON")
Set(configure_options "${configure_options};-DCOTIRE_MAXIMUM_NUMBER_OF_UNITY_INCLUDES=-j$ENV{number_of_processors}")

View File

@@ -37,6 +37,12 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-ex-dds-env.sh ${CMAKE_CURRENT_
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-dds.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-dds.sh @ONLY)
# test
add_test(NAME Example.DDS.localhost COMMAND ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-dds.sh localhost)
set_tests_properties(Example.DDS.localhost PROPERTIES
TIMEOUT 15
RUN_SERIAL true
PASS_REGULAR_EXPRESSION "Example successful"
)
# install

View File

@@ -8,8 +8,8 @@
<declrequirement name="SinkWorker" type="wnname" value="sink"/>
<decltask name="Sampler">
<exe>fairmq-ex-dds-sampler --color false --channel-config name=data1,type=push,method=bind --rate 100 -P dds</exe>
<env reachable="false">fairmq-ex-dds-env.sh</env>
<exe>fairmq-ex-dds-sampler --id sampler --color false --channel-config name=data1,type=push,method=bind --rate 100 -P dds</exe>
<requirements>
<name>SamplerWorker</name>
</requirements>
@@ -19,10 +19,10 @@
</decltask>
<decltask name="Processor">
<exe>fairmq-ex-dds-processor --color false --channel-config name=data1,type=pull,method=connect name=data2,type=push,method=connect -P dds</exe>
<env reachable="false">fairmq-ex-dds-env.sh</env>
<exe>fairmq-ex-dds-processor --id processor_%taskIndex% --color false --channel-config name=data1,type=pull,method=connect name=data2,type=push,method=connect -P dds</exe>
<requirements>
<id>ProcessorWorker</id>
<name>ProcessorWorker</name>
</requirements>
<properties>
<name access="read">data1</name>
@@ -31,8 +31,8 @@
</decltask>
<decltask name="Sink">
<exe>fairmq-ex-dds-sink --color false --channel-config name=data2,type=pull,method=bind -P dds</exe>
<env reachable="false">fairmq-ex-dds-env.sh</env>
<exe>fairmq-ex-dds-sink --id sink --color false --channel-config name=data2,type=pull,method=bind -P dds</exe>
<requirements>
<name>SinkWorker</name>
</requirements>

View File

@@ -8,8 +8,8 @@
<declrequirement name="SinkWorker" type="wnname" value="sink"/>
<decltask name="Sampler">
<exe>fairmq-ex-dds-sampler --color false --channel-config name=data1,type=push,method=bind -P dds --iterations 10</exe>
<env reachable="false">fairmq-ex-dds-env.sh</env>
<exe>fairmq-ex-dds-sampler --id sampler --color false --channel-config name=data1,type=push,method=bind -P dds --iterations 10</exe>
<requirements>
<name>SamplerWorker</name>
</requirements>
@@ -19,10 +19,10 @@
</decltask>
<decltask name="Processor">
<exe>fairmq-ex-dds-processor --color false --channel-config name=data1,type=pull,method=connect name=data2,type=push,method=connect -P dds</exe>
<env reachable="false">fairmq-ex-dds-env.sh</env>
<exe>fairmq-ex-dds-processor --id processor_%taskIndex% --color false --channel-config name=data1,type=pull,method=connect name=data2,type=push,method=connect -P dds</exe>
<requirements>
<id>ProcessorWorker</id>
<name>ProcessorWorker</name>
</requirements>
<properties>
<name access="read">data1</name>
@@ -31,8 +31,8 @@
</decltask>
<decltask name="Sink">
<exe>fairmq-ex-dds-sink --color false --channel-config name=data2,type=pull,method=bind -P dds --iterations 10</exe>
<env reachable="false">fairmq-ex-dds-env.sh</env>
<exe>fairmq-ex-dds-sink --id sink --color false --channel-config name=data2,type=pull,method=bind -P dds --iterations 10</exe>
<requirements>
<name>SinkWorker</name>
</requirements>

View File

@@ -46,7 +46,7 @@ echo "TOPOLOGY FILE: ${topologyFile}"
# TODO Uncomment once DDS 2.6 is released
# dds-info --active-topology
dds-topology --disable-validation --activate ${topologyFile}
dds-topology --activate ${topologyFile}
# dds-info --active-topology
# dds-info --wait-for-executing-agents ${requiredNofAgents}
sleep 1
@@ -82,4 +82,7 @@ logDir="${wrkDir}/logs"
for file in $(find "${logDir}" -name "*.tar.gz"); do tar -xf ${file} -C "${logDir}" ; done
echo "AGENT LOG FILES IN: ${logDir}"
# This string is used by ctest to detect success
echo "Example successful :)"
# Cleanup function is called by EXIT trap

View File

@@ -63,8 +63,6 @@ bool DeviceRunner::HandleGeneralOptions(const fair::mq::ProgOptions& config, boo
<< " / __/ / /_/ / / / _ / / / / /_/ / " << FAIRMQ_REPO_URL << endl
<< " /_/ \\__,_/_/_/ /_/ /_/ \\___\\_\\ " << FAIRMQ_LICENSE << " © " << FAIRMQ_COPYRIGHT << endl;
}
config.PrintOptions();
}
return true;
@@ -169,6 +167,9 @@ auto DeviceRunner::Run() -> int
// Instantiate and run plugins
fPluginManager.InstantiatePlugins();
// Log IDLE configuration
fConfig.PrintOptions();
// Run the device
fDevice->RunStateMachine();

View File

@@ -74,7 +74,8 @@ auto PluginServices::ReleaseDeviceControl(const string& controller) -> void
if (fDeviceController == controller) {
fDeviceController = boost::none;
} else {
throw DeviceControlError{tools::ToString("Plugin '", controller, "' cannot release control because it has not taken over control.")};
LOG(debug) << "Plugin '" << controller << "' cannot release control "
<< "because it has no control.";
}
}

View File

@@ -51,21 +51,22 @@ DDS::DDS(const string& name,
, fLastState(DeviceState::Idle)
, fDeviceTerminationRequested(false)
, fHeartbeatInterval(100)
, fUpdatesAllowed(false)
{
try {
TakeDeviceControl();
fControllerThread = thread(&DDS::HandleControl, this);
fHeartbeatThread = thread(&DDS::HeartbeatSender, this);
} catch (PluginServices::DeviceControlError& e) {
LOG(debug) << e.what();
} catch (exception& e) {
LOG(error) << "Error in plugin initialization: " << e.what();
}
}
auto DDS::HandleControl() -> void
{
try {
fHeartbeatThread = thread(&DDS::HeartbeatSender, this);
std::string deviceId(GetProperty<std::string>("id"));
if (deviceId.empty()) {
SetProperty<std::string>("id", dds::env_prop<dds::task_path>());
}
std::string sessionId(GetProperty<std::string>("session"));
if (sessionId == "default") {
SetProperty<std::string>("session", dds::env_prop<dds::dds_session_id>());
}
auto control = GetProperty<string>("control");
bool staticMode(false);
if (control == "static") {
@@ -85,22 +86,28 @@ auto DDS::HandleControl() -> void
// subscribe to device state changes, pushing new state changes into the event queue
SubscribeToDeviceStateChange([&](DeviceState newState) {
fStateQueue.Push(newState);
switch(newState) {
case DeviceState::Bound:
// Receive addresses of connecting channels from DDS
// and propagate addresses of bound channels to DDS.
FillChannelContainers();
switch (newState) {
case DeviceState::Bound:
// Receive addresses of connecting channels from DDS
// and propagate addresses of bound channels to DDS.
FillChannelContainers();
// publish bound addresses via DDS at keys corresponding to the channel prefixes, e.g. 'data' in data[i]
PublishBoundChannels();
break;
case DeviceState::Exiting:
fDeviceTerminationRequested = true;
UnsubscribeFromDeviceStateChange();
ReleaseDeviceControl();
break;
default:
break;
// publish bound addresses via DDS at keys corresponding to the channel
// prefixes, e.g. 'data' in data[i]
PublishBoundChannels();
break;
case DeviceState::ResettingDevice: {
std::lock_guard<std::mutex> lk(fUpdateMutex);
fUpdatesAllowed = false;
break;
}
case DeviceState::Exiting:
fDeviceTerminationRequested = true;
UnsubscribeFromDeviceStateChange();
ReleaseDeviceControl();
break;
default:
break;
}
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
@@ -114,15 +121,26 @@ auto DDS::HandleControl() -> void
});
if (staticMode) {
TransitionDeviceStateTo(DeviceState::Running);
// wait until stop signal
unique_lock<mutex> lock(fStopMutex);
while (!fDeviceTerminationRequested) {
fStopCondition.wait_for(lock, chrono::seconds(1));
}
LOG(debug) << "Stopping DDS control plugin";
fControllerThread = thread(&DDS::StaticControl, this);
}
} catch (PluginServices::DeviceControlError& e) {
LOG(debug) << e.what();
} catch (exception& e) {
LOG(error) << "Error in plugin initialization: " << e.what();
}
}
auto DDS::StaticControl() -> void
{
try {
TransitionDeviceStateTo(DeviceState::Running);
// wait until stop signal
unique_lock<mutex> lock(fStopMutex);
while (!fDeviceTerminationRequested) {
fStopCondition.wait_for(lock, chrono::seconds(1));
}
LOG(debug) << "Stopping DDS plugin static controller";
} catch (DeviceErrorState&) {
ReleaseDeviceControl();
} catch (exception& e) {
@@ -197,6 +215,11 @@ auto DDS::FillChannelContainers() -> void
LOG(debug) << "dds-i-n: adding " << chanName << " -> i: " << i << " n: " << n;
fIofN.insert(make_pair(chanName, IofN(i, n)));
}
{
std::lock_guard<std::mutex> lk(fUpdateMutex);
fUpdatesAllowed = true;
}
fUpdateCondition.notify_one();
} catch (const exception& e) {
LOG(error) << "Error filling channel containers: " << e.what();
}
@@ -209,6 +232,10 @@ auto DDS::SubscribeForConnectingChannels() -> void
fDDS.SubscribeKeyValue([&] (const string& propertyId, const string& value, uint64_t senderTaskID) {
try {
LOG(debug) << "Received update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID;
std::unique_lock<std::mutex> lk(fUpdateMutex);
fUpdateCondition.wait(lk, [&]{ return fUpdatesAllowed; });
string val = value;
// check if it is to handle as one out of multiple values
auto it = fIofN.find(propertyId);

View File

@@ -63,6 +63,9 @@ struct DDSSubscription
LOG(error) << "DDS Error received: error code: " << errorCode << ", error message: " << errorMsg;
});
// fDDSCustomCmd.subscribe([](const std::string& cmd, const std::string& cond, uint64_t senderId) {
// LOG(debug) << "cmd: " << cmd << ", cond: " << cond << ", senderId: " << senderId;
// });
assert(!dds_session_id.empty());
}
@@ -125,7 +128,7 @@ class DDS : public Plugin
~DDS();
private:
auto HandleControl() -> void;
auto StaticControl() -> void;
auto FillChannelContainers() -> void;
auto SubscribeForConnectingChannels() -> void;
@@ -160,6 +163,10 @@ class DDS : public Plugin
std::thread fHeartbeatThread;
std::chrono::milliseconds fHeartbeatInterval;
bool fUpdatesAllowed;
std::mutex fUpdateMutex;
std::condition_variable fUpdateCondition;
};
Plugin::ProgOptions DDSProgramOptions()

View File

@@ -273,7 +273,7 @@ int main(int argc, char* argv[])
cerr << "state-changes-unsubscription failed with return code: " << parts[2];
}
} else {
// cout << "Received: " << msg << endl;
cout << "Received: " << msg << endl;
}
});

View File

@@ -20,7 +20,9 @@
#include <cassert>
#include <cstdlib>
#include <mutex>
#include <sstream>
#include <unordered_map>
#include <utility>
namespace fair {
@@ -134,6 +136,8 @@ struct DDSSession::Impl
dds::intercom_api::CCustomCmd fDDSCustomCmd;
Id fId;
bool fStopOnDestruction;
mutable std::mutex fMtx;
std::unordered_map<DDSChannel::Id, DDSTask::Id> fTaskIdByChannelIdMap;
};
DDSSession::DDSSession(DDSEnvironment env)
@@ -316,6 +320,18 @@ void DDSSession::UnsubscribeFromCommands()
void DDSSession::SendCommand(const std::string& cmd) { fImpl->fDDSCustomCmd.send(cmd, ""); }
auto DDSSession::UpdateChannelToTaskAssociation(DDSChannel::Id channelId, DDSTask::Id taskId) -> void
{
std::lock_guard<std::mutex> lk(fImpl->fMtx);
fImpl->fTaskIdByChannelIdMap[channelId] = taskId;
}
auto DDSSession::GetTaskId(DDSChannel::Id channelId) const -> DDSTask::Id
{
std::lock_guard<std::mutex> lk(fImpl->fMtx);
return fImpl->fTaskIdByChannelIdMap.at(channelId);
}
auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&
{
return os << "$DDS_SESSION_ID: " << session.GetId();

View File

@@ -43,6 +43,18 @@ auto operator>>(std::istream& is, DDSRMSPlugin& plugin) -> std::istream&;
class DDSTopology;
class DDSAgent;
class DDSTask
{
public:
using Id = std::uint64_t;
};
class DDSChannel
{
public:
using Id = std::uint64_t;
};
/**
* @class DDSSession DDSSession.h <fairmq/sdk/DDSSession.h>
* @brief Represents a DDS session
@@ -95,6 +107,8 @@ class DDSSession
void SubscribeToCommands(std::function<void(const std::string& msg, const std::string& condition, uint64_t senderId)>);
void UnsubscribeFromCommands();
void SendCommand(const std::string&);
auto UpdateChannelToTaskAssociation(DDSChannel::Id, DDSTask::Id) -> void;
auto GetTaskId(DDSChannel::Id) const -> DDSTask::Id;
friend auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&;

View File

@@ -75,7 +75,7 @@ Topology::Topology(DDSTopology topo, DDSSession session)
// LOG(debug) << "Adding device " << d;
fState.emplace(d, DeviceStatus{ false, DeviceState::Ok });
}
fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& /* condition */, uint64_t senderId) {
fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& /* condition */, DDSChannel::Id senderId) {
// LOG(debug) << "Received from " << senderId << ": " << msg;
std::vector<std::string> parts;
boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,"));
@@ -85,7 +85,9 @@ Topology::Topology(DDSTopology topo, DDSSession session)
}
if (parts[0] == "state-change") {
AddNewStateEntry(std::stoull(parts[2]), parts[3]);
DDSTask::Id taskId(std::stoull(parts[2]));
fDDSSession.UpdateChannelToTaskAssociation(senderId, taskId);
AddNewStateEntry(taskId, parts[3]);
} else if (parts[0] == "state-changes-subscription") {
LOG(debug) << "Received from " << senderId << ": " << msg;
if (parts[2] != "OK") {
@@ -96,7 +98,16 @@ Topology::Topology(DDSTopology topo, DDSSession session)
LOG(error) << "state-changes-unsubscription failed with return code: " << parts[2];
}
} else if (parts[1] == "could not queue") {
LOG(warn) << "Could not queue " << parts[2] << " transition on " << senderId;
std::unique_lock<std::mutex> lock(fMtx);
if (fStateChangeOngoing) {
if (fState.at(fDDSSession.GetTaskId(senderId)).state != fTargetState) {
fStateChangeError =
tools::ToString("Could not queue ", parts[2], " transition on ", senderId);
lock.unlock();
fCV.notify_one();
}
}
}
});
fDDSSession.StartDDSService();
@@ -122,13 +133,13 @@ auto Topology::ChangeState(TopologyTransition transition, ChangeStateCallback cb
std::unique_lock<std::mutex> lock(fMtx);
if (fStateChangeOngoing) {
throw std::runtime_error("A state change request is already in progress, concurrent requests are currently not supported");
lock.unlock();
}
LOG(debug) << "Initiating ChangeState with " << transition << " to " << expectedState.at(transition);
fStateChangeOngoing = true;
fChangeStateCallback = cb;
fStateChangeTimeout = timeout;
fTargetState = expectedState.at(transition);
fStateChangeError.clear();
fDDSSession.SendCommand(GetTransitionName(transition));
}
@@ -155,12 +166,15 @@ void Topology::WaitForState()
while (!fShutdown) {
if (fStateChangeOngoing) {
try {
std::unique_lock<std::mutex> lock(fMtx);
auto condition = [&] {
// LOG(info) << "checking condition";
// LOG(info) << "fShutdown: " << fShutdown;
// LOG(info) << "condition: " << std::all_of(fState.cbegin(), fState.cend(),
// [&](TopologyState::value_type i) { return i.second.state == fTargetState; });
return fShutdown
|| !fStateChangeError.empty()
|| std::all_of(
fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) {
// TODO Check, if we can make sure that EXITING state change event are not missed
@@ -170,8 +184,6 @@ void Topology::WaitForState()
});
};
std::unique_lock<std::mutex> lock(fMtx);
if (fStateChangeTimeout > std::chrono::milliseconds(0)) {
if (!fCV.wait_for(lock, fStateChangeTimeout, condition)) {
// LOG(debug) << "timeout";
@@ -187,6 +199,15 @@ void Topology::WaitForState()
}
fStateChangeOngoing = false;
if (!fStateChangeError.empty()) {
TopologyState state = fState;
lock.unlock();
fChangeStateCallback(
{{AsyncOpResultCode::Error, fStateChangeError}, std::move(state)});
break;
}
if (fShutdown) {
LOG(debug) << "Aborting because a shutdown was requested";
TopologyState state = fState;

View File

@@ -56,7 +56,7 @@ struct DeviceStatus
DeviceState state;
};
using TopologyState = std::unordered_map<uint64_t, DeviceStatus>;
using TopologyState = std::unordered_map<DDSTask::Id, DeviceStatus>;
using TopologyTransition = fair::mq::Transition;
struct MixedState : std::runtime_error { using std::runtime_error::runtime_error; };
@@ -150,6 +150,7 @@ class Topology
ChangeStateCallback fChangeStateCallback;
std::chrono::milliseconds fStateChangeTimeout;
bool fShutdown;
std::string fStateChangeError;
void WaitForState();
void AddNewStateEntry(uint64_t senderId, const std::string& state);

View File

@@ -26,10 +26,7 @@ TEST_F(PluginServices, OnlySingleController)
mServices.ChangeDeviceState("bar", DeviceStateTransition::InitDevice),
fair::mq::PluginServices::DeviceControlError
);
ASSERT_THROW( // no control for bar
mServices.ReleaseDeviceControl("bar"),
fair::mq::PluginServices::DeviceControlError
);
ASSERT_NO_THROW(mServices.ReleaseDeviceControl("bar"));
ASSERT_NO_THROW(mServices.ReleaseDeviceControl("foo"));
ASSERT_FALSE(mServices.GetDeviceController());

View File

@@ -105,7 +105,7 @@ TEST_F(Topology, ChangeStateTimeout)
blocker.Wait();
}
TEST_F(Topology, ChangeStateFullDeviceLifetime)
TEST_F(Topology, ChangeStateFullDeviceLifecycle)
{
using fair::mq::sdk::Topology;
using fair::mq::sdk::TopologyTransition;
@@ -125,4 +125,27 @@ TEST_F(Topology, ChangeStateFullDeviceLifetime)
}
}
TEST_F(Topology, ChangeStateFullDeviceLifecycle2)
{
using fair::mq::sdk::Topology;
using fair::mq::sdk::TopologyTransition;
Topology topo(mDDSTopo, mDDSSession);
for (int i(0); i < 10; ++i) {
for (auto transition : {TopologyTransition::InitDevice,
TopologyTransition::CompleteInit,
TopologyTransition::Bind,
TopologyTransition::Connect,
TopologyTransition::InitTask,
TopologyTransition::Run}) {
ASSERT_EQ(topo.ChangeState(transition).rc, fair::mq::AsyncOpResultCode::Ok);
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
for (auto transition : {TopologyTransition::Stop,
TopologyTransition::ResetTask,
TopologyTransition::ResetDevice}) {
ASSERT_EQ(topo.ChangeState(transition).rc, fair::mq::AsyncOpResultCode::Ok);
}
}
}
} // namespace

View File

@@ -6,7 +6,7 @@
<declrequirement name="SinkWorker" type="wnname" value="sink"/>
<decltask name="Sampler">
<exe reachable="true">fairmq-bsampler --id sampler --color false --channel-config name=data,type=push,method=bind -P dds --msg-rate 10</exe>
<exe reachable="true">fairmq-bsampler --color false --channel-config name=data,type=push,method=bind -P dds --msg-rate 10</exe>
<requirements>
<name>SamplerWorker</name>
</requirements>
@@ -16,7 +16,7 @@
</decltask>
<decltask name="Sink">
<exe reachable="true">fairmq-sink --id sink_%taskIndex% --color false --channel-config name=data,type=pull,method=connect -P dds</exe>
<exe reachable="true">fairmq-sink --color false --channel-config name=data,type=pull,method=connect -P dds</exe>
<requirements>
<name>SinkWorker</name>
</requirements>