-
Notifications
You must be signed in to change notification settings - Fork 414
Expand file tree
/
Copy pathdependency_resolver.py
More file actions
449 lines (354 loc) · 14.3 KB
/
dependency_resolver.py
File metadata and controls
449 lines (354 loc) · 14.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
"""
Dependency Resolver
===================
Provides dependency resolution using Kahn's algorithm for topological sorting.
Includes cycle detection, validation, and helper functions for dependency management.
"""
import heapq
from collections import deque
from typing import TypedDict
# Security: cap how many dependencies one feature may declare, to prevent DoS
# via excessive dependency lists. Enforced by validate_dependencies().
MAX_DEPENDENCIES_PER_FEATURE = 20
# Recursion depth limit for the DFS in would_create_circular_dependency().
# Paths deeper than this are treated as cyclic instead of risking a stack
# overflow.
MAX_DEPENDENCY_DEPTH = 50
class DependencyResult(TypedDict):
    """Shape of the mapping returned by resolve_dependencies()."""

    # Feature dicts in resolution order; features that could not be ordered
    # because they sit in (or behind) a cycle are appended at the end.
    ordered_features: list[dict]
    # Each entry is one detected cycle, expressed as a list of feature IDs.
    circular_dependencies: list[list[int]]
    # feature_id -> IDs of known dependencies that are not passing yet.
    blocked_features: dict[int, list[int]]
    # feature_id -> dependency IDs that do not match any known feature.
    missing_dependencies: dict[int, list[int]]
def resolve_dependencies(features: list[dict]) -> DependencyResult:
    """Topologically sort features with Kahn's algorithm, honouring priority.

    Among the features whose dependencies have all been emitted, the one with
    the lowest ``priority`` value (default 999) is emitted first; ties are
    broken by the lower ``id``.

    Args:
        features: List of feature dicts with ``id``, ``priority``, ``passes``
            and ``dependencies`` fields.

    Returns:
        DependencyResult with:
        - ordered_features: features in dependency order; features that could
          not be ordered (members of a cycle, or depending on one) are
          appended at the end in their input order.
        - circular_dependencies: cycles found among the unordered features.
        - blocked_features: feature_id -> known dependencies not yet passing.
        - missing_dependencies: feature_id -> dependency IDs with no feature.
    """
    feature_map = {f["id"]: f for f in features}
    in_degree = {f["id"]: 0 for f in features}
    adjacency: dict[int, list[int]] = {f["id"]: [] for f in features}
    blocked: dict[int, list[int]] = {}
    missing: dict[int, list[int]] = {}

    # Build the graph: an edge dep -> feature means "dep must come first".
    for feature in features:
        feature_id = feature["id"]
        for dep_id in feature.get("dependencies") or []:
            if dep_id not in feature_map:
                # Unknown IDs are reported but do not constrain the ordering.
                missing.setdefault(feature_id, []).append(dep_id)
                continue
            adjacency[dep_id].append(feature_id)
            in_degree[feature_id] += 1
            if not feature_map[dep_id].get("passes"):
                blocked.setdefault(feature_id, []).append(dep_id)

    # Heap entries are (priority, id, feature): priority first, id as the
    # deterministic tie-breaker.
    heap = [
        (f.get("priority", 999), f["id"], f)
        for f in features
        if in_degree[f["id"]] == 0
    ]
    heapq.heapify(heap)

    ordered: list[dict] = []
    while heap:
        _, _, current = heapq.heappop(heap)
        ordered.append(current)
        for dependent_id in adjacency[current["id"]]:
            in_degree[dependent_id] -= 1
            if in_degree[dependent_id] == 0:
                dependent = feature_map[dependent_id]
                heapq.heappush(
                    heap,
                    (dependent.get("priority", 999), dependent_id, dependent),
                )

    # Anything Kahn's algorithm could not emit is in a cycle or behind one.
    cycles: list[list[int]] = []
    if len(ordered) < len(features):
        # Compare by id through a set: avoids the quadratic scan (and the
        # deep dict-equality comparisons) of testing `f not in ordered`.
        emitted_ids = {f["id"] for f in ordered}
        remaining = [f for f in features if f["id"] not in emitted_ids]
        cycles = _detect_cycles(remaining, feature_map)
        ordered.extend(remaining)  # Keep cyclic features, placed last

    return {
        "ordered_features": ordered,
        "circular_dependencies": cycles,
        "blocked_features": blocked,
        "missing_dependencies": missing,
    }
