Skip to content

Commit dfcc2d3

Browse files
authored
Revert "fix: use table clone instead of system time for read_gbq_table (#109)" (#171)
This reverts commit 031f253.
1 parent c065071 commit dfcc2d3

File tree

6 files changed

+84
-137
lines changed

6 files changed

+84
-137
lines changed

bigframes/constants.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,4 @@
2626

2727
ABSTRACT_METHOD_ERROR_MESSAGE = f"Abstract method. You have likely encountered a bug. Please share this stacktrace and how you reached it with the BigQuery DataFrames team. {FEEDBACK_LINK}"
2828

29-
DEFAULT_EXPIRATION = datetime.timedelta(days=7)
29+
DEFAULT_EXPIRATION = datetime.timedelta(days=1)

bigframes/dataframe.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
from __future__ import annotations
1818

19-
import datetime
2019
import re
2120
import textwrap
2221
import typing
@@ -2328,8 +2327,7 @@ def to_gbq(
23282327
self._session.bqclient,
23292328
self._session._anonymous_dataset,
23302329
# TODO(swast): allow custom expiration times, probably via session configuration.
2331-
datetime.datetime.now(datetime.timezone.utc)
2332-
+ constants.DEFAULT_EXPIRATION,
2330+
constants.DEFAULT_EXPIRATION,
23332331
)
23342332

23352333
if if_exists is not None and if_exists != "replace":

bigframes/session/__init__.py

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
from __future__ import annotations
1818

19-
import datetime
2019
import logging
2120
import os
2221
import re
@@ -431,9 +430,7 @@ def _read_gbq_query(
431430
index_cols = list(index_col)
432431

433432
destination, query_job = self._query_to_destination(
434-
query,
435-
index_cols,
436-
api_name=api_name,
433+
query, index_cols, api_name="read_gbq_query"
437434
)
438435

439436
# If there was no destination table, that means the query must have
@@ -511,12 +508,6 @@ def _read_gbq_table_to_ibis_with_total_ordering(
511508
If we can get a total ordering from the table, such as via primary key
512509
column(s), then return those too so that ordering generation can be
513510
avoided.
514-
515-
For tables that aren't already read-only, this creates a table
516-
clone so that any changes to the underlying table don't affect the
517-
DataFrame and break our assumptions, especially with regards to unique
518-
index and ordering. See:
519-
https://cloud.google.com/bigquery/docs/table-clones-create
520511
"""
521512
if table_ref.dataset_id.upper() == "_SESSION":
522513
# _SESSION tables aren't supported by the tables.get REST API.
@@ -527,24 +518,15 @@ def _read_gbq_table_to_ibis_with_total_ordering(
527518
None,
528519
)
529520

530-
now = datetime.datetime.now(datetime.timezone.utc)
531-
destination = bigframes_io.create_table_clone(
532-
table_ref,
533-
self._anonymous_dataset,
534-
# TODO(swast): Allow the default expiration to be configured.
535-
now + constants.DEFAULT_EXPIRATION,
536-
self,
537-
api_name,
538-
)
539521
table_expression = self.ibis_client.table(
540-
destination.table_id,
541-
database=f"{destination.project}.{destination.dataset_id}",
522+
table_ref.table_id,
523+
database=f"{table_ref.project}.{table_ref.dataset_id}",
542524
)
543525

544526
# If there are primary keys defined, the query engine assumes these
545527
# columns are unique, even if the constraint is not enforced. We make
546528
# the same assumption and use these columns as the total ordering keys.
547-
table = self.bqclient.get_table(destination)
529+
table = self.bqclient.get_table(table_ref)
548530

549531
# TODO(b/305264153): Use public properties to fetch primary keys once
550532
# added to google-cloud-bigquery.
@@ -553,7 +535,23 @@ def _read_gbq_table_to_ibis_with_total_ordering(
553535
.get("primaryKey", {})
554536
.get("columns")
555537
)
556-
return table_expression, primary_keys
538+
539+
if not primary_keys:
540+
return table_expression, None
541+
else:
542+
# Read from a snapshot since we won't have to copy the table data to create a total ordering.
543+
job_config = bigquery.QueryJobConfig()
544+
job_config.labels["bigframes-api"] = api_name
545+
current_timestamp = list(
546+
self.bqclient.query(
547+
"SELECT CURRENT_TIMESTAMP() AS `current_timestamp`",
548+
job_config=job_config,
549+
).result()
550+
)[0][0]
551+
table_expression = self.ibis_client.sql(
552+
bigframes_io.create_snapshot_sql(table_ref, current_timestamp)
553+
)
554+
return table_expression, primary_keys
557555

558556
def _read_gbq_table(
559557
self,
@@ -664,7 +662,20 @@ def _read_gbq_table(
664662
total_ordering_columns=frozenset(index_cols),
665663
)
666664

667-
if not is_total_ordering:
665+
# We have a total ordering, so query via "time travel" so that
666+
# the underlying data doesn't mutate.
667+
if is_total_ordering:
668+
# Get the timestamp from the job metadata rather than the query
669+
# text so that the query for determining uniqueness of the ID
670+
# columns can be cached.
671+
current_timestamp = query_job.started
672+
673+
# The job finished, so we should have a start time.
674+
assert current_timestamp is not None
675+
table_expression = self.ibis_client.sql(
676+
bigframes_io.create_snapshot_sql(table_ref, current_timestamp)
677+
)
678+
else:
668679
# Make sure when we generate an ordering, the row_number()
669680
# coresponds to the index columns.
670681
table_expression = table_expression.order_by(index_cols)

bigframes/session/_io/bigquery.py

Lines changed: 21 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,14 @@
1414

1515
"""Private module: Helpers for I/O operations."""
1616

17-
from __future__ import annotations
18-
1917
import datetime
2018
import textwrap
2119
import types
22-
import typing
2320
from typing import Dict, Iterable, Union
2421
import uuid
2522

2623
import google.cloud.bigquery as bigquery
2724

28-
if typing.TYPE_CHECKING:
29-
import bigframes.session
30-
31-
3225
IO_ORDERING_ID = "bqdf_row_nums"
3326
TEMP_TABLE_PREFIX = "bqdf{date}_{random_id}"
3427

@@ -76,83 +69,43 @@ def create_export_data_statement(
7669
)
7770

7871

79-
def random_table(dataset: bigquery.DatasetReference) -> bigquery.TableReference:
80-
"""Generate a random table ID with BigQuery DataFrames prefix.
81-
82-
Args:
83-
dataset (google.cloud.bigquery.DatasetReference):
84-
The dataset to make the table reference in. Usually the anonymous
85-
dataset for the session.
86-
87-
Returns:
88-
google.cloud.bigquery.TableReference:
89-
Fully qualified table ID of a table that doesn't exist.
90-
"""
91-
now = datetime.datetime.now(datetime.timezone.utc)
92-
random_id = uuid.uuid4().hex
93-
table_id = TEMP_TABLE_PREFIX.format(
94-
date=now.strftime("%Y%m%d"), random_id=random_id
95-
)
96-
return dataset.table(table_id)
97-
98-
99-
def table_ref_to_sql(table: bigquery.TableReference) -> str:
100-
"""Format a table reference as escaped SQL."""
101-
return f"`{table.project}`.`{table.dataset_id}`.`{table.table_id}`"
72+
def create_snapshot_sql(
73+
table_ref: bigquery.TableReference, current_timestamp: datetime.datetime
74+
) -> str:
75+
"""Query a table via 'time travel' for consistent reads."""
10276

77+
# If we have a _SESSION table, assume that it's already a copy. Nothing to do here.
78+
if table_ref.dataset_id.upper() == "_SESSION":
79+
return f"SELECT * FROM `_SESSION`.`{table_ref.table_id}`"
10380

104-
def create_table_clone(
105-
source: bigquery.TableReference,
106-
dataset: bigquery.DatasetReference,
107-
expiration: datetime.datetime,
108-
session: bigframes.session.Session,
109-
api_name: str,
110-
) -> bigquery.TableReference:
111-
"""Create a table clone for consistent reads."""
11281
# If we have an anonymous query results table, it can't be modified and
11382
# there isn't any BigQuery time travel.
114-
if source.dataset_id.startswith("_"):
115-
return source
116-
117-
fully_qualified_source_id = table_ref_to_sql(source)
118-
destination = random_table(dataset)
119-
fully_qualified_destination_id = table_ref_to_sql(destination)
83+
if table_ref.dataset_id.startswith("_"):
84+
return f"SELECT * FROM `{table_ref.project}`.`{table_ref.dataset_id}`.`{table_ref.table_id}`"
12085

121-
# Include a label so that Dataplex Lineage can identify temporary
122-
# tables that BigQuery DataFrames creates. Googlers: See internal issue
123-
# 296779699.
124-
ddl = textwrap.dedent(
86+
return textwrap.dedent(
12587
f"""
126-
CREATE OR REPLACE TABLE
127-
{fully_qualified_destination_id}
128-
CLONE {fully_qualified_source_id}
129-
OPTIONS(
130-
expiration_timestamp=TIMESTAMP "{expiration.isoformat()}",
131-
labels=[
132-
("source", "bigquery-dataframes-temp"),
133-
("bigframes-api", {repr(api_name)})
134-
]
135-
)
88+
SELECT *
89+
FROM `{table_ref.project}`.`{table_ref.dataset_id}`.`{table_ref.table_id}`
90+
FOR SYSTEM_TIME AS OF TIMESTAMP({repr(current_timestamp.isoformat())})
13691
"""
13792
)
138-
job_config = bigquery.QueryJobConfig()
139-
job_config.labels = {
140-
"source": "bigquery-dataframes-temp",
141-
"bigframes-api": api_name,
142-
}
143-
session._start_query(ddl, job_config=job_config)
144-
return destination
14593

14694

14795
def create_temp_table(
14896
bqclient: bigquery.Client,
14997
dataset: bigquery.DatasetReference,
150-
expiration: datetime.datetime,
98+
expiration: datetime.timedelta,
15199
) -> str:
152100
"""Create an empty table with an expiration in the desired dataset."""
153-
table_ref = random_table(dataset)
101+
now = datetime.datetime.now(datetime.timezone.utc)
102+
random_id = uuid.uuid4().hex
103+
table_id = TEMP_TABLE_PREFIX.format(
104+
date=now.strftime("%Y%m%d"), random_id=random_id
105+
)
106+
table_ref = dataset.table(table_id)
154107
destination = bigquery.Table(table_ref)
155-
destination.expires = expiration
108+
destination.expires = now + expiration
156109
bqclient.create_table(destination)
157110
return f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}"
158111

tests/system/small/test_session.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,9 @@ def test_read_gbq_w_primary_keys_table(
252252
sorted_result = result.sort_values(primary_keys)
253253
pd.testing.assert_frame_equal(result, sorted_result)
254254

255+
# Verify that we're working from a snapshot rather than a copy of the table.
256+
assert "FOR SYSTEM_TIME AS OF TIMESTAMP" in df.sql
257+
255258

256259
@pytest.mark.parametrize(
257260
("query_or_table", "max_results"),

tests/unit/session/test_io_bigquery.py

Lines changed: 23 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -19,63 +19,46 @@
1919
import google.cloud.bigquery as bigquery
2020
import pytest
2121

22-
import bigframes.session
2322
import bigframes.session._io.bigquery
2423

2524

26-
def test_create_table_clone_doesnt_clone_anonymous_datasets():
27-
session = mock.create_autospec(bigframes.session.Session)
28-
source = bigquery.TableReference.from_string(
25+
def test_create_snapshot_sql_doesnt_timetravel_anonymous_datasets():
26+
table_ref = bigquery.TableReference.from_string(
2927
"my-test-project._e8166e0cdb.anonbb92cd"
3028
)
3129

32-
destination = bigframes.session._io.bigquery.create_table_clone(
33-
source,
34-
bigquery.DatasetReference("other-project", "other_dataset"),
35-
datetime.datetime(2023, 11, 2, 15, 43, 21, tzinfo=datetime.timezone.utc),
36-
session,
37-
"test_api",
30+
sql = bigframes.session._io.bigquery.create_snapshot_sql(
31+
table_ref, datetime.datetime.now(datetime.timezone.utc)
3832
)
3933

40-
# Anonymous query results tables don't support CLONE
41-
assert destination is source
42-
session._start_query.assert_not_called()
34+
# Anonymous query results tables don't support time travel.
35+
assert "SYSTEM_TIME" not in sql
4336

37+
# Need fully-qualified table name.
38+
assert "`my-test-project`.`_e8166e0cdb`.`anonbb92cd`" in sql
4439

45-
def test_create_table_clone_sets_expiration():
46-
session = mock.create_autospec(bigframes.session.Session)
47-
source = bigquery.TableReference.from_string(
48-
"my-test-project.test_dataset.some_table"
49-
)
5040

51-
expiration = datetime.datetime(
52-
2023, 11, 2, 15, 43, 21, tzinfo=datetime.timezone.utc
53-
)
54-
bigframes.session._io.bigquery.create_table_clone(
55-
source,
56-
bigquery.DatasetReference("other-project", "other_dataset"),
57-
expiration,
58-
session,
59-
"test_api",
41+
def test_create_snapshot_sql_doesnt_timetravel_session_tables():
42+
table_ref = bigquery.TableReference.from_string("my-test-project._session.abcdefg")
43+
44+
sql = bigframes.session._io.bigquery.create_snapshot_sql(
45+
table_ref, datetime.datetime.now(datetime.timezone.utc)
6046
)
6147

62-
session._start_query.assert_called_once()
63-
call_args = session._start_query.call_args
64-
query = call_args.args[0]
65-
assert "CREATE OR REPLACE TABLE" in query
66-
assert "CLONE" in query
67-
assert f'expiration_timestamp=TIMESTAMP "{expiration.isoformat()}"' in query
68-
assert '("source", "bigquery-dataframes-temp")' in query
69-
assert call_args.kwargs["job_config"].labels["bigframes-api"] == "test_api"
48+
# We aren't modifying _SESSION tables, so don't use time travel.
49+
assert "SYSTEM_TIME" not in sql
50+
51+
# Don't need the project ID for _SESSION tables.
52+
assert "my-test-project" not in sql
7053

7154

7255
def test_create_temp_table_default_expiration():
7356
"""Make sure the created table has an expiration."""
7457
bqclient = mock.create_autospec(bigquery.Client)
7558
dataset = bigquery.DatasetReference("test-project", "test_dataset")
76-
expiration = datetime.datetime(
77-
2023, 11, 2, 13, 44, 55, 678901, datetime.timezone.utc
78-
)
59+
now = datetime.datetime.now(datetime.timezone.utc)
60+
expiration = datetime.timedelta(days=3)
61+
expected_expires = now + expiration
7962

8063
bigframes.session._io.bigquery.create_temp_table(bqclient, dataset, expiration)
8164

@@ -85,11 +68,10 @@ def test_create_temp_table_default_expiration():
8568
assert table.project == "test-project"
8669
assert table.dataset_id == "test_dataset"
8770
assert table.table_id.startswith("bqdf")
88-
# TODO(swast): Why isn't the expiration exactly what we set it to?
8971
assert (
90-
(expiration - datetime.timedelta(minutes=1))
72+
(expected_expires - datetime.timedelta(minutes=1))
9173
< table.expires
92-
< (expiration + datetime.timedelta(minutes=1))
74+
< (expected_expires + datetime.timedelta(minutes=1))
9375
)
9476

9577

0 commit comments

Comments
 (0)