Skip to content

Commit

Permalink
warn about missing pw.run() (#7231)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: f947ae61dbb299a54b9b6ca0ed6417cb6026887a
  • Loading branch information
KamilPiechowiak authored and Manul from Pathway committed Sep 9, 2024
1 parent bf4de1d commit 5fdcaf0
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 0 deletions.
25 changes: 25 additions & 0 deletions integration_tests/common/test_common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import os
import subprocess as sp
import sys

import pytest


@pytest.mark.parametrize("filename", ["unused_operators.py", "unused_operators_2.py"])
def test_some_operators_unused(filename: str):
    """Run a fixture script in a subprocess and expect the unused-operators warning.

    Args:
        filename: Name of a script located next to this test file. Each script
            is expected to exit with operators that were never run.

    Raises:
        subprocess.CalledProcessError: If the script exits with a non-zero
            status (``check=True``).
    """
    test_dir = os.path.dirname(os.path.abspath(__file__))
    # Use the interpreter that is running this test instead of whatever
    # "python3" resolves to on PATH: the latter may be a different Python
    # (without pathway installed) or may not exist at all.
    p = sp.run(
        [sys.executable, os.path.join(test_dir, filename)],
        capture_output=True,
        check=True,
    )
    # The warning is emitted by an exit hook, so it shows up on stderr.
    assert (
        "There are operators in the computation graph that haven't been used."
        + " Use pathway.run() (or similar) to run the computation involving these nodes."
    ) in p.stderr.decode()


def test_all_operators_used():
    """Run ``used_operators.py`` in a subprocess and expect no unused-operators warning.

    Raises:
        subprocess.CalledProcessError: If the script exits with a non-zero
            status (``check=True``).
    """
    test_dir = os.path.dirname(os.path.abspath(__file__))
    # Use the interpreter that is running this test instead of whatever
    # "python3" resolves to on PATH: the latter may be a different Python
    # (without pathway installed) or may not exist at all.
    p = sp.run(
        [sys.executable, os.path.join(test_dir, "used_operators.py")],
        capture_output=True,
        check=True,
    )
    assert (
        "There are operators in the computation graph that haven't been used."
        + " Use pathway.run() (or similar) to run the computation involving these nodes."
    ) not in p.stderr.decode()
8 changes: 8 additions & 0 deletions integration_tests/common/unused_operators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import pathway as pw

# Fixture script for test_some_operators_unused: a table is built, its result
# is discarded, and the script ends without any run/compute call. The test
# expects the unused-operators warning on stderr when this script exits.
pw.debug.table_from_markdown(
"""
a | b
1 | 2
"""
)
10 changes: 10 additions & 0 deletions integration_tests/common/unused_operators_2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import pathway as pw

# Fixture script for test_some_operators_unused: the table is computed and
# printed once, then one more table operation is issued and never run. The
# test expects the unused-operators warning on stderr when this script exits.
t = pw.debug.table_from_markdown(
"""
a | b
1 | 2
"""
)
pw.debug.compute_and_print(t)
# Issued after the computation above; its result is discarded and nothing
# runs afterwards.
t.with_columns(c=10)
9 changes: 9 additions & 0 deletions integration_tests/common/used_operators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import pathway as pw

# Fixture script for test_all_operators_used: the only table that is built is
# also computed and printed, and nothing is added afterwards. The test expects
# no unused-operators warning on stderr when this script exits.
t = pw.debug.table_from_markdown(
"""
a | b
1 | 2
"""
)
pw.debug.compute_and_print(t)
1 change: 1 addition & 0 deletions python/pathway/internals/graph_runner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def _run(
after_build: Callable[[ScopeState, OperatorStorageGraph], None] | None = None,
run_all: bool = False,
) -> list[api.CapturedStream]:
self._graph.mark_all_operators_as_used()
run_id = self._get_run_id()
pathway_config = get_pathway_config()
otel = telemetry.Telemetry.create(
Expand Down
19 changes: 19 additions & 0 deletions python/pathway/internals/parse_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

from __future__ import annotations

import atexit
import hashlib
import itertools
import warnings
from collections import Counter
from collections.abc import Callable, Iterable, Iterator
from types import TracebackType
Expand Down Expand Up @@ -110,6 +112,7 @@ class ParseGraph:
static_tables_cache: dict[int, Table[Any]]
interactive_mode_controller: interactive.InteractiveModeController | None = None
error_log_stack: list[Table[ErrorLogSchema]]
unused_operators: bool

def __init__(self) -> None:
self.clear()
Expand Down Expand Up @@ -148,6 +151,7 @@ def add_operator(
node.set_error_log(self.error_log_stack[-1] if require_error_log else None)
result = call_operator(node)
self._current_scope.add_node(node, special=special)
self.unused_operators = True
return result

def add_iterate(
Expand Down Expand Up @@ -206,6 +210,10 @@ def clear(self) -> None:
self.cache = {}
self.static_tables_cache = {}
self.error_log_stack = []
self.mark_all_operators_as_used()

def mark_all_operators_as_used(self) -> None:
    """Reset the ``unused_operators`` flag to ``False``."""
    self.unused_operators = False

def sig(self) -> str:
    """Return the hexadecimal SHA-256 digest of ``repr(self)``."""
    encoded_repr = repr(self).encode()
    digest = hashlib.sha256(encoded_repr)
    return digest.hexdigest()
Expand Down Expand Up @@ -234,3 +242,14 @@ class ErrorLogSchema(Schema):


# Module-level ParseGraph instance; warn_if_some_operators_unused() reads its
# `unused_operators` flag.
G = ParseGraph()


def warn_if_some_operators_unused() -> None:
    """Emit a warning if the global graph ``G`` is flagged as having unused operators.

    Does nothing when ``G.unused_operators`` is false.
    """
    if not G.unused_operators:
        return

    message = (
        "There are operators in the computation graph that haven't been used."
        " Use pathway.run() (or similar) to run the computation involving these nodes."
    )
    warnings.warn(message)


# Run the unused-operators check when the interpreter exits.
atexit.register(warn_if_some_operators_unused)
32 changes: 32 additions & 0 deletions python/pathway/tests/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import pathway.internals.shadows.operator as operator
from pathway.debug import table_from_pandas, table_to_pandas
from pathway.internals import dtype as dt
from pathway.internals.parse_graph import warn_if_some_operators_unused
from pathway.internals.table_io import empty_from_schema
from pathway.tests.utils import (
T,
Expand Down Expand Up @@ -6494,3 +6495,34 @@ def test_dtype_pandas():
)["a"].dtype
== pd.Int64Dtype()
)


def test_warns_if_unused_operators():
    """A table that is created but never run must trigger the warning."""
    expected_message = re.escape(
        "There are operators in the computation graph that haven't been used."
        + " Use pathway.run() (or similar) to run the computation involving these nodes."
    )

    # Build a table and deliberately never run anything on it.
    T(
        """
        a | b
        1 | 2
        """
    )

    with pytest.warns(UserWarning, match=expected_message):
        warn_if_some_operators_unused()


def test_doesnt_warn_if_all_operators_used():
    """No warning is expected once the only table has been computed and printed."""
    table = T(
        """
        a | b
        1 | 2
        """
    )
    pw.debug.compute_and_print(table)

    # Escalate any warning to an exception so that an emitted warning fails
    # the test.
    with warnings.catch_warnings():
        warnings.simplefilter("error")
        warn_if_some_operators_unused()

0 comments on commit 5fdcaf0

Please sign in to comment.