Skip to content

Commit

Permalink
Better handling of Synced states: AT LEAST ONCE, it worked.... Too ba…
Browse files Browse the repository at this point in the history
…d other times it fails randomly..
  • Loading branch information
micoloth committed Mar 9, 2024
1 parent 8671fef commit 040b44b
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 37 deletions.
49 changes: 31 additions & 18 deletions src/reactive_python_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,8 @@ class DagNode:
errored_input_vars: Set[str] # Represents the variables that are defined in this node (either via assignment or function definition) and can be used later
errored_output_vars: Set[str] # Represents the variables that are defined in this node (either via assignment or function definition) and can be used later
stale: bool = False
depends_on_stale_nodes_not_selected: bool = False # Helper var used in a specific function to do a specific thing, it's just handy to have it here directly
needstobestale: bool = False
depends_on_stale_nodes_not_selected: bool = False # Helper var used in a specific function to do a specific thing, it's just handy to have it here directly, but ignore for now

def node_repr_hash(node):
repr = f"{node['lineno']}-{node['end_lineno']}: {node['text'].strip()}"
Expand Down Expand Up @@ -769,7 +770,7 @@ def dagnodes_to_dag(dag_nodes):
graph.add_edge(edge[0], edge[1], vars=[edge[2]])

# Add a single node "_START_" that has an edge pointing to ALL THE NODES THAT HAVE NO PARENTS
graph.add_node("_START_", stale=False, text="_START_")
graph.add_node("_START_", stale=False, needstobestale=False, text="_START_")
for i, node in enumerate(dag_nodes):
if len(list(graph.predecessors(i))) == 0:
graph.add_edge("_START_", i, vars=[])
Expand Down Expand Up @@ -797,9 +798,19 @@ def update_staleness_info_in_new_dag(old_dag, new_dag):
Start from the "_START_" node of the new dag, and for each node, check if it is in the old_dag (Compare the "ast_tree" dataclasses.field by equality).
Each node in the new_dag is FRESH (not stale) if and only if:
- all its parents in the new_dag are fresh
- it has a corresponding node in the old_dag, and that node is not stale
Each node in the new_dag NEEDSTOBESTALE if and only if:
- ANY of its parents in the new_dag NEEDSTOBESTALE, OR
- it Doesn't have a corresponding node in the old_dag (ie it's new/ has been changed).
Subsequently,
Each noed is STALE if and only if:
- it NEEDSTOBESTALE, OR
- ANY of its parents in the new_dag NEEDSTOBESTALE, OR
- it Doesn't have a corresponding node in the old_dag (ie it's new/ has been changed).
Otherwise, it should retain its state.
This double logic is to handle this case:
- If a node is STALE and one of its childer in FRESH (not stale), but None of them NEEDSTOBESTALE, the the Children SHOULD remain Fresh...
Args:
old_dag (_type_): The old dag
Expand All @@ -814,14 +825,18 @@ def update_staleness_info_in_new_dag(old_dag, new_dag):
new_node_to_old['_START_'] = '_START_'; continue
parents = list(new_dag.predecessors(new_node))

# If any of the parents is stale, it's stale:
if any(new_dag.nodes[x]['stale'] for x in parents):
new_dag.nodes[new_node]['stale'] = True; continue
# If any of the parents needstobestale, then needstobestale:
if any(new_dag.nodes[x]['needstobestale'] for x in parents):
new_dag.nodes[new_node]['needstobestale'] = True
new_dag.nodes[new_node]['stale'] = True
continue

# For all the parents, get the corresponding nodes in the old_dag: if any don't exist, it's stale:
parents_in_old_dag = [new_node_to_old.get(parent) for parent in parents]
if any(x is None for x in parents_in_old_dag):
new_dag.nodes[new_node]['stale'] = True; continue
new_dag.nodes[new_node]['needstobestale'] = True
new_dag.nodes[new_node]['stale'] = True
continue

# Among all the nodes in old_graph that are Successors of All nodes in parents_in_old_dag,
# Find the one that is the same as the new_node:
Expand All @@ -832,8 +847,13 @@ def update_staleness_info_in_new_dag(old_dag, new_dag):
break

# If there is no matching node in the old_dag, or the matching node is stale, it's stale:
if new_node_to_old.get(new_node) is None or old_dag.nodes[new_node_to_old[new_node]]['stale']:
new_dag.nodes[new_node]['stale'] = True; continue
if new_node_to_old.get(new_node) is None or old_dag.nodes[new_node_to_old[new_node]]['needstobestale']:
new_dag.nodes[new_node]['needstobestale'] = True;
new_dag.nodes[new_node]['stale'] = True;
continue
else:
# RETAIN THE PREVIOUS STATE:
new_dag.nodes[new_node]['stale'] = old_dag.nodes[new_node_to_old[new_node]]['stale']

