Skip to content

Commit 7489c27

Browse files
refactor: Factor out executor.head method into cache and execute slice (#1676)
1 parent ee062bf commit 7489c27

File tree

3 files changed

+56
-58
lines changed

3 files changed

+56
-58
lines changed

bigframes/core/blocks.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import bigframes.exceptions as bfe
7070
import bigframes.operations as ops
7171
import bigframes.operations.aggregations as agg_ops
72+
from bigframes.session import executor as executors
7273

7374
# Type constraint for wherever column labels are used
7475
Label = typing.Hashable
@@ -1560,12 +1561,19 @@ def retrieve_repr_request_results(
15601561
"""
15611562

15621563
# head caches full underlying expression, so row_count will be free after
1563-
head_result = self.session._executor.head(self.expr, max_results)
1564+
executor = self.session._executor
1565+
executor.cached(
1566+
array_value=self.expr,
1567+
config=executors.CacheConfig(optimize_for="head", if_cached="reuse-strict"),
1568+
)
1569+
head_result = self.session._executor.execute(
1570+
self.expr.slice(start=None, stop=max_results, step=None)
1571+
)
15641572
row_count = self.session._executor.execute(self.expr.row_count()).to_py_scalar()
15651573

1566-
df = head_result.to_pandas()
1567-
self._copy_index_to_pandas(df)
1568-
return df, row_count, head_result.query_job
1574+
head_df = head_result.to_pandas()
1575+
self._copy_index_to_pandas(head_df)
1576+
return head_df, row_count, head_result.query_job
15691577

15701578
def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]:
15711579
expr, result_id = self._expr.promote_offsets()
@@ -2535,9 +2543,12 @@ def cached(self, *, force: bool = False, session_aware: bool = False) -> None:
25352543
# use a heuristic for whether something needs to be cached
25362544
self.session._executor.cached(
25372545
self.expr,
2538-
force=force,
2539-
use_session=session_aware,
2540-
cluster_cols=self.index_columns,
2546+
config=executors.CacheConfig(
2547+
optimize_for="auto"
2548+
if session_aware
2549+
else executors.HierarchicalKey(tuple(self.index_columns)),
2550+
if_cached="replace" if force else "reuse-any",
2551+
),
25412552
)
25422553

25432554
def _is_monotonic(

bigframes/session/bq_caching_executor.py

Lines changed: 26 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -243,46 +243,37 @@ def peek(
243243
plan, ordered=False, destination=destination_table, peek=n_rows
244244
)
245245

246-
def head(
247-
self, array_value: bigframes.core.ArrayValue, n_rows: int
248-
) -> executor.ExecuteResult:
249-
plan = self.logical_plan(array_value.node)
250-
if (plan.row_count is not None) and (plan.row_count <= n_rows):
251-
return self._execute_plan(plan, ordered=True)
252-
253-
if not self.strictly_ordered and not array_value.node.explicitly_ordered:
254-
# No user-provided ordering, so just get any N rows, it's faster!
255-
return self.peek(array_value, n_rows)
256-
257-
if not tree_properties.can_fast_head(plan):
258-
# If can't get head fast, we are going to need to execute the whole query
259-
# Will want to do this in a way such that the result is reusable, but the first
260-
# N values can be easily extracted.
261-
# This currently requires clustering on offsets.
262-
self._cache_with_offsets(array_value)
263-
# Get a new optimized plan after caching
264-
plan = self.logical_plan(array_value.node)
265-
assert tree_properties.can_fast_head(plan)
266-
267-
head_plan = generate_head_plan(plan, n_rows)
268-
return self._execute_plan(head_plan, ordered=True)
269-
270246
def cached(
271-
self,
272-
array_value: bigframes.core.ArrayValue,
273-
*,
274-
force: bool = False,
275-
use_session: bool = False,
276-
cluster_cols: Sequence[str] = (),
247+
self, array_value: bigframes.core.ArrayValue, *, config: executor.CacheConfig
277248
) -> None:
278249
"""Write the block to a session table."""
279-
# use a heuristic for whether something needs to be cached
280-
if (not force) and self._is_trivially_executable(array_value):
281-
return
282-
if use_session:
250+
# First, see if we can reuse the existing cache
251+
# TODO(b/415105423): Provide feedback to user on whether new caching action was deemed necessary
252+
# TODO(b/415105218): Make cached a deferred action
253+
if config.if_cached == "reuse-any":
254+
if self._is_trivially_executable(array_value):
255+
return
256+
elif config.if_cached == "reuse-strict":
257+
# This path basically exists to make sure that repr in head mode is optimized for subsequent repr operations.
258+
if config.optimize_for == "head":
259+
if tree_properties.can_fast_head(array_value.node):
260+
return
261+
else:
262+
raise NotImplementedError(
263+
"if_cached='reuse-strict' currently only supported with optimize_for='head'"
264+
)
265+
elif config.if_cached != "replace":
266+
raise ValueError(f"Unexpected 'if_cached' arg: {config.if_cached}")
267+
268+
if config.optimize_for == "auto":
283269
self._cache_with_session_awareness(array_value)
270+
elif config.optimize_for == "head":
271+
self._cache_with_offsets(array_value)
284272
else:
285-
self._cache_with_cluster_cols(array_value, cluster_cols=cluster_cols)
273+
assert isinstance(config.optimize_for, executor.HierarchicalKey)
274+
self._cache_with_cluster_cols(
275+
array_value, cluster_cols=config.optimize_for.columns
276+
)
286277

287278
# Helpers
288279
def _run_execute_query(
@@ -571,7 +562,3 @@ def _sanitize(
571562
)
572563
for f in schema
573564
)
574-
575-
576-
def generate_head_plan(node: nodes.BigFrameNode, n: int):
577-
return nodes.SliceNode(node, start=None, stop=n)

bigframes/session/executor.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,17 @@ def to_py_scalar(self):
7373
return column[0]
7474

7575

76+
@dataclasses.dataclass(frozen=True)
77+
class HierarchicalKey:
78+
columns: tuple[str, ...]
79+
80+
81+
@dataclasses.dataclass(frozen=True)
82+
class CacheConfig(abc.ABC):
83+
optimize_for: Union[Literal["auto", "head"], HierarchicalKey] = "auto"
84+
if_cached: Literal["reuse-strict", "reuse-any", "replace"] = "reuse-any"
85+
86+
7687
class Executor(abc.ABC):
7788
"""
7889
Interface for an executor, which compiles and executes ArrayValue objects.
@@ -149,21 +160,10 @@ def peek(
149160
"""
150161
raise NotImplementedError("peek not implemented for this executor")
151162

152-
# TODO: Remove this and replace with efficient slice operator that can use execute()
153-
def head(
154-
self, array_value: bigframes.core.ArrayValue, n_rows: int
155-
) -> ExecuteResult:
156-
"""
157-
Preview the first n rows of the dataframe. This is less efficient than the unordered peek preview op.
158-
"""
159-
raise NotImplementedError("head not implemented for this executor")
160-
161163
def cached(
162164
self,
163165
array_value: bigframes.core.ArrayValue,
164166
*,
165-
force: bool = False,
166-
use_session: bool = False,
167-
cluster_cols: Sequence[str] = (),
167+
config: CacheConfig,
168168
) -> None:
169169
raise NotImplementedError("cached not implemented for this executor")

0 commit comments

Comments
 (0)