Skip to content

Commit

Permalink
FIX: Better handling of Synced states: much better...
Browse files Browse the repository at this point in the history
  • Loading branch information
micoloth committed Mar 9, 2024
1 parent 040b44b commit 6109e6f
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 45 deletions.
59 changes: 42 additions & 17 deletions src/reactive_python_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import itertools
import functools
import dataclasses
from collections import abc
from typing import Any, Dict, List, Optional, Set, Union, Tuple
from collections import abc, deque
from typing import Any, Dict, List, Optional, Set, Union, Tuple, Callable


class ReactivePythonDagBuilderUtils__():
Expand All @@ -20,7 +20,30 @@ def define_reactive_python_utils():

# We define here the small subset of NetworkX objects that we need to use,
# in order to avoid pip-installing the whole NetworkX library on the user's machine:
DiGraph, topological_sort, ancestors, descendants, has_path = ReactivePythonDagBuilderUtils__.define_networkx_objects()
DiGraph, topological_sort, has_path = ReactivePythonDagBuilderUtils__.define_networkx_objects()

# Furthermore we need these modified methods, which are not in the library:
def generic_bfs_edges_with_pruning(G, source, neighbors, pruning_condition: Callable):
    """Yield (parent, child) edges of a breadth-first search that skips pruned nodes.

    The search starts at ``source`` and follows ``neighbors``. A child is
    yielded, and later expanded, only if it has not been yielded before AND
    ``pruning_condition(G.nodes[child])`` is truthy. A child that fails the
    condition is neither yielded nor expanded, so nothing is reached *through*
    it. ``source`` itself is never yielded as a child.

    Args:
        G: graph-like object; only ``G.nodes[node]`` is read here, and its
            value is handed to ``pruning_condition``.
        source: node the search starts from.
        neighbors: callable ``node -> iterable of adjacent nodes`` (for
            example ``G.neighbors`` or ``G.predecessors``). Any iterable is
            accepted, not just an iterator.
        pruning_condition: callable receiving ``G.nodes[child]``; return a
            truthy value to keep the child, a falsy value to prune it.

    Yields:
        ``(parent, child)`` tuples in breadth-first discovery order.
    """
    visited = {source}
    # iter(...) lets neighbors return a list/set/view as well as an iterator.
    queue = deque([(source, iter(neighbors(source)))])
    while queue:
        parent, children = queue[0]
        # Keep the try body minimal: only exhaustion of the children iterator
        # may pop the parent. A StopIteration leaking out of pruning_condition
        # must not be mistaken for "no more children".
        try:
            child = next(children)
        except StopIteration:
            queue.popleft()
            continue
        if child in visited or not pruning_condition(G.nodes[child]):
            continue
        yield parent, child
        visited.add(child)
        queue.append((child, iter(neighbors(child))))

def directed_ancestors_with_pruning(G, source, pruning_condition):
    """Return the set of nodes reached from ``source`` by walking ``G.predecessors``.

    The walk is delegated to ``generic_bfs_edges_with_pruning``; nodes whose
    data fails ``pruning_condition`` are excluded and not walked through.
    ``source`` itself is not part of the result.
    """
    bfs_edges = generic_bfs_edges_with_pruning(
        G, source, G.predecessors, pruning_condition=pruning_condition
    )
    reached = set()
    for _origin, node in bfs_edges:
        reached.add(node)
    return reached

def directed_descendants_with_pruning(G, source, pruning_condition):
    """Return the set of nodes reached from ``source`` by walking ``G.neighbors``.

    The walk is delegated to ``generic_bfs_edges_with_pruning``; nodes whose
    data fails ``pruning_condition`` are excluded and not walked through.
    ``source`` itself is not part of the result.
    """
    bfs_edges = generic_bfs_edges_with_pruning(
        G, source, G.neighbors, pruning_condition=pruning_condition
    )
    reached = set()
    for _origin, node in bfs_edges:
        reached.add(node)
    return reached



@dataclasses.dataclass
class AstNodeData:
Expand Down Expand Up @@ -803,11 +826,11 @@ def update_staleness_info_in_new_dag(old_dag, new_dag):
- it Doesn't have a corresponding node in the old_dag (ie it's new/ has been changed).
Subsequently,
Each noed is STALE if and only if:
Each node is STALE if:
- it NEEDSTOBESTALE, OR
- ANY of its parents in the new_dag NEEDSTOBESTALE, OR
- it Doesn't have a corresponding node in the old_dag (ie it's new/ has been changed).
Otherwise, it should retain its state.
Otherwise, it should retain the state it had in the old_dag.
This double logic is to handle this case:
- If a node is STALE and one of its children is FRESH (not stale), but None of them NEEDSTOBESTALE, then the Children SHOULD remain Fresh...
Expand Down Expand Up @@ -846,8 +869,8 @@ def update_staleness_info_in_new_dag(old_dag, new_dag):
new_node_to_old[new_node] = candidate
break

