Skip to content

feat: Support dry_run in to_pandas() #1436

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 32 commits into from
Mar 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
dac34d7
feat: Support dry_run in to_pandas()
sycai Feb 27, 2025
b88ba73
Merge branch 'main' into sycai_to_pandas_dry_run
sycai Feb 27, 2025
330a647
Merge branch 'main' into sycai_to_pandas_dry_run
sycai Feb 28, 2025
5f8a76a
centralize dry_run logics at block level
sycai Feb 28, 2025
75f4ce1
fix lint errors
sycai Feb 28, 2025
0b4c48c
Merge branch 'main' into sycai_to_pandas_dry_run
sycai Feb 28, 2025
40c557b
Merge branch 'main' into sycai_to_pandas_dry_run
sycai Feb 28, 2025
fe82c6d
remove unnecessary code
sycai Feb 28, 2025
1adc96a
Merge branch 'main' into sycai_to_pandas_dry_run
sycai Feb 28, 2025
3c0efc2
Merge branch 'main' into sycai_to_pandas_dry_run
sycai Mar 3, 2025
9c3d849
Merge branch 'main' into sycai_to_pandas_dry_run
sycai Mar 3, 2025
725050b
Merge branch 'main' into sycai_to_pandas_dry_run
sycai Mar 3, 2025
7550f6a
Merge branch 'main' into sycai_to_pandas_dry_run
sycai Mar 4, 2025
3b9ea0e
Merge branch 'main' into sycai_to_pandas_dry_run
sycai Mar 5, 2025
70e1986
Merge branch 'main' into sycai_to_pandas_dry_run
sycai Mar 5, 2025
c2c3fca
Merge branch 'main' into sycai_to_pandas_dry_run
sycai Mar 6, 2025
cde29a0
use dataframe for dry_run stats
sycai Mar 6, 2025
e291c70
Merge branch 'main' into sycai_to_pandas_dry_run
sycai Mar 6, 2025
86bf46b
flatten the job stats to a series
sycai Mar 6, 2025
09fb874
Merge branch 'main' into sycai_to_pandas_dry_run
sycai Mar 6, 2025
4af0ac4
fix lint
sycai Mar 6, 2025
416ad49
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Mar 6, 2025
301e993
fix query job issue
sycai Mar 7, 2025
67e40e9
Merge branch 'main' into sycai_to_pandas_dry_run
sycai Mar 10, 2025
6eeb69e
Merge branch 'main' into sycai_to_pandas_dry_run
sycai Mar 10, 2025
5a85ad5
Merge branch 'main' into sycai_to_pandas_dry_run
sycai Mar 11, 2025
e11ccdb
Merge branch 'main' into sycai_to_pandas_dry_run
sycai Mar 12, 2025
c610e57
Merge branch 'main' into sycai_to_pandas_dry_run
sycai Mar 13, 2025
b4db897
Make pandas surface directly call block._compute_dry_run
sycai Mar 13, 2025
1401076
type hint update
sycai Mar 13, 2025
fb9f8bf
Merge branch 'main' into sycai_to_pandas_dry_run
sycai Mar 18, 2025
30b9f3d
Merge branch 'main' into sycai_to_pandas_dry_run
sycai Mar 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 101 additions & 19 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from __future__ import annotations

