-
Notifications
You must be signed in to change notification settings - Fork 2
/
_implementation.py
212 lines (172 loc) · 7.83 KB
/
_implementation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
import dataclasses
import re
import typing as t
from collections import defaultdict
from collections.abc import Mapping
from packaging.requirements import Requirement
def _normalize_name(name: str) -> str:
return re.sub(r"[-_.]+", "-", name).lower()
def _normalize_group_names(
    dependency_groups: Mapping[str, t.Union[str, Mapping[str, str]]]
) -> Mapping[str, t.Union[str, Mapping[str, str]]]:
    """
    Return a copy of the table keyed by normalized group names.

    :param dependency_groups: the table whose keys should be normalized
    :raises ValueError: if two or more keys normalize to the same name; the
        message lists each colliding normalized name with its original spellings
    """
    by_normalized = {}
    # normalized name -> every original spelling that mapped onto it
    spellings = defaultdict(list)

    for raw_name, contents in dependency_groups.items():
        key = _normalize_name(raw_name)
        spellings[key].append(raw_name)
        by_normalized[key] = contents

    collisions = [
        f"{key} ({', '.join(variants)})"
        for key, variants in spellings.items()
        if len(variants) > 1
    ]
    if collisions:
        raise ValueError(f"Duplicate dependency group names: {', '.join(collisions)}")

    return by_normalized
@dataclasses.dataclass
class DependencyGroupInclude:
    """
    A parsed ``{"include-group": ...}`` item from a dependency group.
    """

    # the value of the item's "include-group" key, stored as given
    include_group: str
class CyclicDependencyError(ValueError):
    """
    An error representing the detection of a cycle.

    The three constructor arguments are kept as attributes of the same names.

    :param requested_group: the group named in the original, user-facing request
    :param group: the group being resolved when the cycle was found
    :param include_group: the group that ``group`` tried to include
    """

    def __init__(self, requested_group: str, group: str, include_group: str) -> None:
        # a group including itself gets a dedicated, shorter explanation
        reason = (
            f"{group} includes itself"
            if include_group == group
            else f"{include_group} -> {group}, {group} -> {include_group}"
        )
        message = (
            "Cyclic dependency group include while resolving "
            f"{requested_group}: {reason}"
        )
        super().__init__(message)

        self.requested_group = requested_group
        self.group = group
        self.include_group = include_group