# If there is no matching node in the old_dag, or the matching node is stale, it's stale:
if new_node_to_old.get(new_node) is None or old_dag.nodes[new_node_to_old[new_node]]['needstobestale']:
# If there is no matching node in the old_dag, then the node both NEEDSTOBESTALE and is STALE:
if new_node_to_old.get(new_node) is None:
new_dag.nodes[new_node]['needstobestale'] = True;
new_dag.nodes[new_node]['stale'] = True;
continue
Expand Down Expand Up @@ -876,7 +899,7 @@ def draw_dag(graph):

# Source and target index, + "var" label:
edge_list = [
(source, target, str(vars) if len(vars:=graph[source][target].get('vars', [""])) > 1 else vars[0])
(source, target, str(graph[source][target].get('vars', [])))
for source, target in graph.edges
]

Expand Down Expand Up @@ -935,22 +958,23 @@ def dag_to_node_ranges(current_dag, current_line: Optional[int] = None, get_upst

nodes_to_return = []
if get_upstream and not stale_only:
nodes_to_return += [n for n in ancestors(current_dag, current_node)]
nodes_to_return += list(directed_ancestors_with_pruning(current_dag, current_node, pruning_condition=lambda x: x['text'] != '_START_'))
elif get_upstream and stale_only:
nodes_to_return += [n for n in ancestors(current_dag, current_node) if current_dag.nodes[n].get('stale')]
nodes_to_return += list(directed_ancestors_with_pruning(current_dag, current_node, pruning_condition=lambda x: x['text'] != '_START_' and x.get('stale')))
# This^ will get ALL THE STALE ANCESTORS, until you get the FIRST NOT-STALE node, then you prune that branch since it's good.
if (not get_downstream) or not stale_only or current_dag.nodes[current_node].get('stale'):
nodes_to_return += [current_node] # IF YOU ARE NOT GETTING THE DESCENDANTS (So "up to(including) the current node"), this is inserted EVEN IF NOT STALE, which is very intentional
if get_downstream and not stale_only:
nodes_to_return += [n for n in descendants(current_dag, current_node)]
nodes_to_return += list(directed_descendants_with_pruning(current_dag, current_node, pruning_condition=lambda x: x['text'] != '_START_'))
elif get_downstream and stale_only:
nodes_to_return += [n for n in descendants(current_dag, current_node) if current_dag.nodes[n].get('stale')]
nodes_to_return += list(directed_descendants_with_pruning(current_dag, current_node, pruning_condition=lambda x: x['text'] != '_START_' and x.get('stale')))
# This will get ALL THE STALE DESCENDANTS, until you get the FIRST NOT-STALE node, then you prune that branch since it's good.

# When you need to Execute code, the order is important, so we sort the nodes by topological order.
# When you are just selecting, not so much. # For now, I'm using the stale_only flag to indicate this difference.
# Btw: directed_ancestors_with_pruning / directed_descendants_with_pruning return a set, so the BFS visiting order is lost (and BFS order is not a topological order in general anyway), which is why the nodes have to be re-sorted here.
if stale_only:
nodes_to_return = [n for n in topological_sort(current_dag) if n in set(nodes_to_return) and current_dag.nodes[n]['text'] != '_START_']
else:
nodes_to_return = [n for n in nodes_to_return if current_dag.nodes[n]['text'] != '_START_']
nodes_to_return = [n for n in topological_sort(current_dag) if n in set(nodes_to_return)]

# Additional step: If get_downstream OR if you are running a single block (neither downstream nor upstream),
# you want to identify which codes are stale but actually depend on other stale code
Expand Down Expand Up @@ -1003,7 +1027,6 @@ def __init__(self):

def update_dag(self, code: str, current_line: Optional[int] = None):
try:
# TODO: Probably treat parsing errors different from code analysis errors
ast_tree = ast.parse(code, "", mode='exec', type_comments=True)
self.syntax_error_range = None
except SyntaxError as e:
Expand Down Expand Up @@ -2391,8 +2414,10 @@ def has_path(G, source, target):
return True


return DiGraph, topological_sort, ancestors, descendants, has_path
return DiGraph, topological_sort, has_path

reactive_python_dag_builder_utils__ = ReactivePythonDagBuilderUtils__()




59 changes: 42 additions & 17 deletions src/reactive_python_engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import hashlib
import itertools
import functools
import dataclasses
from collections import abc
from typing import Any, Dict, List, Optional, Set, Union, Tuple
from collections import abc, deque
from typing import Any, Dict, List, Optional, Set, Union, Tuple, Callable
class ReactivePythonDagBuilderUtils__():
Expand All @@ -21,7 +21,30 @@ class ReactivePythonDagBuilderUtils__():
# We define here the small subset of NetworkX objects that we need to use,
# in order to avoid pip-installing the whole NetworkX library on the user's machine:
DiGraph, topological_sort, ancestors, descendants, has_path = ReactivePythonDagBuilderUtils__.define_networkx_objects()
DiGraph, topological_sort, has_path = ReactivePythonDagBuilderUtils__.define_networkx_objects()
# Furthermore we need these modified methods, which are not in the library:
def generic_bfs_edges_with_pruning(G, source, neighbors, pruning_condition: Callable):
    """Yield (parent, child) edges of a breadth-first search that skips pruned nodes.

    The search starts at source and follows neighbors. A child is yielded,
    and later expanded, only if it has not been yielded before AND
    pruning_condition(G.nodes[child]) is truthy. A child that fails the
    condition is neither yielded nor expanded, so nothing is reached through
    it. The source itself is never yielded as a child.

    Args:
        G: graph-like object; only G.nodes[node] is read here, and its value
            is handed to pruning_condition.
        source: node the search starts from.
        neighbors: callable mapping a node to an iterable of adjacent nodes
            (for example G.neighbors or G.predecessors). Any iterable is
            accepted, not just an iterator.
        pruning_condition: callable receiving G.nodes[child]; return a truthy
            value to keep the child, a falsy value to prune it.

    Yields:
        (parent, child) tuples in breadth-first discovery order.
    """
    visited = {source}
    # iter(...) lets neighbors return a list/set/view as well as an iterator.
    queue = deque([(source, iter(neighbors(source)))])
    while queue:
        parent, children = queue[0]
        # Keep the try body minimal: only exhaustion of the children iterator
        # may pop the parent. A StopIteration leaking out of pruning_condition
        # must not be mistaken for "no more children".
        try:
            child = next(children)
        except StopIteration:
            queue.popleft()
            continue
        if child in visited or not pruning_condition(G.nodes[child]):
            continue
        yield parent, child
        visited.add(child)
        queue.append((child, iter(neighbors(child))))
def directed_ancestors_with_pruning(G, source, pruning_condition):
    """Return the set of nodes reached from source by walking G.predecessors.

    The walk is delegated to generic_bfs_edges_with_pruning; nodes whose data
    fails pruning_condition are excluded and not walked through. The source
    itself is not part of the result.
    """
    bfs_edges = generic_bfs_edges_with_pruning(
        G, source, G.predecessors, pruning_condition=pruning_condition
    )
    reached = set()
    for _origin, node in bfs_edges:
        reached.add(node)
    return reached
def directed_descendants_with_pruning(G, source, pruning_condition):
    """Return the set of nodes reached from source by walking G.neighbors.

    The walk is delegated to generic_bfs_edges_with_pruning; nodes whose data
    fails pruning_condition are excluded and not walked through. The source
    itself is not part of the result.
    """
    bfs_edges = generic_bfs_edges_with_pruning(
        G, source, G.neighbors, pruning_condition=pruning_condition
    )
    reached = set()
    for _origin, node in bfs_edges:
        reached.add(node)
    return reached
@dataclasses.dataclass
class AstNodeData:
Expand Down Expand Up @@ -804,11 +827,11 @@ class ReactivePythonDagBuilderUtils__():
- it Doesn't have a corresponding node in the old_dag (ie it's new/ has been changed).
Subsequently,
Each noed is STALE if and only if:
Each node is STALE if:
- it NEEDSTOBESTALE, OR
- ANY of its parents in the new_dag NEEDSTOBESTALE, OR
- it Doesn't have a corresponding node in the old_dag (ie it's new/ has been changed).
Otherwise, it should retain its state.
Otherwise, it should retain the state it had in the old_dag.
This double logic is to handle this case:
- If a node is STALE and one of its children is FRESH (not stale), but None of them NEEDSTOBESTALE, then the Children SHOULD remain Fresh...
Expand Down Expand Up @@ -847,8 +870,8 @@ class ReactivePythonDagBuilderUtils__():
new_node_to_old[new_node] = candidate
break
# If there is no matching node in the old_dag, or the matching node is stale, it's stale:
if new_node_to_old.get(new_node) is None or old_dag.nodes[new_node_to_old[new_node]]['needstobestale']:
# If there is no matching node in the old_dag, then the node both NEEDSTOBESTALE and is STALE:
if new_node_to_old.get(new_node) is None:
new_dag.nodes[new_node]['needstobestale'] = True;
new_dag.nodes[new_node]['stale'] = True;
continue
Expand Down Expand Up @@ -877,7 +900,7 @@ class ReactivePythonDagBuilderUtils__():
# Source and target index, + "var" label:
edge_list = [
(source, target, str(vars) if len(vars:=graph[source][target].get('vars', [""])) > 1 else vars[0])
(source, target, str(graph[source][target].get('vars', [])))
for source, target in graph.edges
]
Expand Down Expand Up @@ -936,22 +959,23 @@ class ReactivePythonDagBuilderUtils__():
nodes_to_return = []
if get_upstream and not stale_only:
nodes_to_return += [n for n in ancestors(current_dag, current_node)]
nodes_to_return += list(directed_ancestors_with_pruning(current_dag, current_node, pruning_condition=lambda x: x['text'] != '_START_'))
elif get_upstream and stale_only:
nodes_to_return += [n for n in ancestors(current_dag, current_node) if current_dag.nodes[n].get('stale')]
nodes_to_return += list(directed_ancestors_with_pruning(current_dag, current_node, pruning_condition=lambda x: x['text'] != '_START_' and x.get('stale')))
# This^ will get ALL THE STALE ANCESTORS, until you get the FIRST NOT-STALE node, then you prune that branch since it's good.
if (not get_downstream) or not stale_only or current_dag.nodes[current_node].get('stale'):
nodes_to_return += [current_node] # IF YOU ARE NOT GETTING THE DESCENDANTS (So "up to(including) the current node"), this is inserted EVEN IF NOT STALE, which is very intentional
if get_downstream and not stale_only:
nodes_to_return += [n for n in descendants(current_dag, current_node)]
nodes_to_return += list(directed_descendants_with_pruning(current_dag, current_node, pruning_condition=lambda x: x['text'] != '_START_'))
elif get_downstream and stale_only:
nodes_to_return += [n for n in descendants(current_dag, current_node) if current_dag.nodes[n].get('stale')]
nodes_to_return += list(directed_descendants_with_pruning(current_dag, current_node, pruning_condition=lambda x: x['text'] != '_START_' and x.get('stale')))
# This will get ALL THE STALE DESCENDANTS, until you get the FIRST NOT-STALE node, then you prune that branch since it's good.
# When you need to Execute code, the order is important, so we sort the nodes by topological order.
# When you are just selecting, not so much. # For now, I'm using the stale_only flag to indicate this difference.
# Btw: directed_ancestors_with_pruning / directed_descendants_with_pruning return a set, so the BFS visiting order is lost (and BFS order is not a topological order in general anyway), which is why the nodes have to be re-sorted here.
if stale_only:
nodes_to_return = [n for n in topological_sort(current_dag) if n in set(nodes_to_return) and current_dag.nodes[n]['text'] != '_START_']
else:
nodes_to_return = [n for n in nodes_to_return if current_dag.nodes[n]['text'] != '_START_']
nodes_to_return = [n for n in topological_sort(current_dag) if n in set(nodes_to_return)]
# Additional step: If get_downstream OR if you are running a single block (neither downstream nor upstream),
# you want to identify which codes are stale but actually depend on other stale code
Expand Down Expand Up @@ -1004,7 +1028,6 @@ class ReactivePythonDagBuilderUtils__():
def update_dag(self, code: str, current_line: Optional[int] = None):
try:
# TODO: Probably treat parsing errors different from code analysis errors
ast_tree = ast.parse(code, "", mode='exec', type_comments=True)
self.syntax_error_range = None
except SyntaxError as e:
Expand Down Expand Up @@ -2392,10 +2415,12 @@ class ReactivePythonDagBuilderUtils__():
return True
return DiGraph, topological_sort, ancestors, descendants, has_path
return DiGraph, topological_sort, has_path
reactive_python_dag_builder_utils__ = ReactivePythonDagBuilderUtils__()
`;
Loading

0 comments on commit 6109e6f

Please sign in to comment.