# Additional step: For all variables that are outputted by a stale node, mark all the nodes that use that variable as stale FROM THE BEGINNING:
# If the assignements always use a new name, as it should be in a functional setting, this is not necessary.
Expand Down Expand Up @@ -2376,10 +2396,3 @@ def has_path(G, source, target):
reactive_python_dag_builder_utils__ = ReactivePythonDagBuilderUtils__()




update_staleness_info_in_new_dag = reactive_python_dag_builder_utils__.update_staleness_info_in_new_dag
get_input_variables_for = reactive_python_dag_builder_utils__.get_input_variables_for
get_output_variables_for = reactive_python_dag_builder_utils__.get_output_variables_for
annotate = reactive_python_dag_builder_utils__.annotate

49 changes: 31 additions & 18 deletions src/reactive_python_engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,8 @@ class ReactivePythonDagBuilderUtils__():
errored_input_vars: Set[str] # Represents the variables that are defined in this node (either via assignment or function definition) and can be used later
errored_output_vars: Set[str] # Represents the variables that are defined in this node (either via assignment or function definition) and can be used later
stale: bool = False
depends_on_stale_nodes_not_selected: bool = False # Helper var used in a specific function to do a specific thing, it's just handy to have it here directly
needstobestale: bool = False
depends_on_stale_nodes_not_selected: bool = False # Helper var used in a specific function to do a specific thing, it's just handy to have it here directly, but ignore for now
def node_repr_hash(node):
repr = f"{node['lineno']}-{node['end_lineno']}: {node['text'].strip()}"
Expand Down Expand Up @@ -770,7 +771,7 @@ class ReactivePythonDagBuilderUtils__():
graph.add_edge(edge[0], edge[1], vars=[edge[2]])
# Add a single node "_START_" that has an edge pointing to ALL THE NODES THAT HAVE NO PARENTS
graph.add_node("_START_", stale=False, text="_START_")
graph.add_node("_START_", stale=False, needstobestale=False, text="_START_")
for i, node in enumerate(dag_nodes):
if len(list(graph.predecessors(i))) == 0:
graph.add_edge("_START_", i, vars=[])
Expand Down Expand Up @@ -798,9 +799,19 @@ class ReactivePythonDagBuilderUtils__():
Start from the "_START_" node of the new dag, and for each node, check if it is in the old_dag (Compare the "ast_tree" dataclasses.field by equality).
Each node in the new_dag is FRESH (not stale) if and only if:
- all its parents in the new_dag are fresh
- it has a corresponding node in the old_dag, and that node is not stale
Each node in the new_dag NEEDSTOBESTALE if and only if:
- ANY of its parents in the new_dag NEEDSTOBESTALE, OR
- it Doesn't have a corresponding node in the old_dag (ie it's new/ has been changed).
Subsequently,
Each noed is STALE if and only if:
- it NEEDSTOBESTALE, OR
- ANY of its parents in the new_dag NEEDSTOBESTALE, OR
- it Doesn't have a corresponding node in the old_dag (ie it's new/ has been changed).
Otherwise, it should retain its state.
This double logic is to handle this case:
- If a node is STALE and one of its childer in FRESH (not stale), but None of them NEEDSTOBESTALE, the the Children SHOULD remain Fresh...
Args:
old_dag (_type_): The old dag
Expand All @@ -815,14 +826,18 @@ class ReactivePythonDagBuilderUtils__():
new_node_to_old['_START_'] = '_START_'; continue
parents = list(new_dag.predecessors(new_node))
# If any of the parents is stale, it's stale:
if any(new_dag.nodes[x]['stale'] for x in parents):
new_dag.nodes[new_node]['stale'] = True; continue
# If any of the parents needstobestale, then needstobestale:
if any(new_dag.nodes[x]['needstobestale'] for x in parents):
new_dag.nodes[new_node]['needstobestale'] = True
new_dag.nodes[new_node]['stale'] = True
continue
# For all the parents, get the corresponding nodes in the old_dag: if any don't exist, it's stale:
parents_in_old_dag = [new_node_to_old.get(parent) for parent in parents]
if any(x is None for x in parents_in_old_dag):
new_dag.nodes[new_node]['stale'] = True; continue
new_dag.nodes[new_node]['needstobestale'] = True
new_dag.nodes[new_node]['stale'] = True
continue
# Among all the nodes in old_graph that are Successors of All nodes in parents_in_old_dag,
# Find the one that is the same as the new_node:
Expand All @@ -833,8 +848,13 @@ class ReactivePythonDagBuilderUtils__():
break
# If there is no matching node in the old_dag, or the matching node is stale, it's stale:
if new_node_to_old.get(new_node) is None or old_dag.nodes[new_node_to_old[new_node]]['stale']:
new_dag.nodes[new_node]['stale'] = True; continue
if new_node_to_old.get(new_node) is None or old_dag.nodes[new_node_to_old[new_node]]['needstobestale']:
new_dag.nodes[new_node]['needstobestale'] = True;
new_dag.nodes[new_node]['stale'] = True;
continue
else:
# RETAIN THE PREVIOUS STATE:
new_dag.nodes[new_node]['stale'] = old_dag.nodes[new_node_to_old[new_node]]['stale']
# Additional step: For all variables that are outputted by a stale node, mark all the nodes that use that variable as stale FROM THE BEGINNING:
# If the assignements always use a new name, as it should be in a functional setting, this is not necessary.
Expand Down Expand Up @@ -2378,11 +2398,4 @@ reactive_python_dag_builder_utils__ = ReactivePythonDagBuilderUtils__()
update_staleness_info_in_new_dag = reactive_python_dag_builder_utils__.update_staleness_info_in_new_dag
get_input_variables_for = reactive_python_dag_builder_utils__.get_input_variables_for
get_output_variables_for = reactive_python_dag_builder_utils__.get_output_variables_for
annotate = reactive_python_dag_builder_utils__.annotate
`;
42 changes: 41 additions & 1 deletion src/test/reactive_python_engine_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@



update_staleness_info_in_new_dag = reactive_python_dag_builder_utils__.update_staleness_info_in_new_dag
get_input_variables_for = reactive_python_dag_builder_utils__.get_input_variables_for
get_output_variables_for = reactive_python_dag_builder_utils__.get_output_variables_for
annotate = reactive_python_dag_builder_utils__.annotate



# [[6, 6, "outdated", "", "import boto3", "6-6: import boto3"], [55, 55, "outdated", "", "import pyarrow", "55-55: import pyarrow"], [59, 70, "outdated", "", "def get_filesystem(profile_name):\n session = boto3.Session(profile_name=profile_name) # )\n credentials = session.get_credentials() # Get credentials out of session:\n # Read with credentials:\n filesystem = pyarrow.fs.S3FileSystem(\n access_key=credentials.access_key,\n secret_key=credentials.secret_key,\n …redentials:\n filesystem = pyarrow.fs.S3FileSystem(\n access_key=credentials.access_key,\n secret_key=credentials.secret_key,\n session_token=credentials.token,\n region=\'eu-west-1\',\n # role_arn=\'arn:aws:iam::939571286166:role/aws_iam_role-unicron_readonly_dev_access\'\n )\n return filesystem"], [73, 73, "outdated", "current", "filesystem_pro = get_filesystem(\"sbt-it-pro:power\")", "73-73: filesystem_pro = get_filesystem(\"sbt-it-pro:power\")"]]
Expand Down Expand Up @@ -1135,4 +1140,39 @@ def ff(y):
w['asset'] = f(x)
z = w['asset']['value']

x
x





###################################### AN EXECUTION EXAMPLE:





code = """
x = 1 # line 1
print(x)
"""

reactive_python_dag_builder_utils__ = ReactivePythonDagBuilderUtils__()

reactive_python_dag_builder_utils__.update_dag_and_get_ranges(code=code, include_code=True)

ranges = reactive_python_dag_builder_utils__.ask_for_ranges_to_compute(code, current_line = 3, get_upstream=True, get_downstream=True, stale_only=True); ranges

hashes = [r[5] for r in json.loads(ranges)] if ranges else []

reactive_python_dag_builder_utils__.set_locked_range_as_synced(hashes[0])
reactive_python_dag_builder_utils__.set_locked_range_as_synced(hashes[1])

reactive_python_dag_builder_utils__.unlock()

reactive_python_dag_builder_utils__.update_dag_and_get_ranges(code=code, current_line=3)
reactive_python_dag_builder_utils__.update_dag_and_get_ranges(code=code, current_line=None)

ranges = reactive_python_dag_builder_utils__.ask_for_ranges_to_compute(code, current_line = 3, get_upstream=True, get_downstream=False, stale_only=True); ranges

0 comments on commit 040b44b

Please sign in to comment.