def are_dependencies_satisfied(
feature: dict,
all_features: list[dict],
passing_ids: set[int] | None = None,
) -> bool:
"""Check if all dependencies have passes=True.
Args:
feature: Feature dict to check
all_features: List of all feature dicts
passing_ids: Optional pre-computed set of passing feature IDs.
If None, will be computed from all_features. Pass this when
calling in a loop to avoid O(n^2) complexity.
Returns:
True if all dependencies are satisfied (or no dependencies)
"""
deps = feature.get("dependencies") or []
if not deps:
return True
if passing_ids is None:
passing_ids = {f["id"] for f in all_features if f.get("passes")}
return all(dep_id in passing_ids for dep_id in deps)
def get_blocking_dependencies(
feature: dict,
all_features: list[dict],
passing_ids: set[int] | None = None,
) -> list[int]:
"""Get list of incomplete dependency IDs.
Args:
feature: Feature dict to check
all_features: List of all feature dicts
passing_ids: Optional pre-computed set of passing feature IDs.
If None, will be computed from all_features. Pass this when
calling in a loop to avoid O(n^2) complexity.
Returns:
List of feature IDs that are blocking this feature
"""
deps = feature.get("dependencies") or []
if passing_ids is None:
passing_ids = {f["id"] for f in all_features if f.get("passes")}
return [dep_id for dep_id in deps if dep_id not in passing_ids]
def would_create_circular_dependency(
    features: list[dict], source_id: int, target_id: int
) -> bool:
    """Check whether making ``source_id`` depend on ``target_id`` closes a cycle.

    Walks the existing ``dependencies`` edges depth-first starting at the
    target; if the source is reachable, the new edge would complete a loop.

    Args:
        features: List of all feature dicts.
        source_id: The feature that would gain the dependency.
        target_id: The feature that would become a dependency.

    Returns:
        True if adding the dependency would create a cycle (a self-reference
        counts as one). False if either ID is unknown or no path leads from
        the target back to the source.
    """
    if source_id == target_id:
        return True  # A feature depending on itself is a cycle

    by_id = {f["id"]: f for f in features}
    # An edge between unknown features cannot close a loop.
    if not by_id.get(source_id) or not by_id.get(target_id):
        return False

    seen: set[int] = set()

    def reaches_source(node_id: int, depth: int = 0) -> bool:
        # Security: bound the recursion; overly deep chains are reported as
        # cyclic (fail-safe) rather than risking a stack overflow.
        if depth > MAX_DEPENDENCY_DEPTH:
            return True
        if node_id == source_id:
            return True
        if node_id in seen:
            return False
        seen.add(node_id)

        node = by_id.get(node_id)
        if not node:
            return False
        return any(
            reaches_source(dep_id, depth + 1)
            for dep_id in node.get("dependencies") or []
        )

    return reaches_source(target_id)
def validate_dependencies(
    feature_id: int, dependency_ids: list[int], all_feature_ids: set[int]
) -> tuple[bool, str]:
    """Validate a proposed dependency list for one feature.

    Checks run in this order and the first failure wins: size limit,
    self-reference, unknown IDs, duplicates.

    Args:
        feature_id: ID of the feature being validated.
        dependency_ids: List of proposed dependency IDs.
        all_feature_ids: Set of all valid feature IDs.

    Returns:
        Tuple of (is_valid, error_message); the message is empty when valid.
    """
    error = ""
    if len(dependency_ids) > MAX_DEPENDENCIES_PER_FEATURE:
        # Security: reject oversized dependency lists
        error = f"Maximum {MAX_DEPENDENCIES_PER_FEATURE} dependencies allowed"
    elif feature_id in dependency_ids:
        error = "A feature cannot depend on itself"
    elif unknown := [d for d in dependency_ids if d not in all_feature_ids]:
        error = f"Dependencies not found: {unknown}"
    elif len(set(dependency_ids)) != len(dependency_ids):
        error = "Duplicate dependencies not allowed"
    return not error, error