class DependencyGroupResolver:
    """
    A resolver for Dependency Group data.

    This class handles caching, name normalization, cycle detection, and other
    parsing requirements. There are only two public methods for exploring the data:
    ``lookup()`` and ``resolve()``.

    Group names are normalized when the table is loaded and whenever a group is
    referenced, whether by the caller or by an ``include-group`` item, so that
    e.g. ``Test_Deps`` and ``test-deps`` always refer to the same group.

    :param dependency_groups: A mapping, as provided via pyproject
        ``[dependency-groups]``.
    :raises TypeError: if ``dependency_groups`` is not a mapping
    :raises ValueError: if two group names are identical after normalization
    """

    def __init__(
        self,
        dependency_groups: Mapping[str, t.Union[str, Mapping[str, str]]],
    ) -> None:
        if not isinstance(dependency_groups, Mapping):
            raise TypeError("Dependency Groups table is not a mapping")
        self.dependency_groups = _normalize_group_names(dependency_groups)
        # a map of group names to parsed data
        self._parsed_groups: dict[
            str, tuple[t.Union[Requirement, DependencyGroupInclude], ...]
        ] = {}
        # a map of group names to their ancestors, used for cycle detection
        self._include_graph_ancestors: dict[str, tuple[str, ...]] = defaultdict(tuple)
        # a cache of completed resolutions to Requirement lists
        self._resolve_cache: dict[str, tuple[Requirement, ...]] = {}

    def lookup(
        self, group: str
    ) -> tuple[t.Union[Requirement, DependencyGroupInclude], ...]:
        """
        Lookup a group name, returning the parsed dependency data for that group.
        This will not resolve includes.

        Each ``DependencyGroupInclude`` in the result carries the included group
        name exactly as written in the data (it is not normalized here).

        :param group: the name of the group to lookup

        :raises TypeError: if the group name is not a str
        :raises ValueError: if the data does not appear to be valid dependency group
            data
        :raises LookupError: if group name is absent
        :raises packaging.requirements.InvalidRequirement: if a specifier is not valid
        """
        if not isinstance(group, str):
            raise TypeError("Dependency group name is not a str")
        group = _normalize_name(group)
        return self._parse_group(group)

    def resolve(self, group: str) -> tuple[Requirement, ...]:
        """
        Resolve a dependency group to a tuple of requirements, with all includes
        expanded in place.

        :param group: the name of the group to resolve

        :raises TypeError: if the inputs appear to be the wrong types
        :raises ValueError: if the data does not appear to be valid dependency group
            data (this includes ``CyclicDependencyError`` for include cycles)
        :raises LookupError: if group name is absent
        :raises packaging.requirements.InvalidRequirement: if a specifier is not valid
        """
        if not isinstance(group, str):
            raise TypeError("Dependency group name is not a str")
        group = _normalize_name(group)
        return self._resolve(group, group)

    def _parse_group(
        self, group: str
    ) -> tuple[t.Union[Requirement, DependencyGroupInclude], ...]:
        """
        Parse one group of the table into requirements and include markers.

        :param group: the already-normalized name of the group to parse

        :raises ValueError: if the group is not a list, or one of its items is
            neither a string nor a ``{"include-group": <str>}`` table
        :raises LookupError: if group name is absent
        :raises packaging.requirements.InvalidRequirement: if a specifier is not valid
        """
        # short circuit -- never do the work twice
        if group in self._parsed_groups:
            return self._parsed_groups[group]

        if group not in self.dependency_groups:
            raise LookupError(f"Dependency group '{group}' not found")

        raw_group = self.dependency_groups[group]
        if not isinstance(raw_group, list):
            raise ValueError(f"Dependency group '{group}' is not a list")

        elements: list[t.Union[Requirement, DependencyGroupInclude]] = []
        for item in raw_group:
            if isinstance(item, str):
                # packaging.requirements.Requirement parsing ensures that this is a
                # valid PEP 508 Dependency Specifier
                # raises InvalidRequirement on failure
                elements.append(Requirement(item))
            elif isinstance(item, dict):
                if tuple(item.keys()) != ("include-group",):
                    raise ValueError(f"Invalid dependency group item: {item}")

                include_group = next(iter(item.values()))
                # reject non-string targets here, as invalid data, rather than
                # letting them fail later during name normalization or lookup
                if not isinstance(include_group, str):
                    raise ValueError(f"Invalid dependency group item: {item}")
                elements.append(DependencyGroupInclude(include_group=include_group))
            else:
                raise ValueError(f"Invalid dependency group item: {item}")

        self._parsed_groups[group] = tuple(elements)
        return self._parsed_groups[group]

    def _resolve(self, group: str, requested_group: str) -> tuple[Requirement, ...]:
        """
        This is a helper for cached resolution to ``Requirement`` objects.

        :param group: The (normalized) name of the group to resolve.
        :param requested_group: The group which was used in the original, user-facing
            request.

        :raises CyclicDependencyError: if an include refers back to one of the
            groups currently being resolved
        """
        if group in self._resolve_cache:
            return self._resolve_cache[group]

        parsed = self._parse_group(group)

        resolved_group = []
        for item in parsed:
            if isinstance(item, Requirement):
                resolved_group.append(item)
            elif isinstance(item, DependencyGroupInclude):
                # table keys are stored normalized, so the include target must be
                # normalized too before it is used for cycle checks and lookups
                include_group = _normalize_name(item.include_group)
                if include_group in self._include_graph_ancestors[group]:
                    raise CyclicDependencyError(
                        requested_group, group, include_group
                    )
                # record the path taken to reach the included group
                self._include_graph_ancestors[include_group] = (
                    self._include_graph_ancestors[group] + (group,)
                )
                resolved_group.extend(self._resolve(include_group, requested_group))
            else:  # unreachable
                raise NotImplementedError(
                    f"Invalid dependency group item after parse: {item}"
                )

        self._resolve_cache[group] = tuple(resolved_group)
        return self._resolve_cache[group]
def resolve(
    dependency_groups: Mapping[str, t.Union[str, Mapping[str, str]]], group: str, /
) -> tuple[str, ...]:
    """
    Resolve a dependency group and return its requirements as a tuple of strings.

    A fresh ``DependencyGroupResolver`` is built for each call.

    :param dependency_groups: the parsed contents of the ``[dependency-groups]`` table
        from ``pyproject.toml``
    :param group: the name of the group to resolve

    :raises TypeError: if the inputs appear to be the wrong types
    :raises ValueError: if the data does not appear to be valid dependency group data
    :raises LookupError: if group name is absent
    :raises packaging.requirements.InvalidRequirement: if a specifier is not valid
    """
    resolver = DependencyGroupResolver(dependency_groups)
    requirements = resolver.resolve(group)
    return tuple(map(str, requirements))