Commit a538c69

tswast authored and shuoweil committed
feat: support INFORMATION_SCHEMA views in read_gbq (#1895)
* feat: support INFORMATION_SCHEMA tables in read_gbq
* avoid storage semi executor
* use faster tables for peek tests
* more tests
* fix mypy
* Update bigframes/session/_io/bigquery/read_gbq_table.py
* immediately query for information_schema tables
* Fix mypy errors and temporarily update python version
* snapshot
* snapshot again
1 parent e0ac827 commit a538c69
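
For context, a minimal sketch of the user-facing behavior this commit enables, patterned on the system tests added below (the bpd alias and the my-project ID are illustrative, not part of the diff):

    import bigframes.pandas as bpd

    # Region-qualified INFORMATION_SCHEMA views can now be passed straight to
    # read_gbq, even though they are not available via the tables.get REST API.
    df = bpd.read_gbq("region-US.INFORMATION_SCHEMA.SCHEMATA")

    # A project-qualified, four-part ID works as well.
    df = bpd.read_gbq("my-project.region-US.INFORMATION_SCHEMA.SCHEMATA")
    print(df.peek())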

File tree: 5 files changed, +161 -22 lines changed

bigframes/session/_io/bigquery/read_gbq_table.py

Lines changed: 90 additions & 6 deletions
@@ -28,6 +28,7 @@
 import google.cloud.bigquery as bigquery
 import google.cloud.bigquery.table
 
+import bigframes.core
 import bigframes.core.events
 import bigframes.exceptions as bfe
 import bigframes.session._io.bigquery
@@ -37,18 +38,79 @@
 import bigframes.session
 
 
+def _convert_information_schema_table_id_to_table_reference(
+    table_id: str,
+    default_project: Optional[str],
+) -> bigquery.TableReference:
+    """Squeeze an INFORMATION_SCHEMA reference into a TableReference.
+
+    This is kind of a hack. INFORMATION_SCHEMA is a view that isn't available
+    via the tables.get REST API.
+    """
+    parts = table_id.split(".")
+    parts_casefold = [part.casefold() for part in parts]
+    dataset_index = parts_casefold.index("INFORMATION_SCHEMA".casefold())
+
+    if dataset_index == 0:
+        project = default_project
+    else:
+        project = ".".join(parts[:dataset_index])
+
+    if project is None:
+        message = (
+            "Could not determine project ID. "
+            "Please provide a project or region in your INFORMATION_SCHEMA table ID. "
+            "For example, 'region-REGION_NAME.INFORMATION_SCHEMA.JOBS'."
+        )
+        raise ValueError(message)
+
+    dataset = "INFORMATION_SCHEMA"
+    table_id_short = ".".join(parts[dataset_index + 1 :])
+    return bigquery.TableReference(
+        bigquery.DatasetReference(project, dataset),
+        table_id_short,
+    )
+
+
+def get_information_schema_metadata(
+    bqclient: bigquery.Client,
+    table_id: str,
+    default_project: Optional[str],
+) -> bigquery.Table:
+    job_config = bigquery.QueryJobConfig(dry_run=True)
+    job = bqclient.query(
+        f"SELECT * FROM `{table_id}`",
+        job_config=job_config,
+    )
+    table_ref = _convert_information_schema_table_id_to_table_reference(
+        table_id=table_id,
+        default_project=default_project,
+    )
+    table = bigquery.Table.from_api_repr(
+        {
+            "tableReference": table_ref.to_api_repr(),
+            "location": job.location,
+            # Prevent ourselves from trying to read the table with the BQ
+            # Storage API.
+            "type": "VIEW",
+        }
+    )
+    table.schema = job.schema
+    return table
+
+
 def get_table_metadata(
     bqclient: bigquery.Client,
-    table_ref: google.cloud.bigquery.table.TableReference,
-    bq_time: datetime.datetime,
     *,
-    cache: Dict[bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]],
+    table_id: str,
+    default_project: Optional[str],
+    bq_time: datetime.datetime,
+    cache: Dict[str, Tuple[datetime.datetime, bigquery.Table]],
     use_cache: bool = True,
     publisher: bigframes.core.events.Publisher,
 ) -> Tuple[datetime.datetime, google.cloud.bigquery.table.Table]:
     """Get the table metadata, either from cache or via REST API."""
 
-    cached_table = cache.get(table_ref)
+    cached_table = cache.get(table_id)
     if use_cache and cached_table is not None:
         snapshot_timestamp, table = cached_table
 
@@ -90,18 +152,38 @@ def get_table_metadata(
 
         return cached_table
 
-    table = bqclient.get_table(table_ref)
+    if is_information_schema(table_id):
+        table = get_information_schema_metadata(
+            bqclient=bqclient, table_id=table_id, default_project=default_project
+        )
+    else:
+        table_ref = google.cloud.bigquery.table.TableReference.from_string(
+            table_id, default_project=default_project
+        )
+        table = bqclient.get_table(table_ref)
 
     # Local time will lag a little bit due to network latency.
     # Make sure it is at least table creation time.
     # This is relevant if the table was created immediately before loading it here.
     if (table.created is not None) and (table.created > bq_time):
         bq_time = table.created
 
     cached_table = (bq_time, table)
-    cache[table_ref] = cached_table
+    cache[table_id] = cached_table
     return cached_table
 
 
+def is_information_schema(table_id: str):
+    table_id_casefold = table_id.casefold()
+    # Include the "."s to ensure we don't have false positives for some user
+    # defined dataset like MY_INFORMATION_SCHEMA or tables called
+    # INFORMATION_SCHEMA.
+    return (
+        ".INFORMATION_SCHEMA.".casefold() in table_id_casefold
+        or table_id_casefold.startswith("INFORMATION_SCHEMA.".casefold())
+    )
+
+
 def is_time_travel_eligible(
     bqclient: bigquery.Client,
     table: google.cloud.bigquery.table.Table,
@@ -168,6 +250,8 @@ def is_time_travel_eligible(
             msg, category=bfe.TimeTravelDisabledWarning, stacklevel=stacklevel
         )
         return False
+    elif table.table_type == "VIEW":
+        return False
 
     # table might support time travel, so let's do a dry-run query with time travel
     if should_dry_run:
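
A worked illustration of the two new helpers, assuming they behave exactly as written above (the my-project ID is hypothetical, and the converter is private, hence the leading underscore):

    from bigframes.session._io.bigquery import read_gbq_table

    # The surrounding "."s prevent false positives such as a user dataset named
    # MY_INFORMATION_SCHEMA or a table literally called INFORMATION_SCHEMA.
    read_gbq_table.is_information_schema("region-US.INFORMATION_SCHEMA.JOBS")  # True
    read_gbq_table.is_information_schema("my_dataset.INFORMATION_SCHEMA")  # False

    # Everything left of INFORMATION_SCHEMA is squeezed into the "project" slot,
    # region included, which is the "hack" the docstring mentions:
    ref = read_gbq_table._convert_information_schema_table_id_to_table_reference(
        table_id="my-project.region-US.INFORMATION_SCHEMA.JOBS",
        default_project=None,
    )
    # ref.project == "my-project.region-US"
    # ref.dataset_id == "INFORMATION_SCHEMA"
    # ref.table_id == "JOBS"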

bigframes/session/loader.py

Lines changed: 16 additions & 14 deletions
@@ -47,6 +47,8 @@
 import pandas
 import pyarrow as pa
 
+import bigframes._tools
+import bigframes._tools.strings
 from bigframes.core import guid, identifiers, local_data, nodes, ordering, utils
 import bigframes.core as core
 import bigframes.core.blocks as blocks
@@ -272,9 +274,7 @@ def __init__(
         self._default_index_type = default_index_type
         self._scan_index_uniqueness = scan_index_uniqueness
         self._force_total_order = force_total_order
-        self._df_snapshot: Dict[
-            bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]
-        ] = {}
+        self._df_snapshot: Dict[str, Tuple[datetime.datetime, bigquery.Table]] = {}
         self._metrics = metrics
         self._publisher = publisher
         # Unfortunate circular reference, but need to pass reference when constructing objects
@@ -629,10 +629,6 @@ def read_gbq_table(
 
         _check_duplicates("columns", columns)
 
-        table_ref = google.cloud.bigquery.table.TableReference.from_string(
-            table_id, default_project=self._bqclient.project
-        )
-
        columns = list(columns)
        include_all_columns = columns is None or len(columns) == 0
        filters = typing.cast(list, list(filters))
@@ -643,7 +639,8 @@ def read_gbq_table(
 
         time_travel_timestamp, table = bf_read_gbq_table.get_table_metadata(
             self._bqclient,
-            table_ref=table_ref,
+            table_id=table_id,
+            default_project=self._bqclient.project,
             bq_time=self._clock.get_time(),
             cache=self._df_snapshot,
             use_cache=use_cache,
@@ -706,18 +703,23 @@ def read_gbq_table(
         # Optionally, execute the query
         # -----------------------------
 
-        # max_results introduces non-determinism and limits the cost on
-        # clustered tables, so fall back to a query. We do this here so that
-        # the index is consistent with tables that have primary keys, even
-        # when max_results is set.
-        if max_results is not None:
+        if (
+            # max_results introduces non-determinism and limits the cost on
+            # clustered tables, so fall back to a query. We do this here so that
+            # the index is consistent with tables that have primary keys, even
+            # when max_results is set.
+            max_results is not None
+            # Views such as INFORMATION_SCHEMA can introduce non-determinism.
+            # They can update frequently and don't support time travel.
+            or bf_read_gbq_table.is_information_schema(table_id)
+        ):
             # TODO(b/338111344): If we are running a query anyway, we might as
             # well generate ROW_NUMBER() at the same time.
             all_columns: Iterable[str] = (
                 itertools.chain(index_cols, columns) if columns else ()
             )
             query = bf_io_bigquery.to_query(
-                table_id,
+                f"{table.project}.{table.dataset_id}.{table.table_id}",
                 columns=all_columns,
                 sql_predicate=bf_io_bigquery.compile_filters(filters)
                 if filters
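
To make the re-derived query target concrete, a hedged sketch (hypothetical project ID; actually running it needs BigQuery credentials, since the metadata fetch issues a dry-run query): the loader now rebuilds the ID it passes to to_query from the synthesized Table, whose project field can carry the region as shown in read_gbq_table.py above:

    from google.cloud import bigquery

    from bigframes.session._io.bigquery import read_gbq_table as bf_read_gbq_table

    bqclient = bigquery.Client(project="my-project")  # hypothetical project
    table = bf_read_gbq_table.get_information_schema_metadata(
        bqclient=bqclient,
        table_id="my-project.region-US.INFORMATION_SCHEMA.SCHEMATA",
        default_project=bqclient.project,
    )
    # The loader queries this rebuilt ID instead of the raw user input:
    requery_id = f"{table.project}.{table.dataset_id}.{table.table_id}"
    # -> "my-project.region-US.INFORMATION_SCHEMA.SCHEMATA"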

bigframes/session/read_api_execution.py

Lines changed: 3 additions & 0 deletions
@@ -46,6 +46,9 @@ def execute(
         if node.explicitly_ordered and ordered:
             return None
 
+        if not node.source.table.is_physically_stored:
+            return None
+
         if limit is not None:
             if peek is None or limit < peek:
                 peek = limit
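
Why the semi-executor now declines these nodes, in a minimal self-contained sketch: the Table synthesized for an INFORMATION_SCHEMA read is typed as a VIEW, and a view has no physical storage for the BQ Storage Read API to scan (is_physically_stored is the bigframes-internal check used above; the reference values here are hypothetical):

    from google.cloud import bigquery

    table = bigquery.Table.from_api_repr(
        {
            "tableReference": {
                "projectId": "region-US",  # hypothetical, region-as-project hack
                "datasetId": "INFORMATION_SCHEMA",
                "tableId": "SCHEMATA",
            },
            "type": "VIEW",
        }
    )
    # A VIEW has no physical storage, so the Read API path returns None and
    # execution falls back to a regular SQL query.
    assert table.table_type == "VIEW"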
Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+
+@pytest.mark.parametrize("include_project", [True, False])
+@pytest.mark.parametrize(
+    "view_id",
+    [
+        # https://cloud.google.com/bigquery/docs/information-schema-intro
+        "region-US.INFORMATION_SCHEMA.SESSIONS_BY_USER",
+        "region-US.INFORMATION_SCHEMA.SCHEMATA",
+    ],
+)
+def test_read_gbq_jobs_by_user_returns_schema(
+    unordered_session, view_id: str, include_project: bool
+):
+    if include_project:
+        table_id = unordered_session.bqclient.project + "." + view_id
+    else:
+        table_id = view_id
+
+    df = unordered_session.read_gbq(table_id, max_results=10)
+    assert df.dtypes is not None
+
+
+def test_read_gbq_schemata_can_be_peeked(unordered_session):
+    df = unordered_session.read_gbq("region-US.INFORMATION_SCHEMA.SCHEMATA")
+    result = df.peek()
+    assert result is not None
+
+
+def test_read_gbq_schemata_four_parts_can_be_peeked(unordered_session):
+    df = unordered_session.read_gbq(
+        f"{unordered_session.bqclient.project}.region-US.INFORMATION_SCHEMA.SCHEMATA"
+    )
+    result = df.peek()
+    assert result is not None

tests/unit/session/test_session.py

Lines changed: 2 additions & 2 deletions
@@ -242,7 +242,7 @@ def test_read_gbq_cached_table():
     table._properties["numRows"] = "1000000000"
     table._properties["location"] = session._location
     table._properties["type"] = "TABLE"
-    session._loader._df_snapshot[table_ref] = (
+    session._loader._df_snapshot[str(table_ref)] = (
         datetime.datetime(1999, 1, 2, 3, 4, 5, 678901, tzinfo=datetime.timezone.utc),
         table,
     )
@@ -273,7 +273,7 @@ def test_read_gbq_cached_table_doesnt_warn_for_anonymous_tables_and_doesnt_inclu
     table._properties["numRows"] = "1000000000"
     table._properties["location"] = session._location
     table._properties["type"] = "TABLE"
-    session._loader._df_snapshot[table_ref] = (
+    session._loader._df_snapshot[str(table_ref)] = (
         datetime.datetime(1999, 1, 2, 3, 4, 5, 678901, tzinfo=datetime.timezone.utc),
         table,
     )
