
feat: add allow_large_results to peek #1448


Merged: 13 commits, Mar 5, 2025
34 changes: 23 additions & 11 deletions bigframes/core/blocks.py
@@ -559,10 +559,12 @@ def to_pandas(
return df, query_job

def try_peek(
self, n: int = 20, force: bool = False
self, n: int = 20, force: bool = False, allow_large_results=None
) -> typing.Optional[pd.DataFrame]:
if force or self.expr.supports_fast_peek:
result = self.session._executor.peek(self.expr, n)
result = self.session._executor.peek(
self.expr, n, use_explicit_destination=allow_large_results
)
df = io_pandas.arrow_to_pandas(result.to_arrow_table(), self.expr.schema)
self._copy_index_to_pandas(df)
return df
@@ -614,17 +616,27 @@ def _materialize_local(
self.expr,
ordered=materialize_options.ordered,
use_explicit_destination=materialize_options.allow_large_results,
get_size_bytes=True,
)
assert execute_result.total_bytes is not None
table_mb = execute_result.total_bytes / _BYTES_TO_MEGABYTES
sample_config = materialize_options.downsampling
max_download_size = sample_config.max_download_size
fraction = (
max_download_size / table_mb
if (max_download_size is not None) and (table_mb != 0)
else 2
)
if execute_result.total_bytes is not None:
table_mb = execute_result.total_bytes / _BYTES_TO_MEGABYTES
max_download_size = sample_config.max_download_size
fraction = (
max_download_size / table_mb
if (max_download_size is not None) and (table_mb != 0)
else 2
)
else:
# Since we cannot acquire the table size without a query_job,
# we skip the sampling.
if sample_config.enable_downsampling:
warnings.warn(
"Sampling is disabled and there is no download size limit when 'allow_large_results' is set to "
"False. To prevent downloading excessive data, it is recommended to use the peek() method, or "
"limit the data with methods like .head() or .sample() before proceeding with downloads.",
UserWarning,
)
fraction = 2

# TODO: Maybe materialize before downsampling
# Some downsampling methods
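The warning added above tells users to bound their data before downloading when `allow_large_results=False`. A minimal usage sketch of that advice follows; the table name is hypothetical, and it assumes `head`, `sample`, and `peek` behave as documented elsewhere in bigframes:

```python
import bigframes.pandas as bpd

# Hypothetical public table; assumes a configured GCP project with BigQuery access.
df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")

# With allow_large_results=False there is no query job, so the block cannot
# read the result size and skips automatic downsampling. Bound the rows
# explicitly before materializing, as the warning recommends:
preview = df.peek(10, allow_large_results=False)                   # arbitrary 10 rows
first_rows = df.head(100).to_pandas(allow_large_results=False)     # first 100 rows
sampled = df.sample(n=1_000).to_pandas(allow_large_results=False)  # random sample
```
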
3 changes: 2 additions & 1 deletion bigframes/core/indexes/base.py
@@ -505,7 +505,8 @@ def to_pandas(self, *, allow_large_results: Optional[bool] = None) -> pandas.Ind
df, query_job = self._block.index.to_pandas(
ordered=True, allow_large_results=allow_large_results
)
self._query_job = query_job
if query_job:
self._query_job = query_job
return df

def to_numpy(self, dtype=None, *, allow_large_results=None, **kwargs) -> np.ndarray:
19 changes: 14 additions & 5 deletions bigframes/dataframe.py
@@ -1587,7 +1587,8 @@ def to_arrow(
pa_table, query_job = self._block.to_arrow(
ordered=ordered, allow_large_results=allow_large_results
)
self._set_internal_query_job(query_job)
if query_job:
self._set_internal_query_job(query_job)
return pa_table

def to_pandas(
@@ -1637,7 +1638,8 @@ def to_pandas(
ordered=ordered,
allow_large_results=allow_large_results,
)
self._set_internal_query_job(query_job)
if query_job:
self._set_internal_query_job(query_job)
return df.set_axis(self._block.column_labels, axis=1, copy=False)

def to_pandas_batches(
@@ -1687,7 +1689,9 @@ def head(self, n: int = 5) -> DataFrame:
def tail(self, n: int = 5) -> DataFrame:
return typing.cast(DataFrame, self.iloc[-n:])

def peek(self, n: int = 5, *, force: bool = True) -> pandas.DataFrame:
def peek(
self, n: int = 5, *, force: bool = True, allow_large_results=None
) -> pandas.DataFrame:
"""
Preview n arbitrary rows from the dataframe. No guarantees about row selection or ordering.
``DataFrame.peek(force=False)`` will always be very fast, but will not succeed if data requires
@@ -1700,17 +1704,22 @@ def peek(self, n: int = 5, *, force: bool = True) -> pandas.DataFrame:
force (bool, default True):
If the data cannot be peeked efficiently, the dataframe will instead be fully materialized as part
of the operation if ``force=True``. If ``force=False``, the operation will throw a ValueError.
allow_large_results (bool, default None):
If not None, overrides the global setting to allow or disallow large query results
over the default size limit of 10 GB.
Returns:
pandas.DataFrame: A pandas DataFrame with n rows.

Raises:
ValueError: If force=False and data cannot be efficiently peeked.
"""
maybe_result = self._block.try_peek(n)
maybe_result = self._block.try_peek(n, allow_large_results=allow_large_results)
if maybe_result is None:
if force:
self._cached()
maybe_result = self._block.try_peek(n, force=True)
maybe_result = self._block.try_peek(
n, force=True, allow_large_results=allow_large_results
)
assert maybe_result is not None
else:
raise ValueError(
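As a usage sketch of the new `DataFrame.peek` parameter (the table is hypothetical; `None` defers to the global `bigframes.options.bigquery.allow_large_results` setting):

```python
import bigframes.pandas as bpd

df = bpd.read_gbq("my-project.my_dataset.large_table")  # hypothetical table

# Default: allow_large_results=None falls back to the global option.
small_preview = df.peek(5)

# Per-call override: route the peek through an explicit destination table so
# results larger than the 10 GB anonymous-result limit can still be fetched.
wide_preview = df.peek(1_000, allow_large_results=True)

# Per-call opt-out: skip job creation for a faster, size-capped peek.
fast_preview = df.peek(5, allow_large_results=False)
```
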
1 change: 1 addition & 0 deletions bigframes/functions/_function_client.py
@@ -125,6 +125,7 @@ def _create_bq_function(self, create_function_ddl: str) -> None:
create_function_ddl,
job_config=bigquery.QueryJobConfig(),
)
assert query_job is not None
logger.info(f"Created bigframes function {query_job.ddl_target_routine}")

def _format_function_options(self, function_options: dict) -> str:
16 changes: 12 additions & 4 deletions bigframes/series.py
@@ -420,7 +420,8 @@ def to_pandas(
ordered=ordered,
allow_large_results=allow_large_results,
)
self._set_internal_query_job(query_job)
if query_job:
self._set_internal_query_job(query_job)
series = df.squeeze(axis=1)
series.name = self._name
return series
@@ -690,7 +691,9 @@ def head(self, n: int = 5) -> Series:
def tail(self, n: int = 5) -> Series:
return typing.cast(Series, self.iloc[-n:])

def peek(self, n: int = 5, *, force: bool = True) -> pandas.Series:
def peek(
self, n: int = 5, *, force: bool = True, allow_large_results=None
) -> pandas.Series:
"""
Preview n arbitrary elements from the series without guarantees about row selection or ordering.

@@ -704,17 +707,22 @@ def peek(self, n: int = 5, *, force: bool = True) -> pandas.Series:
force (bool, default True):
If the data cannot be peeked efficiently, the series will instead be fully materialized as part
of the operation if ``force=True``. If ``force=False``, the operation will throw a ValueError.
allow_large_results (bool, default None):
If not None, overrides the global setting to allow or disallow large query results
over the default size limit of 10 GB.
Returns:
pandas.Series: A pandas Series with n rows.

Raises:
ValueError: If force=False and data cannot be efficiently peeked.
"""
maybe_result = self._block.try_peek(n)
maybe_result = self._block.try_peek(n, allow_large_results=allow_large_results)
if maybe_result is None:
if force:
self._cached()
maybe_result = self._block.try_peek(n, force=True)
maybe_result = self._block.try_peek(
n, force=True, allow_large_results=allow_large_results
)
assert maybe_result is not None
else:
raise ValueError(
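`Series.peek` mirrors the DataFrame behavior; the sketch below (with a hypothetical bigframes Series `s`) also exercises the documented `ValueError` path when `force=False`:

```python
# `s` is a hypothetical bigframes Series built from a complex expression.
try:
    preview = s.peek(5, force=False, allow_large_results=False)
except ValueError:
    # Peeking was not possible without full materialization; fall back to a
    # bounded materialization instead of downloading the whole Series.
    preview = s.head(5).to_pandas(allow_large_results=False)
```
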
20 changes: 18 additions & 2 deletions bigframes/session/__init__.py
@@ -344,11 +344,25 @@ def _project(self)
@property
def bytes_processed_sum(self):
"""The sum of all bytes processed by bigquery jobs using this session."""
warnings.warn(
"Queries executed with `allow_large_results=False` within the session will not "
"have their bytes processed counted in this sum. If you need precise "
"bytes processed information, query the `INFORMATION_SCHEMA` tables "
"to get relevant metrics.",
UserWarning,
)
return self._metrics.bytes_processed

@property
def slot_millis_sum(self):
"""The sum of all slot time used by bigquery jobs in this session."""
warnings.warn(
"Queries executed with `allow_large_results=False` within the session will not "
"have their slot milliseconds counted in this sum. If you need precise slot "
"milliseconds information, query the `INFORMATION_SCHEMA` tables "
"to get relevant metrics.",
UserWarning,
)
return self._metrics.slot_millis

@property
Expand Down Expand Up @@ -1675,11 +1689,13 @@ def _start_query_ml_ddl(
# so we must reset any encryption set in the job config
# https://cloud.google.com/bigquery/docs/customer-managed-encryption#encrypt-model
job_config.destination_encryption_configuration = None

return bf_io_bigquery.start_query_with_client(
iterator, query_job = bf_io_bigquery.start_query_with_client(
self.bqclient, sql, job_config=job_config, metrics=self._metrics
)

assert query_job is not None
return iterator, query_job

def _create_object_table(self, path: str, connection: str) -> str:
"""Create a random id Object Table from the input path and connection."""
table = str(self._loader._storage_manager._random_table())
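The new warnings point users to INFORMATION_SCHEMA for exact job metrics. A hedged sketch of such a query (the region qualifier and lookback window are placeholders; column names follow the public JOBS_BY_PROJECT view):

```python
import bigframes.pandas as bpd

metrics_sql = """
SELECT
  SUM(total_bytes_processed) AS bytes_processed,
  SUM(total_slot_ms) AS slot_millis
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
"""

# Aggregated bytes processed and slot milliseconds for the last day of jobs.
job_metrics = bpd.read_gbq(metrics_sql).to_pandas()
```
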
17 changes: 16 additions & 1 deletion bigframes/session/_io/bigquery/__init__.py
@@ -228,14 +228,28 @@ def start_query_with_client(
timeout: Optional[float] = None,
api_name: Optional[str] = None,
metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
*,
query_with_job: bool = True,
) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
"""
Starts query job and waits for results.
"""
try:
# Note: Ensure no additional labels are added to job_config after this point,
# as `add_and_trim_labels` ensures the label count does not exceed 64.
add_and_trim_labels(job_config, api_name=api_name)
if not query_with_job:
results_iterator = bq_client.query_and_wait(
sql,
job_config=job_config,
location=location,
project=project,
api_timeout=timeout,
)
if metrics is not None:
metrics.count_job_stats()
Collaborator: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query response includes totalBytesProcessed. Let's create a follow-up issue to include those metrics, too.

Collaborator: Filed internal issue 400961399

return results_iterator, None

query_job = bq_client.query(
sql,
job_config=job_config,
@@ -338,6 +352,7 @@ def create_bq_dataset_reference(
# to the dataset, no BigQuery Session required. Note: there is a
# different anonymous dataset per location. See:
# https://cloud.google.com/bigquery/docs/cached-results#how_cached_results_are_stored
assert query_job is not None
query_destination = query_job.destination
return bigquery.DatasetReference(
query_destination.project,
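The new `query_with_job=False` branch relies on the client library's jobless execution path. A minimal sketch of the difference outside bigframes (assumes google-cloud-bigquery 3.x, where `Client.query_and_wait` is available):

```python
from google.cloud import bigquery

client = bigquery.Client()  # assumes default credentials and project
sql = "SELECT 1 AS x"

# Jobless path (query_with_job=False): returns a RowIterator directly; the
# stateless jobs.query API may be used, so there is no QueryJob to inspect.
rows = client.query_and_wait(sql)

# Job-backed path (query_with_job=True): creates a QueryJob with a job ID,
# destination table, and job statistics.
job = client.query(sql)
rows_from_job = job.result()
```
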
35 changes: 25 additions & 10 deletions bigframes/session/executor.py
@@ -105,7 +105,6 @@ def execute(
*,
ordered: bool = True,
use_explicit_destination: Optional[bool] = False,
get_size_bytes: bool = False,
page_size: Optional[int] = None,
max_results: Optional[int] = None,
):
@@ -152,6 +151,7 @@ def peek(
self,
array_value: bigframes.core.ArrayValue,
n_rows: int,
use_explicit_destination: Optional[bool] = False,
) -> ExecuteResult:
"""
A 'peek' efficiently accesses a small number of rows in the dataframe.
@@ -233,8 +233,7 @@ def execute(
array_value: bigframes.core.ArrayValue,
*,
ordered: bool = True,
use_explicit_destination: Optional[bool] = False,
get_size_bytes: bool = False,
use_explicit_destination: Optional[bool] = None,
page_size: Optional[int] = None,
max_results: Optional[int] = None,
):
@@ -259,13 +258,14 @@ def execute(
job_config=job_config,
page_size=page_size,
max_results=max_results,
query_with_job=use_explicit_destination,
)

# Though we provide the read client, iterator may or may not use it based on what is efficient for the result
def iterator_supplier():
return iterator.to_arrow_iterable(bqstorage_client=self.bqstoragereadclient)

if get_size_bytes is True or use_explicit_destination:
if query_job:
size_bytes = self.bqclient.get_table(query_job.destination).num_bytes
else:
size_bytes = None
@@ -329,8 +329,7 @@ def export_gbq(
if if_exists != "append" and has_timedelta_col:
# Only update schema if this is not modifying an existing table, and the
# new table contains timedelta columns.
assert query_job.destination is not None
table = self.bqclient.get_table(query_job.destination)
table = self.bqclient.get_table(destination)
table.schema = array_value.schema.to_bigquery()
self.bqclient.update_table(table, ["schema"])

@@ -377,6 +376,7 @@ def peek(
self,
array_value: bigframes.core.ArrayValue,
n_rows: int,
use_explicit_destination: Optional[bool] = None,
) -> ExecuteResult:
"""
A 'peek' efficiently accesses a small number of rows in the dataframe.
@@ -385,12 +385,24 @@
if not tree_properties.can_fast_peek(plan):
msg = "Peeking this value cannot be done efficiently."
warnings.warn(msg)
if use_explicit_destination is None:
use_explicit_destination = bigframes.options.bigquery.allow_large_results

job_config = bigquery.QueryJobConfig()
# Use explicit destination to avoid 10GB limit of temporary table
if use_explicit_destination:
destination_table = self.storage_manager.create_temp_table(
Collaborator: I don't think it needs to be a full table, right? We should be able to avoid the tables.create call. _random_table seems more appropriate (def _random_table(self, skip_cleanup: bool = False) -> bigquery.TableReference:).

Collaborator (author): I assume the main reason is that we need to set an expiration time for the temp table.

Collaborator: Ah, good point.

array_value.schema.to_bigquery(), cluster_cols=[]
)
job_config.destination = destination_table

sql = self.compiler.compile(plan, ordered=False, limit=n_rows)

# TODO(swast): plumb through the api_name of the user-facing api that
# caused this query.
iterator, query_job = self._run_execute_query(sql=sql)
iterator, query_job = self._run_execute_query(
sql=sql, job_config=job_config, query_with_job=use_explicit_destination
)
return ExecuteResult(
# Probably don't need read client for small peek results, but let client decide
arrow_batches=lambda: iterator.to_arrow_iterable(
@@ -485,7 +497,8 @@ def _run_execute_query(
api_name: Optional[str] = None,
page_size: Optional[int] = None,
max_results: Optional[int] = None,
) -> Tuple[bq_table.RowIterator, bigquery.QueryJob]:
query_with_job: bool = True,
) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]:
"""
Starts BigQuery query job and waits for results.
"""
@@ -503,15 +516,17 @@
# as `add_and_trim_labels` ensures the label count does not exceed 64.
bq_io.add_and_trim_labels(job_config, api_name=api_name)
try:
return bq_io.start_query_with_client(
iterator, query_job = bq_io.start_query_with_client(
self.bqclient,
sql,
job_config=job_config,
api_name=api_name,
max_results=max_results,
page_size=page_size,
metrics=self.metrics,
query_with_job=query_with_job,
)
return iterator, query_job

except google.api_core.exceptions.BadRequest as e:
# Unfortunately, this error type does not have a separate error code or exception type
@@ -642,7 +657,7 @@ def _sql_as_cached_temp_table(
job_config=job_config,
api_name="cached",
)
query_job.destination
assert query_job is not None
query_job.result()
return query_job.destination

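In the executor, `use_explicit_destination=None` resolves to the global option referenced above. A hedged sketch of toggling that option from user code (the table name is hypothetical, and it assumes the option can be changed at runtime):

```python
import bigframes
import bigframes.pandas as bpd

# Session-wide default: write results to an explicit temporary destination
# table, lifting the 10 GB cap on anonymous query results at the cost of a job.
bigframes.options.bigquery.allow_large_results = True

df = bpd.read_gbq("my-project.my_dataset.large_table")  # hypothetical table
big_peek = df.peek(100)  # peek writes to an explicit destination table

# Per-call opt-out: no destination table and no QueryJob, so no size metadata.
fast_peek = df.peek(100, allow_large_results=False)
```
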
4 changes: 3 additions & 1 deletion bigframes/session/loader.py
@@ -726,14 +726,16 @@ def _start_query(
job_config.maximum_bytes_billed = (
bigframes.options.compute.maximum_bytes_billed
)
return bf_io_bigquery.start_query_with_client(
iterator, query_job = bf_io_bigquery.start_query_with_client(
self._bqclient,
sql,
job_config=job_config,
max_results=max_results,
timeout=timeout,
api_name=api_name,
)
assert query_job is not None
return iterator, query_job


def _transform_read_gbq_configuration(configuration: Optional[dict]) -> dict: