diff --git a/src/reactive_python_engine.py b/src/reactive_python_engine.py index eb85c92..0faded5 100644 --- a/src/reactive_python_engine.py +++ b/src/reactive_python_engine.py @@ -5,8 +5,8 @@ import itertools import functools import dataclasses -from collections import abc -from typing import Any, Dict, List, Optional, Set, Union, Tuple +from collections import abc, deque +from typing import Any, Dict, List, Optional, Set, Union, Tuple, Callable class ReactivePythonDagBuilderUtils__(): @@ -20,7 +20,30 @@ def define_reactive_python_utils(): # We define here the small subset of NetworkX objects that we need to use, # in order to avoid pip-installing the whole NetworkX library on the user's machine: - DiGraph, topological_sort, ancestors, descendants, has_path = ReactivePythonDagBuilderUtils__.define_networkx_objects() + DiGraph, topological_sort, has_path = ReactivePythonDagBuilderUtils__.define_networkx_objects() + + # Furthermore we need these modified methods, which are not in the library: + def generic_bfs_edges_with_pruning(G, source, neighbors, pruning_condition: Callable): + visited = {source} + queue = deque([(source, neighbors(source))]) + while queue: + parent, children = queue[0] + try: + child = next(children) + if child not in visited and pruning_condition(G.nodes[child]): + yield parent, child + visited.add(child) + queue.append((child, neighbors(child))) + except StopIteration: + queue.popleft() + + def directed_ancestors_with_pruning(G, source, pruning_condition): + return {child for parent, child in generic_bfs_edges_with_pruning(G, source, G.predecessors, pruning_condition=pruning_condition)} + + def directed_descendants_with_pruning(G, source, pruning_condition): + return {child for parent, child in generic_bfs_edges_with_pruning(G, source, G.neighbors, pruning_condition=pruning_condition)} + + @dataclasses.dataclass class AstNodeData: @@ -803,11 +826,11 @@ def update_staleness_info_in_new_dag(old_dag, new_dag): - it Doesn't have a corresponding node in the old_dag (ie it's new/ has been changed). Subsequently, - Each noed is STALE if and only if: + Each node is STALE if: - it NEEDSTOBESTALE, OR - ANY of its parents in the new_dag NEEDSTOBESTALE, OR - it Doesn't have a corresponding node in the old_dag (ie it's new/ has been changed). - Otherwise, it should retain its state. + Otherwise, it should retain the state it had in the old_dag. This double logic is to handle this case: - If a node is STALE and one of its childer in FRESH (not stale), but None of them NEEDSTOBESTALE, the the Children SHOULD remain Fresh... @@ -846,8 +869,8 @@ def update_staleness_info_in_new_dag(old_dag, new_dag): new_node_to_old[new_node] = candidate break - # If there is no matching node in the old_dag, or the matching node is stale, it's stale: - if new_node_to_old.get(new_node) is None or old_dag.nodes[new_node_to_old[new_node]]['needstobestale']: + # If there is no matching node in the old_dag, then needs to be stale and it's stale: + if new_node_to_old.get(new_node) is None: new_dag.nodes[new_node]['needstobestale'] = True; new_dag.nodes[new_node]['stale'] = True; continue @@ -876,7 +899,7 @@ def draw_dag(graph): # Source and target index, + "var" label: edge_list = [ - (source, target, str(vars) if len(vars:=graph[source][target].get('vars', [""])) > 1 else vars[0]) + (source, target, str(graph[source][target].get('vars', []))) for source, target in graph.edges ] @@ -935,22 +958,23 @@ def dag_to_node_ranges(current_dag, current_line: Optional[int] = None, get_upst nodes_to_return = [] if get_upstream and not stale_only: - nodes_to_return += [n for n in ancestors(current_dag, current_node)] + nodes_to_return += list(directed_ancestors_with_pruning(current_dag, current_node, pruning_condition=lambda x: x['text'] != '_START_')) elif get_upstream and stale_only: - nodes_to_return += [n for n in ancestors(current_dag, current_node) if current_dag.nodes[n].get('stale')] + nodes_to_return += list(directed_ancestors_with_pruning(current_dag, current_node, pruning_condition=lambda x: x['text'] != '_START_' and x.get('stale'))) + # This^ will get ALL THE STALE ANCESTORS, until you get the FIRST NOT-STALE node, then you prune that branch since it's good. if (not get_downstream) or not stale_only or current_dag.nodes[current_node].get('stale'): nodes_to_return += [current_node] # IF YOU ARE NOT GETTING THE DESCENDANTS (So "up to(including) the current node"), this is inserted EVEN IF NOT STALE, which is very intentional if get_downstream and not stale_only: - nodes_to_return += [n for n in descendants(current_dag, current_node)] + nodes_to_return += list(directed_descendants_with_pruning(current_dag, current_node, pruning_condition=lambda x: x['text'] != '_START_')) elif get_downstream and stale_only: - nodes_to_return += [n for n in descendants(current_dag, current_node) if current_dag.nodes[n].get('stale')] + nodes_to_return += list(directed_descendants_with_pruning(current_dag, current_node, pruning_condition=lambda x: x['text'] != '_START_' and x.get('stale'))) + # This will get ALL THE STALE DESCENDANTS, until you get the FIRST NOT-STALE node, then you prune that branch since it's good. # When you need to Execute code, the order is important, so we sort the nodes by topological order. # When you are just selecting, not so much. # For now, I'm using the stale_only flag to indicate this difference. + # Btw: It's unfortunate that directed_ancestors_with_pruning's bfs doesn't return nodes Already sorted, but I'm not gonna find out why... if stale_only: - nodes_to_return = [n for n in topological_sort(current_dag) if n in set(nodes_to_return) and current_dag.nodes[n]['text'] != '_START_'] - else: - nodes_to_return = [n for n in nodes_to_return if current_dag.nodes[n]['text'] != '_START_'] + nodes_to_return = [n for n in topological_sort(current_dag) if n in set(nodes_to_return)] # Additional step: If get_downstream OR if you are running a single block (neither downstream not upstream), # you want to identify which codes are stale but actually depend on other stale code @@ -1003,7 +1027,6 @@ def __init__(self): def update_dag(self, code: str, current_line: Optional[int] = None): try: - # TODO: Probably treat parsing errors different from code analysis errors ast_tree = ast.parse(code, "", mode='exec', type_comments=True) self.syntax_error_range = None except SyntaxError as e: @@ -2391,8 +2414,10 @@ def has_path(G, source, target): return True - return DiGraph, topological_sort, ancestors, descendants, has_path + return DiGraph, topological_sort, has_path reactive_python_dag_builder_utils__ = ReactivePythonDagBuilderUtils__() + + diff --git a/src/reactive_python_engine.ts b/src/reactive_python_engine.ts index dc531d9..457cfff 100644 --- a/src/reactive_python_engine.ts +++ b/src/reactive_python_engine.ts @@ -6,8 +6,8 @@ import hashlib import itertools import functools import dataclasses -from collections import abc -from typing import Any, Dict, List, Optional, Set, Union, Tuple +from collections import abc, deque +from typing import Any, Dict, List, Optional, Set, Union, Tuple, Callable class ReactivePythonDagBuilderUtils__(): @@ -21,7 +21,30 @@ class ReactivePythonDagBuilderUtils__(): # We define here the small subset of NetworkX objects that we need to use, # in order to avoid pip-installing the whole NetworkX library on the user's machine: - DiGraph, topological_sort, ancestors, descendants, has_path = ReactivePythonDagBuilderUtils__.define_networkx_objects() + DiGraph, topological_sort, has_path = ReactivePythonDagBuilderUtils__.define_networkx_objects() + + # Furthermore we need these modified methods, which are not in the library: + def generic_bfs_edges_with_pruning(G, source, neighbors, pruning_condition: Callable): + visited = {source} + queue = deque([(source, neighbors(source))]) + while queue: + parent, children = queue[0] + try: + child = next(children) + if child not in visited and pruning_condition(G.nodes[child]): + yield parent, child + visited.add(child) + queue.append((child, neighbors(child))) + except StopIteration: + queue.popleft() + + def directed_ancestors_with_pruning(G, source, pruning_condition): + return {child for parent, child in generic_bfs_edges_with_pruning(G, source, G.predecessors, pruning_condition=pruning_condition)} + + def directed_descendants_with_pruning(G, source, pruning_condition): + return {child for parent, child in generic_bfs_edges_with_pruning(G, source, G.neighbors, pruning_condition=pruning_condition)} + + @dataclasses.dataclass class AstNodeData: @@ -804,11 +827,11 @@ class ReactivePythonDagBuilderUtils__(): - it Doesn't have a corresponding node in the old_dag (ie it's new/ has been changed). Subsequently, - Each noed is STALE if and only if: + Each node is STALE if: - it NEEDSTOBESTALE, OR - ANY of its parents in the new_dag NEEDSTOBESTALE, OR - it Doesn't have a corresponding node in the old_dag (ie it's new/ has been changed). - Otherwise, it should retain its state. + Otherwise, it should retain the state it had in the old_dag. This double logic is to handle this case: - If a node is STALE and one of its childer in FRESH (not stale), but None of them NEEDSTOBESTALE, the the Children SHOULD remain Fresh... @@ -847,8 +870,8 @@ class ReactivePythonDagBuilderUtils__(): new_node_to_old[new_node] = candidate break - # If there is no matching node in the old_dag, or the matching node is stale, it's stale: - if new_node_to_old.get(new_node) is None or old_dag.nodes[new_node_to_old[new_node]]['needstobestale']: + # If there is no matching node in the old_dag, then needs to be stale and it's stale: + if new_node_to_old.get(new_node) is None: new_dag.nodes[new_node]['needstobestale'] = True; new_dag.nodes[new_node]['stale'] = True; continue @@ -877,7 +900,7 @@ class ReactivePythonDagBuilderUtils__(): # Source and target index, + "var" label: edge_list = [ - (source, target, str(vars) if len(vars:=graph[source][target].get('vars', [""])) > 1 else vars[0]) + (source, target, str(graph[source][target].get('vars', []))) for source, target in graph.edges ] @@ -936,22 +959,23 @@ class ReactivePythonDagBuilderUtils__(): nodes_to_return = [] if get_upstream and not stale_only: - nodes_to_return += [n for n in ancestors(current_dag, current_node)] + nodes_to_return += list(directed_ancestors_with_pruning(current_dag, current_node, pruning_condition=lambda x: x['text'] != '_START_')) elif get_upstream and stale_only: - nodes_to_return += [n for n in ancestors(current_dag, current_node) if current_dag.nodes[n].get('stale')] + nodes_to_return += list(directed_ancestors_with_pruning(current_dag, current_node, pruning_condition=lambda x: x['text'] != '_START_' and x.get('stale'))) + # This^ will get ALL THE STALE ANCESTORS, until you get the FIRST NOT-STALE node, then you prune that branch since it's good. if (not get_downstream) or not stale_only or current_dag.nodes[current_node].get('stale'): nodes_to_return += [current_node] # IF YOU ARE NOT GETTING THE DESCENDANTS (So "up to(including) the current node"), this is inserted EVEN IF NOT STALE, which is very intentional if get_downstream and not stale_only: - nodes_to_return += [n for n in descendants(current_dag, current_node)] + nodes_to_return += list(directed_descendants_with_pruning(current_dag, current_node, pruning_condition=lambda x: x['text'] != '_START_')) elif get_downstream and stale_only: - nodes_to_return += [n for n in descendants(current_dag, current_node) if current_dag.nodes[n].get('stale')] + nodes_to_return += list(directed_descendants_with_pruning(current_dag, current_node, pruning_condition=lambda x: x['text'] != '_START_' and x.get('stale'))) + # This will get ALL THE STALE DESCENDANTS, until you get the FIRST NOT-STALE node, then you prune that branch since it's good. # When you need to Execute code, the order is important, so we sort the nodes by topological order. # When you are just selecting, not so much. # For now, I'm using the stale_only flag to indicate this difference. + # Btw: It's unfortunate that directed_ancestors_with_pruning's bfs doesn't return nodes Already sorted, but I'm not gonna find out why... if stale_only: - nodes_to_return = [n for n in topological_sort(current_dag) if n in set(nodes_to_return) and current_dag.nodes[n]['text'] != '_START_'] - else: - nodes_to_return = [n for n in nodes_to_return if current_dag.nodes[n]['text'] != '_START_'] + nodes_to_return = [n for n in topological_sort(current_dag) if n in set(nodes_to_return)] # Additional step: If get_downstream OR if you are running a single block (neither downstream not upstream), # you want to identify which codes are stale but actually depend on other stale code @@ -1004,7 +1028,6 @@ class ReactivePythonDagBuilderUtils__(): def update_dag(self, code: str, current_line: Optional[int] = None): try: - # TODO: Probably treat parsing errors different from code analysis errors ast_tree = ast.parse(code, "", mode='exec', type_comments=True) self.syntax_error_range = None except SyntaxError as e: @@ -2392,10 +2415,12 @@ class ReactivePythonDagBuilderUtils__(): return True - return DiGraph, topological_sort, ancestors, descendants, has_path + return DiGraph, topological_sort, has_path reactive_python_dag_builder_utils__ = ReactivePythonDagBuilderUtils__() + + `; \ No newline at end of file diff --git a/src/test/reactive_python_engine_tests.py b/src/test/reactive_python_engine_tests.py index 7f3b260..9795a31 100644 --- a/src/test/reactive_python_engine_tests.py +++ b/src/test/reactive_python_engine_tests.py @@ -1144,35 +1144,144 @@ def ff(y): +######################################### ANCESTOR WITH PRUNING: + +from networkx import DiGraph, topological_sort, find_cycle +from networkx import * + + + +from typing import Callable +from collections import deque + +def generic_bfs_edges_with_pruning(G, source, neighbors, pruning_condition: Callable): + visited = {source} + queue = deque([(source, 0, neighbors(source))]) + while queue: + parent, depth_now, children = queue[0] + try: + child = next(children) + if child not in visited and pruning_condition(G.nodes[child]): + yield parent, child + visited.add(child) + queue.append((child, 0, neighbors(child))) + except StopIteration: + queue.popleft() + + +def directed_ancestors_with_pruning(G, source, pruning_condition): + return {child for parent, child in generic_bfs_edges_with_pruning(G, source, G.predecessors, pruning_condition=pruning_condition)} + +def directed_descendants_with_pruning(G, source, pruning_condition): + return {child for parent, child in generic_bfs_edges_with_pruning(G, source, G.neighbors, pruning_condition=pruning_condition)} -###################################### AN EXECUTION EXAMPLE: +# sample graph: +dirGraph = DiGraph() +dirGraph.add_edges_from([(1, 2), (1, 3), (1, 4), (2, 5), (3, 5), (4, 6),]) +# Add a "STALE" label to each node: +for node in dirGraph.nodes: + dirGraph.nodes[node]['stale'] = True +pruning_condition_ = lambda node: node['stale'] +# Uni test descendants_with_pruning: + +import copy +g1 = copy.deepcopy(dirGraph) + +g1.nodes[3]['stale'] = False + +directed_descendants_with_pruning(g1, 1, pruning_condition_) +assert directed_descendants_with_pruning(g1, 1, pruning_condition_) == {2, 4, 5, 6} + + +g2 = copy.deepcopy(dirGraph) +g2.nodes[4]['stale'] = False +directed_descendants_with_pruning(g2, 1, pruning_condition_) +assert directed_descendants_with_pruning(g2, 1, pruning_condition_) == {2, 3, 5} + + + +###################################### AN EXECUTION EXAMPLE: + +def pr(ranges): + print("\n".join([r[4] + ": " + r[2] for r in json.loads(ranges)] if ranges else [])) + +import json code = """ -x = 1 # line 1 +counter1 = 1 # line 1 +counter1 += 2 +counter1 -print(x) +counter3 = 3 +counter3 += 4 +print(counter3) """ reactive_python_dag_builder_utils__ = ReactivePythonDagBuilderUtils__() -reactive_python_dag_builder_utils__.update_dag_and_get_ranges(code=code, include_code=True) +ranges = reactive_python_dag_builder_utils__.update_dag_and_get_ranges(code=code, include_code=True) +pr(ranges) + +# RUN LINE 3: +ranges = reactive_python_dag_builder_utils__.ask_for_ranges_to_compute(code, current_line = 3, get_upstream=True, get_downstream=False, stale_only=True); ranges +pr(ranges) +for h in ([r[5] for r in json.loads(ranges)] if ranges else []): + reactive_python_dag_builder_utils__.set_locked_range_as_synced(h) +reactive_python_dag_builder_utils__.unlock() +ranges = reactive_python_dag_builder_utils__.update_dag_and_get_ranges(code=None, current_line=None, include_code=True) +pr(ranges) + +# RUN LINE 7: +ranges = reactive_python_dag_builder_utils__.ask_for_ranges_to_compute(code, current_line = 7, get_upstream=True, get_downstream=False, stale_only=True); ranges +pr(ranges) +for h in ([r[5] for r in json.loads(ranges)] if ranges else []): + reactive_python_dag_builder_utils__.set_locked_range_as_synced(h) +reactive_python_dag_builder_utils__.unlock() +ranges = reactive_python_dag_builder_utils__.update_dag_and_get_ranges(code=None, current_line=None, include_code=True, stale_only=False) +pr(ranges) -ranges = reactive_python_dag_builder_utils__.ask_for_ranges_to_compute(code, current_line = 3, get_upstream=True, get_downstream=True, stale_only=True); ranges -hashes = [r[5] for r in json.loads(ranges)] if ranges else [] +#### Now send a New text: +code = """ +counter1 = 1 # line 1 +counter1 += 2 +counter1 -reactive_python_dag_builder_utils__.set_locked_range_as_synced(hashes[0]) -reactive_python_dag_builder_utils__.set_locked_range_as_synced(hashes[1]) +counter3 = 333 +counter3 += 4 +print(counter3) +""" +ranges = reactive_python_dag_builder_utils__.update_dag_and_get_ranges(code=code, include_code=True, stale_only=False) +pr(ranges) +# RUN LINE 7 AGAIN: +ranges = reactive_python_dag_builder_utils__.ask_for_ranges_to_compute(code, current_line = 7, get_upstream=True, get_downstream=False, stale_only=True); ranges +pr(ranges) +for h in ([r[5] for r in json.loads(ranges)] if ranges else []): + reactive_python_dag_builder_utils__.set_locked_range_as_synced(h) reactive_python_dag_builder_utils__.unlock() +ranges = reactive_python_dag_builder_utils__.update_dag_and_get_ranges(code=None, current_line=None, include_code=True, stale_only=False) +pr(ranges) -reactive_python_dag_builder_utils__.update_dag_and_get_ranges(code=code, current_line=3) -reactive_python_dag_builder_utils__.update_dag_and_get_ranges(code=code, current_line=None) +ranges = reactive_python_dag_builder_utils__.update_dag_and_get_ranges(code=code, current_line=None, include_code=True, stale_only=False) +pr(ranges) -ranges = reactive_python_dag_builder_utils__.ask_for_ranges_to_compute(code, current_line = 3, get_upstream=True, get_downstream=False, stale_only=True); ranges + + + + +# #### Run "counter2" again: +# ranges = reactive_python_dag_builder_utils__.ask_for_ranges_to_compute(code, current_line = 4, get_upstream=True, get_downstream=False, stale_only=True); ranges +# print("\n".join([r[4] for r in json.loads(ranges)])) +# hashes = [r[5] for r in json.loads(ranges)] if ranges else [] +# reactive_python_dag_builder_utils__.set_locked_range_as_synced(hashes[0]) +# reactive_python_dag_builder_utils__.unlock() + +# #### Draw DAG: +# reactive_python_dag_builder_utils__.draw_dag(reactive_python_dag_builder_utils__.current_dag)