import ast
import copy
import dataclasses
import datetime
import functools
Expand All @@ -30,6 +31,7 @@
import textwrap
import typing
from typing import (
Any,
Iterable,
List,
Literal,
Expand All @@ -49,7 +51,7 @@
import pyarrow as pa

from bigframes import session
import bigframes._config.sampling_options as sampling_options
from bigframes._config import sampling_options
import bigframes.constants
import bigframes.core as core
import bigframes.core.compile.googlesql as googlesql
Expand Down Expand Up @@ -535,19 +537,9 @@ def to_pandas(
Returns:
pandas.DataFrame, QueryJob
"""
if (sampling_method is not None) and (sampling_method not in _SAMPLING_METHODS):
raise NotImplementedError(
f"The downsampling method {sampling_method} is not implemented, "
f"please choose from {','.join(_SAMPLING_METHODS)}."
)

sampling = bigframes.options.sampling.with_max_download_size(max_download_size)
if sampling_method is not None:
sampling = sampling.with_method(sampling_method).with_random_state( # type: ignore
random_state
)
else:
sampling = sampling.with_disabled()
sampling = self._get_sampling_option(
max_download_size, sampling_method, random_state
)

df, query_job = self._materialize_local(
materialize_options=MaterializationOptions(
Expand All @@ -559,6 +551,27 @@ def to_pandas(
df.set_axis(self.column_labels, axis=1, copy=False)
return df, query_job

def _get_sampling_option(
    self,
    max_download_size: Optional[int] = None,
    sampling_method: Optional[str] = None,
    random_state: Optional[int] = None,
) -> sampling_options.SamplingOptions:
    """Translate per-call sampling arguments into a ``SamplingOptions`` value.

    Args:
        max_download_size: Overrides the globally configured download cap.
        sampling_method: One of ``_SAMPLING_METHODS``; ``None`` disables
            downsampling entirely.
        random_state: Seed forwarded to the sampler when a method is chosen.

    Raises:
        NotImplementedError: If ``sampling_method`` names an unsupported method.
    """
    # Validate the method up front, before touching the global options.
    if sampling_method is not None and sampling_method not in _SAMPLING_METHODS:
        raise NotImplementedError(
            f"The downsampling method {sampling_method} is not implemented, "
            f"please choose from {','.join(_SAMPLING_METHODS)}."
        )

    base = bigframes.options.sampling.with_max_download_size(max_download_size)

    if sampling_method is None:
        # No method requested: downsampling is switched off for this call.
        return base.with_disabled()

    configured = base.with_method(sampling_method)  # type: ignore
    return configured.with_random_state(random_state)

def try_peek(
self, n: int = 20, force: bool = False, allow_large_results=None
) -> typing.Optional[pd.DataFrame]:
Expand Down Expand Up @@ -798,11 +811,73 @@ def split(
return [sliced_block.drop_columns(drop_cols) for sliced_block in sliced_blocks]

def _compute_dry_run(
self, value_keys: Optional[Iterable[str]] = None
) -> bigquery.QueryJob:
self,
value_keys: Optional[Iterable[str]] = None,
*,
ordered: bool = True,
max_download_size: Optional[int] = None,
sampling_method: Optional[str] = None,
random_state: Optional[int] = None,
) -> typing.Tuple[pd.Series, bigquery.QueryJob]:
sampling = self._get_sampling_option(
max_download_size, sampling_method, random_state
)
if sampling.enable_downsampling:
raise NotImplementedError("Dry run with sampling is not supported")

index: List[Any] = []
values: List[Any] = []

index.append("columnCount")
values.append(len(self.value_columns))
index.append("columnDtypes")
values.append(
{
col: self.expr.get_column_type(self.resolve_label_exact_or_error(col))
for col in self.column_labels
}
)

index.append("indexLevel")
values.append(self.index.nlevels)
index.append("indexDtypes")
values.append(self.index.dtypes)

expr = self._apply_value_keys_to_expr(value_keys=value_keys)
query_job = self.session._executor.dry_run(expr)
return query_job
query_job = self.session._executor.dry_run(expr, ordered)
job_api_repr = copy.deepcopy(query_job._properties)

job_ref = job_api_repr["jobReference"]
for key, val in job_ref.items():
index.append(key)
values.append(val)

index.append("jobType")
values.append(job_api_repr["configuration"]["jobType"])

query_config = job_api_repr["configuration"]["query"]
for key in ("destinationTable", "useLegacySql"):
index.append(key)
values.append(query_config.get(key))

query_stats = job_api_repr["statistics"]["query"]
for key in (
"referencedTables",
"totalBytesProcessed",
"cacheHit",
"statementType",
):
index.append(key)
values.append(query_stats.get(key))

index.append("creationTime")
values.append(
pd.Timestamp(
job_api_repr["statistics"]["creationTime"], unit="ms", tz="UTC"
)
)

return pd.Series(values, index=index), query_job

def _apply_value_keys_to_expr(self, value_keys: Optional[Iterable[str]] = None):
expr = self._expr
Expand Down Expand Up @@ -2703,11 +2778,18 @@ def to_pandas(
"Cannot materialize index, as this object does not have an index. Set index column(s) using set_index."
)
ordered = ordered if ordered is not None else True

df, query_job = self._block.select_columns([]).to_pandas(
ordered=ordered, allow_large_results=allow_large_results
ordered=ordered,
allow_large_results=allow_large_results,
)
return df.index, query_job

def _compute_dry_run(
self, *, ordered: bool = True
) -> Tuple[pd.Series, bigquery.QueryJob]:
return self._block.select_columns([])._compute_dry_run(ordered=ordered)

def resolve_level(self, level: LevelsType) -> typing.Sequence[str]:
if utils.is_list_like(level):
levels = list(level)
Expand Down
44 changes: 37 additions & 7 deletions bigframes/core/indexes/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from __future__ import annotations

import typing
from typing import Hashable, Literal, Optional, Sequence, Union
from typing import Hashable, Literal, Optional, overload, Sequence, Union

import bigframes_vendored.constants as constants
import bigframes_vendored.pandas.core.indexes.base as vendored_pandas_index
Expand Down Expand Up @@ -228,15 +228,16 @@ def T(self) -> Index:
return self.transpose()

@property
def query_job(self) -> Optional[bigquery.QueryJob]:
def query_job(self) -> bigquery.QueryJob:
"""BigQuery job metadata for the most recent query.

Returns:
The most recent `QueryJob
<https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob>`_.
"""
if self._query_job is None:
self._query_job = self._block._compute_dry_run()
_, query_job = self._block._compute_dry_run()
self._query_job = query_job
return self._query_job

def __repr__(self) -> str:
Expand All @@ -252,7 +253,8 @@ def __repr__(self) -> str:
opts = bigframes.options.display
max_results = opts.max_rows
if opts.repr_mode == "deferred":
return formatter.repr_query_job(self._block._compute_dry_run())
_, dry_run_query_job = self._block._compute_dry_run()
return formatter.repr_query_job(dry_run_query_job)

pandas_df, _, query_job = self._block.retrieve_repr_request_results(max_results)
self._query_job = query_job
Expand Down Expand Up @@ -490,18 +492,46 @@ def __getitem__(self, key: int) -> typing.Any:
else:
raise NotImplementedError(f"Index key not supported {key}")

def to_pandas(self, *, allow_large_results: Optional[bool] = None) -> pandas.Index:
@overload
def to_pandas(
self,
*,
allow_large_results: Optional[bool] = ...,
dry_run: Literal[False] = ...,
) -> pandas.Index:
...

@overload
def to_pandas(
self, *, allow_large_results: Optional[bool] = ..., dry_run: Literal[True] = ...
) -> pandas.Series:
...

def to_pandas(
self, *, allow_large_results: Optional[bool] = None, dry_run: bool = False
) -> pandas.Index | pandas.Series:
"""Gets the Index as a pandas Index.

Args:
allow_large_results (bool, default None):
If not None, overrides the global setting to allow or disallow large query results
over the default size limit of 10 GB.
dry_run (bool, default False):
If this argument is true, this method will not process the data. Instead, it returns
a Pandas series containing dtype and the amount of bytes to be processed.

Returns:
pandas.Index:
A pandas Index with all of the labels from this Index.
pandas.Index | pandas.Series:
A pandas Index with all of the labels from this Index. If dry run is set to True,
returns a Series containing dry run statistics.
"""
if dry_run:
dry_run_stats, dry_run_job = self._block.index._compute_dry_run(
ordered=True
)
self._query_job = dry_run_job
return dry_run_stats

df, query_job = self._block.index.to_pandas(
ordered=True, allow_large_results=allow_large_results
)
Expand Down
51 changes: 48 additions & 3 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
Literal,
Mapping,
Optional,
overload,
Sequence,
Tuple,
Union,
Expand Down Expand Up @@ -1594,15 +1595,42 @@ def to_arrow(
self._set_internal_query_job(query_job)
return pa_table

@overload
def to_pandas(
self,
max_download_size: Optional[int] = ...,
sampling_method: Optional[str] = ...,
random_state: Optional[int] = ...,
*,
ordered: bool = ...,
dry_run: Literal[False] = ...,
allow_large_results: Optional[bool] = ...,
) -> pandas.DataFrame:
...

@overload
def to_pandas(
self,
max_download_size: Optional[int] = ...,
sampling_method: Optional[str] = ...,
random_state: Optional[int] = ...,
*,
ordered: bool = ...,
dry_run: Literal[True] = ...,
allow_large_results: Optional[bool] = ...,
) -> pandas.Series:
...

def to_pandas(
self,
max_download_size: Optional[int] = None,
sampling_method: Optional[str] = None,
random_state: Optional[int] = None,
*,
ordered: bool = True,
dry_run: bool = False,
allow_large_results: Optional[bool] = None,
) -> pandas.DataFrame:
) -> pandas.DataFrame | pandas.Series:
"""Write DataFrame to pandas DataFrame.

Args:
Expand All @@ -1624,16 +1652,32 @@ def to_pandas(
ordered (bool, default True):
Determines whether the resulting pandas dataframe will be ordered.
In some cases, unordered may result in a faster-executing query.
dry_run (bool, default False):
If this argument is true, this method will not process the data. Instead, it returns
a Pandas Series containing dry run statistics
allow_large_results (bool, default None):
If not None, overrides the global setting to allow or disallow large query results
over the default size limit of 10 GB.

Returns:
pandas.DataFrame: A pandas DataFrame with all rows and columns of this DataFrame if the
data_sampling_threshold_mb is not exceeded; otherwise, a pandas DataFrame with
downsampled rows and all columns of this DataFrame.
downsampled rows and all columns of this DataFrame. If dry_run is set, a pandas
Series containing dry run statistics will be returned.
"""

# TODO(orrbradford): Optimize this in future. Potentially some cases where we can return the stored query job

if dry_run:
dry_run_stats, dry_run_job = self._block._compute_dry_run(
max_download_size=max_download_size,
sampling_method=sampling_method,
random_state=random_state,
ordered=ordered,
)
self._set_internal_query_job(dry_run_job)
return dry_run_stats

df, query_job = self._block.to_pandas(
max_download_size=max_download_size,
sampling_method=sampling_method,
Expand Down Expand Up @@ -1679,7 +1723,8 @@ def to_pandas_batches(
)

def _compute_dry_run(self) -> bigquery.QueryJob:
return self._block._compute_dry_run()
_, query_job = self._block._compute_dry_run()
return query_job

def copy(self) -> DataFrame:
return DataFrame(self._block)
Expand Down
Loading