def _detect_cycles(features: list[dict], feature_map: dict) -> list[list[int]]:
    """Detect dependency cycles using an iterative depth-first search.

    The search follows ``dependencies`` edges from each unvisited feature. A
    cycle is recorded whenever an edge points back at a feature that is still
    on the current DFS path. The path is always unwound after a node is
    finished, so a feature that merely depends on a cycle member is never
    reported as part of a cycle. The traversal uses an explicit stack, so long
    dependency chains cannot hit Python's recursion limit.

    Args:
        features: List of features to use as search starting points.
        feature_map: Map of feature_id -> feature dict. IDs missing from the
            map are treated as having no dependencies.

    Returns:
        List of distinct cycles, where each cycle is the list of feature IDs
        on the DFS path from the re-entered feature to the feature whose
        dependency closes the loop.
    """

    def deps_of(fid: int) -> list[int]:
        feature = feature_map.get(fid)
        return (feature.get("dependencies") or []) if feature else []

    cycles: list[list[int]] = []
    visited: set[int] = set()
    on_path: set[int] = set()  # IDs currently on the DFS path
    path: list[int] = []  # the DFS path itself, in visit order

    for start in features:
        start_id = start["id"]
        if start_id in visited:
            continue

        visited.add(start_id)
        on_path.add(start_id)
        path.append(start_id)
        # One dependency iterator per node on the path; resuming an iterator
        # continues with that node's next unexplored edge.
        stack = [iter(deps_of(start_id))]

        while stack:
            for dep_id in stack[-1]:
                if dep_id not in visited:
                    # Descend into the unexplored dependency.
                    visited.add(dep_id)
                    on_path.add(dep_id)
                    path.append(dep_id)
                    stack.append(iter(deps_of(dep_id)))
                    break
                if dep_id in on_path:
                    # Back edge: everything from dep_id onward forms a loop.
                    cycle = path[path.index(dep_id):]
                    if cycle not in cycles:
                        cycles.append(cycle)
            else:
                # All edges of the current node explored: unwind one level.
                stack.pop()
                on_path.remove(path.pop())

    return cycles
def compute_scheduling_scores(features: list[dict]) -> dict[int, float]:
    """Compute scheduling scores for all features.

    Higher scores mean higher priority for scheduling. The algorithm considers:
    1. Unblocking potential - the number of distinct features that depend on
       this one, directly or transitively
    2. Depth in graph - features with no dependencies (roots) are "shovel-ready"
    3. User priority - existing priority field as tiebreaker

    Score formula: (1000 * unblock) + (100 * depth_score) + (10 * priority_factor)

    Args:
        features: List of feature dicts with id, priority, dependencies fields

    Returns:
        Dict mapping feature_id -> score (higher = schedule first)
    """
    if not features:
        return {}

    # Build adjacency lists, ignoring dependencies on unknown feature IDs.
    children: dict[int, list[int]] = {f["id"]: [] for f in features}  # who depends on me
    parents: dict[int, list[int]] = {f["id"]: [] for f in features}  # who I depend on
    for f in features:
        for dep_id in f.get("dependencies") or []:
            if dep_id in children:
                children[dep_id].append(f["id"])
                parents[f["id"]].append(dep_id)

    # Depth = shortest distance from any root, found by BFS. Each node is
    # assigned once, which also keeps the walk finite on cyclic graphs.
    depths: dict[int, int] = {}
    roots = [f["id"] for f in features if not parents[f["id"]]]
    bfs_queue: deque[tuple[int, int]] = deque((root, 0) for root in roots)
    while bfs_queue:
        node_id, depth = bfs_queue.popleft()
        if node_id in depths:
            continue
        depths[node_id] = depth
        for child_id in children[node_id]:
            if child_id not in depths:
                bfs_queue.append((child_id, depth + 1))

    # Features unreachable from any root (members of a rootless cycle) get
    # depth 0.
    for f in features:
        depths.setdefault(f["id"], 0)

    # Count the distinct transitive dependents of every feature. A per-node
    # traversal is O(V * (V + E)), but unlike accumulating counts in BFS-depth
    # order it is exact: no understatement when a node has parents at
    # different depths, no double counting through diamonds, and no
    # order-dependence inside cycles.
    downstream: dict[int, int] = {}
    for fid in children:
        reached: set[int] = set()
        pending = list(children[fid])
        while pending:
            node_id = pending.pop()
            if node_id in reached:
                continue
            reached.add(node_id)
            pending.extend(children[node_id])
        reached.discard(fid)  # inside a cycle a feature does not unblock itself
        downstream[fid] = len(reached)

    # Normalize and compute scores
    max_depth = max(depths.values())
    max_downstream = max(downstream.values())
    scores: dict[int, float] = {}
    for f in features:
        fid = f["id"]
        # Unblocking score: 0-1, higher = unblocks more
        unblock = downstream[fid] / max_downstream if max_downstream > 0 else 0
        # Depth score: 0-1, higher = closer to root (no deps)
        depth_score = 1 - (depths[fid] / max_depth) if max_depth > 0 else 1
        # Priority factor: lower priority number = higher factor; priorities
        # of 10 and above all map to 0
        priority = f.get("priority", 999)
        priority_factor = (10 - min(priority, 10)) / 10
        scores[fid] = (1000 * unblock) + (100 * depth_score) + (10 * priority_factor)
    return scores
