feat: add allow_large_results to peek #1448
@@ -105,7 +105,6 @@ def execute(
         *,
         ordered: bool = True,
         use_explicit_destination: Optional[bool] = False,
-        get_size_bytes: bool = False,
         page_size: Optional[int] = None,
         max_results: Optional[int] = None,
     ):
@@ -152,6 +151,7 @@ def peek(
         self,
         array_value: bigframes.core.ArrayValue,
         n_rows: int,
+        use_explicit_destination: Optional[bool] = False,
     ) -> ExecuteResult:
         """
         A 'peek' efficiently accesses a small number of rows in the dataframe.
@@ -233,8 +233,7 @@ def execute(
         array_value: bigframes.core.ArrayValue,
         *,
         ordered: bool = True,
-        use_explicit_destination: Optional[bool] = False,
-        get_size_bytes: bool = False,
+        use_explicit_destination: Optional[bool] = None,
         page_size: Optional[int] = None,
         max_results: Optional[int] = None,
     ):
@@ -259,13 +258,14 @@ def execute(
             job_config=job_config,
             page_size=page_size,
             max_results=max_results,
+            query_with_job=use_explicit_destination,
         )

         # Though we provide the read client, iterator may or may not use it based on what is efficient for the result
         def iterator_supplier():
             return iterator.to_arrow_iterable(bqstorage_client=self.bqstoragereadclient)

-        if get_size_bytes is True or use_explicit_destination:
+        if query_job:
             size_bytes = self.bqclient.get_table(query_job.destination).num_bytes
         else:
             size_bytes = None
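
For context on the size_bytes branch above: the size comes from destination table metadata, not from the downloaded rows. A standalone sketch of that lookup (the function wrapper is illustrative; the PR does this inline against self.bqclient):

    from typing import Optional
    from google.cloud import bigquery

    def result_size_bytes(
        client: bigquery.Client, query_job: Optional[bigquery.QueryJob]
    ) -> Optional[int]:
        # Jobless query path: no QueryJob, hence no destination table to inspect.
        if query_job is None:
            return None
        # Table metadata reports the materialized result size in bytes.
        return client.get_table(query_job.destination).num_bytes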
@@ -329,8 +329,7 @@ def export_gbq(
         if if_exists != "append" and has_timedelta_col:
             # Only update schema if this is not modifying an existing table, and the
             # new table contains timedelta columns.
-            assert query_job.destination is not None
-            table = self.bqclient.get_table(query_job.destination)
+            table = self.bqclient.get_table(destination)
             table.schema = array_value.schema.to_bigquery()
             self.bqclient.update_table(table, ["schema"])
@@ -377,6 +376,7 @@ def peek(
         self,
         array_value: bigframes.core.ArrayValue,
         n_rows: int,
+        use_explicit_destination: Optional[bool] = None,
     ) -> ExecuteResult:
         """
         A 'peek' efficiently accesses a small number of rows in the dataframe.
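
From the user's side, the plumbing below is driven by the allow_large_results option that the next hunk reads. A minimal usage sketch (the table ID is illustrative, and the exact user-facing surface may differ from this sketch):

    import bigframes.pandas as bpd

    # Opting in routes peek() through an explicit destination table,
    # sidestepping the 10GB cap on anonymous query result tables.
    bpd.options.bigquery.allow_large_results = True

    df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")
    df.peek(5)  # preview a handful of rows via the explicit destination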
@@ -385,12 +385,24 @@ def peek(
         if not tree_properties.can_fast_peek(plan):
             msg = "Peeking this value cannot be done efficiently."
             warnings.warn(msg)
+        if use_explicit_destination is None:
+            use_explicit_destination = bigframes.options.bigquery.allow_large_results
+
+        job_config = bigquery.QueryJobConfig()
+        # Use explicit destination to avoid 10GB limit of temporary table
+        if use_explicit_destination:
+            destination_table = self.storage_manager.create_temp_table(
+                array_value.schema.to_bigquery(), cluster_cols=[]
+            )
+            job_config.destination = destination_table

         sql = self.compiler.compile(plan, ordered=False, limit=n_rows)

         # TODO(swast): plumb through the api_name of the user-facing api that
         # caused this query.
-        iterator, query_job = self._run_execute_query(sql=sql)
+        iterator, query_job = self._run_execute_query(
+            sql=sql, job_config=job_config, query_with_job=use_explicit_destination
+        )
         return ExecuteResult(
             # Probably don't need read client for small peek results, but let client decide
             arrow_batches=lambda: iterator.to_arrow_iterable(

Review thread on the create_temp_table call:

Comment: I don't think it needs to be a full table, right? We should be able to avoid the …
Reply: I assume the main reason is we need to set an expiration time for the temp table.
Reply: Ah, good point.
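
On the expiration point raised in the thread: a sketch of how a temporary table with a TTL can be created through the google-cloud-bigquery API (the helper signature is illustrative; the real logic lives in storage_manager.create_temp_table):

    import datetime
    from typing import List
    from google.cloud import bigquery

    def create_temp_table(
        client: bigquery.Client, table_id: str, schema: List[bigquery.SchemaField]
    ) -> bigquery.Table:
        table = bigquery.Table(table_id, schema=schema)
        # The expiration is the reason a real (non-anonymous) table is used:
        # it guarantees orphaned peek results are cleaned up automatically.
        table.expires = datetime.datetime.now(
            datetime.timezone.utc
        ) + datetime.timedelta(hours=1)
        return client.create_table(table)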
@@ -485,7 +497,8 @@ def _run_execute_query(
         api_name: Optional[str] = None,
         page_size: Optional[int] = None,
         max_results: Optional[int] = None,
-    ) -> Tuple[bq_table.RowIterator, bigquery.QueryJob]:
+        query_with_job: bool = True,
+    ) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]:
         """
         Starts BigQuery query job and waits for results.
         """
@@ -503,15 +516,17 @@ def _run_execute_query(
         # as `add_and_trim_labels` ensures the label count does not exceed 64.
         bq_io.add_and_trim_labels(job_config, api_name=api_name)
         try:
-            return bq_io.start_query_with_client(
+            iterator, query_job = bq_io.start_query_with_client(
                 self.bqclient,
                 sql,
                 job_config=job_config,
                 api_name=api_name,
                 max_results=max_results,
                 page_size=page_size,
                 metrics=self.metrics,
+                query_with_job=query_with_job,
             )
+            return iterator, query_job

         except google.api_core.exceptions.BadRequest as e:
             # Unfortunately, this error type does not have a separate error code or exception type
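
The query_with_job flag threaded through here chooses between two execution paths. A simplified sketch of the distinction, assuming a plain google-cloud-bigquery client (the actual dispatch happens inside bq_io.start_query_with_client, which this diff does not show):

    from typing import Optional, Tuple
    from google.cloud import bigquery
    from google.cloud.bigquery import table as bq_table

    def run(
        client: bigquery.Client, sql: str, query_with_job: bool = True
    ) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]:
        if query_with_job:
            # Full job: destination table, statistics, and a QueryJob handle.
            job = client.query(sql)
            return job.result(), job
        # query_and_wait() may use the lighter jobs.query API and complete
        # without creating a job, so there is no QueryJob to hand back.
        return client.query_and_wait(sql), None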
@@ -642,7 +657,7 @@ def _sql_as_cached_temp_table(
             job_config=job_config,
             api_name="cached",
         )
-        query_job.destination
+        assert query_job is not None
         query_job.result()
         return query_job.destination
Review thread on the returned metrics:

Comment: The https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query response includes totalBytesProcessed. Let's create a follow-up issue to include those metrics, too.
Reply: Filed internal issue 400961399.
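
For reference, the metric is already exposed on the job object when one exists; a minimal sketch of reading it with the Python client:

    from google.cloud import bigquery

    client = bigquery.Client()
    job = client.query("SELECT 1")
    job.result()  # wait for completion so statistics are populated
    # The same figure the REST response reports as totalBytesProcessed.
    print(job.total_bytes_processed)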