Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jobs:
matrix:
mypy_packages:
- "nav2_smac_planner"
- "nav2_common"
steps:
- uses: actions/checkout@v4

Expand Down
17 changes: 8 additions & 9 deletions nav2_common/nav2_common/launch/has_node_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Text

import launch
import yaml


class HasNodeParams(launch.Substitution):
class HasNodeParams(launch.Substitution): # type: ignore[misc]
"""
Substitution that checks if a param file contains parameters for a node.

Used in launch system
"""

def __init__(
self, source_file: launch.SomeSubstitutionsType, node_name: Text
self, source_file: launch.SomeSubstitutionsType, node_name: str
) -> None:
super().__init__()
"""
Expand All @@ -39,21 +37,22 @@ def __init__(
# import here to avoid loop
from launch.utilities import normalize_to_list_of_substitutions

self.__source_file = normalize_to_list_of_substitutions(source_file)
self.__source_file: list[launch.Substitution] = \
normalize_to_list_of_substitutions(source_file)
self.__node_name = node_name

@property
def name(self) -> List[launch.Substitution]:
def name(self) -> list[launch.Substitution]:
"""Getter for name."""
return self.__source_file

def describe(self) -> Text:
def describe(self) -> str:
"""Return a description of this substitution as a string."""
return ''

def perform(self, context: launch.LaunchContext) -> Text:
def perform(self, context: launch.LaunchContext) -> str:
yaml_filename = launch.utilities.perform_substitutions(context, self.name)
data = yaml.safe_load(open(yaml_filename, 'r'))
data = yaml.safe_load(open(yaml_filename))

if self.__node_name in data.keys():
return 'True'
Expand Down
26 changes: 15 additions & 11 deletions nav2_common/nav2_common/launch/replace_string.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
# limitations under the License.

import tempfile
from typing import Dict, List, Optional, Text
from typing import Optional

import launch


class ReplaceString(launch.Substitution):
class ReplaceString(launch.Substitution): # type: ignore[misc]
"""
Substitution that replaces strings on a given file.

Expand All @@ -28,7 +28,7 @@ class ReplaceString(launch.Substitution):
def __init__(
self,
source_file: launch.SomeSubstitutionsType,
replacements: Dict,
replacements: dict[str, launch.SomeSubstitutionsType],
condition: Optional[launch.Condition] = None,
) -> None:
super().__init__()
Expand All @@ -37,7 +37,8 @@ def __init__(

# import here to avoid loop

self.__source_file = normalize_to_list_of_substitutions(source_file)
self.__source_file: list[launch.Substitution] = \
normalize_to_list_of_substitutions(source_file)
self.__replacements = {}
for key in replacements:
self.__replacements[key] = normalize_to_list_of_substitutions(
Expand All @@ -46,7 +47,7 @@ def __init__(
self.__condition = condition

@property
def name(self) -> List[launch.Substitution]:
def name(self) -> list[launch.Substitution]:
"""Getter for name."""
return self.__source_file

Expand All @@ -55,17 +56,19 @@ def condition(self) -> Optional[launch.Condition]:
"""Getter for condition."""
return self.__condition

def describe(self) -> Text:
def describe(self) -> str:
"""Return a description of this substitution as a string."""
return ''

def perform(self, context: launch.LaunchContext) -> Text:
yaml_filename = launch.utilities.perform_substitutions(context, self.name)
def perform(self, context: launch.LaunchContext) -> str:
yaml_filename: str = launch.utilities.perform_substitutions(
context, self.name
)
if self.__condition is None or self.__condition.evaluate(context):
output_file = tempfile.NamedTemporaryFile(mode='w', delete=False)
replacements = self.resolve_replacements(context)
try:
input_file = open(yaml_filename, 'r')
input_file = open(yaml_filename)
self.replace(input_file, output_file, replacements)
except Exception as err: # noqa: B902
print('ReplaceString substitution error: ', err)
Expand All @@ -76,15 +79,16 @@ def perform(self, context: launch.LaunchContext) -> Text:
else:
return yaml_filename

def resolve_replacements(self, context):
def resolve_replacements(self, context: launch.LaunchContext) -> dict[str, str]:
resolved_replacements = {}
for key in self.__replacements:
resolved_replacements[key] = launch.utilities.perform_substitutions(
context, self.__replacements[key]
)
return resolved_replacements

def replace(self, input_file, output_file, replacements):
def replace(self, input_file: launch.SomeSubstitutionsType,
output_file: launch.SomeSubstitutionsType, replacements: dict[str, str]) -> None:
for line in input_file:
for key, value in replacements.items():
if isinstance(key, str) and isinstance(value, str):
Expand Down
83 changes: 54 additions & 29 deletions nav2_common/nav2_common/launch/rewritten_yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,30 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections.abc import Generator
import tempfile
from typing import Dict, List, Optional, Text
from typing import Optional, TypeAlias, Union

import launch
import yaml

YamlValue: TypeAlias = Union[str, int, float, bool]


class DictItemReference:

def __init__(self, dictionary, key):
def __init__(self, dictionary: dict[str, YamlValue], key: str):
self.dictionary = dictionary
self.dictKey = key

def key(self):
def key(self) -> str:
return self.dictKey

def setValue(self, value):
def setValue(self, value: YamlValue) -> None:
self.dictionary[self.dictKey] = value


class RewrittenYaml(launch.Substitution):
class RewrittenYaml(launch.Substitution): # type: ignore[misc]
"""
Substitution that modifies the given YAML file.

Expand All @@ -42,10 +45,10 @@ class RewrittenYaml(launch.Substitution):
def __init__(
self,
source_file: launch.SomeSubstitutionsType,
param_rewrites: Dict,
param_rewrites: dict[str, launch.SomeSubstitutionsType],
root_key: Optional[launch.SomeSubstitutionsType] = None,
key_rewrites: Optional[Dict] = None,
convert_types=False,
key_rewrites: Optional[dict[str, launch.SomeSubstitutionsType]] = None,
convert_types: bool = False,
) -> None:
super().__init__()
"""
Expand All @@ -61,7 +64,8 @@ def __init__(
# import here to avoid loop
from launch.utilities import normalize_to_list_of_substitutions

self.__source_file = normalize_to_list_of_substitutions(source_file)
self.__source_file: list[launch.Substitution] = \
normalize_to_list_of_substitutions(source_file)
self.__param_rewrites = {}
self.__key_rewrites = {}
self.__convert_types = convert_types
Expand All @@ -79,19 +83,19 @@ def __init__(
self.__root_key = normalize_to_list_of_substitutions(root_key)

@property
def name(self) -> List[launch.Substitution]:
def name(self) -> list[launch.Substitution]:
"""Getter for name."""
return self.__source_file

def describe(self) -> Text:
def describe(self) -> str:
"""Return a description of this substitution as a string."""
return ''

def perform(self, context: launch.LaunchContext) -> Text:
def perform(self, context: launch.LaunchContext) -> str:
yaml_filename = launch.utilities.perform_substitutions(context, self.name)
rewritten_yaml = tempfile.NamedTemporaryFile(mode='w', delete=False)
param_rewrites, keys_rewrites = self.resolve_rewrites(context)
data = yaml.safe_load(open(yaml_filename, 'r'))
data = yaml.safe_load(open(yaml_filename))
self.substitute_params(data, param_rewrites)
self.add_params(data, param_rewrites)
self.substitute_keys(data, keys_rewrites)
Expand All @@ -103,7 +107,8 @@ def perform(self, context: launch.LaunchContext) -> Text:
rewritten_yaml.close()
return rewritten_yaml.name

def resolve_rewrites(self, context):
def resolve_rewrites(self, context: launch.LaunchContext) -> \
tuple[dict[str, str], dict[str, str]]:
resolved_params = {}
for key in self.__param_rewrites:
resolved_params[key] = launch.utilities.perform_substitutions(
Expand All @@ -116,7 +121,8 @@ def resolve_rewrites(self, context):
)
return resolved_params, resolved_keys

def substitute_params(self, yaml, param_rewrites):
def substitute_params(self, yaml: dict[str, YamlValue],
param_rewrites: dict[str, str]) -> None:
# substitute leaf-only parameters
for key in self.getYamlLeafKeys(yaml):
if key.key() in param_rewrites:
Expand All @@ -132,7 +138,8 @@ def substitute_params(self, yaml, param_rewrites):
yaml_keys = path.split('.')
yaml = self.updateYamlPathVals(yaml, yaml_keys, rewrite_val)

def add_params(self, yaml, param_rewrites):
def add_params(self, yaml: dict[str, YamlValue],
param_rewrites: dict[str, str]) -> None:
# add new total path parameters
yaml_paths = self.pathify(yaml)
for path in param_rewrites:
Expand All @@ -142,7 +149,10 @@ def add_params(self, yaml, param_rewrites):
if 'ros__parameters' in yaml_keys:
yaml = self.updateYamlPathVals(yaml, yaml_keys, new_val)

def updateYamlPathVals(self, yaml, yaml_key_list, rewrite_val):
def updateYamlPathVals(
self, yaml: dict[str, YamlValue],
yaml_key_list: list[str], rewrite_val: YamlValue) -> dict[str, YamlValue]:

for key in yaml_key_list:
if key == yaml_key_list[-1]:
yaml[key] = rewrite_val
Expand All @@ -153,12 +163,15 @@ def updateYamlPathVals(self, yaml, yaml_key_list, rewrite_val):
yaml[int(key)], yaml_key_list, rewrite_val
)
else:
yaml[key] = self.updateYamlPathVals(
yaml.get(key, {}), yaml_key_list, rewrite_val
yaml[key] = self.updateYamlPathVals( # type: ignore[assignment]
yaml.get(key, {}), # type: ignore[arg-type]
yaml_key_list,
rewrite_val
)
return yaml

def substitute_keys(self, yaml, key_rewrites):
def substitute_keys(
self, yaml: dict[str, YamlValue], key_rewrites: dict[str, str]) -> None:
if len(key_rewrites) != 0:
for key in list(yaml.keys()):
val = yaml[key]
Expand All @@ -169,20 +182,31 @@ def substitute_keys(self, yaml, key_rewrites):
if isinstance(val, dict):
self.substitute_keys(val, key_rewrites)

def getYamlLeafKeys(self, yamlData):
try:
for key in yamlData.keys():
for k in self.getYamlLeafKeys(yamlData[key]):
yield k
yield DictItemReference(yamlData, key)
except AttributeError:
def getYamlLeafKeys(self, yamlData: dict[str, YamlValue]) -> \
Generator[DictItemReference, None, None]:
if not isinstance(yamlData, dict):
return

def pathify(self, d, p=None, paths=None, joinchar='.'):
for key in yamlData.keys():
child = yamlData[key]

if isinstance(child, dict):
# Recursively process nested dictionaries
yield from self.getYamlLeafKeys(child)

yield DictItemReference(yamlData, key)

def pathify(
self, d: Union[dict[str, YamlValue], list[YamlValue], YamlValue],
p: Optional[str] = None,
paths: Optional[dict[str, YamlValue]] = None,
joinchar: str = '.') -> dict[str, YamlValue]:
if p is None:
paths = {}
self.pathify(d, '', paths, joinchar=joinchar)
return paths

assert paths is not None
pn = p
if p != '':
pn += joinchar
Expand All @@ -195,8 +219,9 @@ def pathify(self, d, p=None, paths=None, joinchar='.'):
self.pathify(e, pn + str(idx), paths, joinchar=joinchar)
else:
paths[p] = d
return paths

def convert(self, text_value):
def convert(self, text_value: str) -> YamlValue:
if self.__convert_types:
# try converting to int or float
try:
Expand Down
7 changes: 6 additions & 1 deletion tools/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,10 @@ explicit_package_bases = true
strict = true

[[tool.mypy.overrides]]
module = ["matplotlib.*", "rtree.*"]
module = [
"matplotlib.*",
"rtree.*",
"launch.*",
]

ignore_missing_imports = true
Loading