def get_ready_features(features: list[dict], limit: int = 10) -> list[dict]:
    """Pick the features that can be started right now.

    A feature is ready if:
    - It is not passing
    - It is not in progress
    - All its dependencies are passing

    Args:
        features: List of all feature dicts
        limit: Maximum number of features to return

    Returns:
        Up to ``limit`` ready features, ordered by scheduling score
        (highest first), then by priority, then by id.
    """
    done_ids = {f["id"] for f in features if f.get("passes")}

    candidates = [
        f
        for f in features
        if not (f.get("passes") or f.get("in_progress"))
        and all(dep_id in done_ids for dep_id in (f.get("dependencies") or []))
    ]

    scores = compute_scheduling_scores(features)

    def sort_key(feature: dict) -> tuple:
        # Negated score so that higher scores sort first.
        return (
            -scores.get(feature["id"], 0),
            feature.get("priority", 999),
            feature["id"],
        )

    return sorted(candidates, key=sort_key)[:limit]
def get_blocked_features(features: list[dict]) -> list[dict]:
    """Collect the non-passing features that still have unmet dependencies.

    Args:
        features: List of all feature dicts

    Returns:
        Shallow copies of the blocked features, each with an extra
        'blocked_by' list holding the dependency IDs that are not passing.
        The input dicts are not modified.
    """
    done_ids = {f["id"] for f in features if f.get("passes")}

    def unmet_for(feature: dict) -> list[int]:
        return [d for d in (feature.get("dependencies") or []) if d not in done_ids]

    result = []
    for feature in features:
        if not feature.get("passes"):
            unmet = unmet_for(feature)
            if unmet:
                result.append({**feature, "blocked_by": unmet})
    return result
def build_graph_data(features: list[dict]) -> dict:
    """Turn the feature list into node/edge records for graph visualization.

    Node status is decided in this order: "done" when the feature passes,
    "blocked" when any dependency is not passing, "in_progress" when flagged
    as such, otherwise "pending".

    Args:
        features: List of all feature dicts; each must provide 'id', 'name'
            and 'category'.

    Returns:
        Dict with 'nodes' (one record per feature) and 'edges' (one
        {'source': dependency_id, 'target': feature_id} record per declared
        dependency).
    """
    done_ids = {f["id"] for f in features if f.get("passes")}

    def status_of(feature: dict, dep_ids: list) -> str:
        if feature.get("passes"):
            return "done"
        if any(d not in done_ids for d in dep_ids):
            return "blocked"
        return "in_progress" if feature.get("in_progress") else "pending"

    nodes: list[dict] = []
    edges: list[dict] = []
    for feature in features:
        dep_ids = feature.get("dependencies") or []
        node = {
            "id": feature["id"],
            "name": feature["name"],
            "category": feature["category"],
            "status": status_of(feature, dep_ids),
            "priority": feature.get("priority", 999),
            "dependencies": dep_ids,
        }
        nodes.append(node)
        edges.extend({"source": d, "target": feature["id"]} for d in dep_ids)

    return {"nodes": nodes, "edges": edges}