
Commit f87a256

feat: Initial support for biglake iceberg tables
1 parent 6fef9be

25 files changed, +663 -267 lines

bigframes/core/array_value.py

Lines changed: 6 additions & 7 deletions
@@ -17,9 +17,8 @@
 import datetime
 import functools
 import typing
-from typing import Iterable, List, Mapping, Optional, Sequence, Tuple
+from typing import Iterable, List, Mapping, Optional, Sequence, Tuple, Union

-import google.cloud.bigquery
 import pandas
 import pyarrow as pa

@@ -91,7 +90,7 @@ def from_range(cls, start, end, step):
     @classmethod
     def from_table(
         cls,
-        table: google.cloud.bigquery.Table,
+        table: Union[bq_data.BiglakeIcebergTable, bq_data.GbqNativeTable],
         session: Session,
         *,
         columns: Optional[Sequence[str]] = None,
@@ -103,8 +102,6 @@ def from_table(
     ):
         if offsets_col and primary_key:
             raise ValueError("must set at most one of 'offests', 'primary_key'")
-        # define data source only for needed columns, this makes row-hashing cheaper
-        table_def = bq_data.GbqTable.from_table(table, columns=columns or ())

         # create ordering from info
         ordering = None
@@ -115,7 +112,9 @@ def from_table(
                 [ids.ColumnId(key_part) for key_part in primary_key]
             )

-        bf_schema = schemata.ArraySchema.from_bq_table(table, columns=columns)
+        bf_schema = schemata.ArraySchema.from_bq_schema(
+            table.physical_schema, columns=columns
+        )
         # Scan all columns by default, we define this list as it can be pruned while preserving source_def
         scan_list = nodes.ScanList(
             tuple(
@@ -124,7 +123,7 @@ def from_table(
             )
         )
         source_def = bq_data.BigqueryDataSource(
-            table=table_def,
+            table=table,
             schema=bf_schema,
             at_time=at_time,
             sql_predicate=predicate,
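
Both table descriptors expose the same structural surface (`physical_schema`, `get_full_id`, `partition_col`, `primary_key`), which is what lets `from_table` skip the old `GbqTable.from_table` conversion and read the schema straight off the descriptor. A minimal sketch of that shared shape, assuming the dataclasses introduced in `bq_data.py` below; the helper name is hypothetical:

```python
from typing import List, Optional, Sequence, Union

from bigframes.core import bq_data


def visible_columns(
    table: Union[bq_data.GbqNativeTable, bq_data.BiglakeIcebergTable],
    columns: Optional[Sequence[str]] = None,
) -> List[str]:
    # Mirrors what from_table now does: the schema comes from the descriptor's
    # physical_schema, so native and Iceberg tables are handled identically.
    names = [field.name for field in table.physical_schema]
    return [name for name in names if name in columns] if columns else names
```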

bigframes/core/bq_data.py

Lines changed: 160 additions & 13 deletions
@@ -22,73 +22,200 @@
 import queue
 import threading
 import typing
-from typing import Any, Iterator, Optional, Sequence, Tuple
+from typing import Any, Iterator, List, Literal, Optional, Sequence, Tuple, Union

 from google.cloud import bigquery_storage_v1
 import google.cloud.bigquery as bq
 import google.cloud.bigquery_storage_v1.types as bq_storage_types
 from google.protobuf import timestamp_pb2
 import pyarrow as pa

+import bigframes.constants
 from bigframes.core import pyarrow_utils
 import bigframes.core.schema

 if typing.TYPE_CHECKING:
     import bigframes.core.ordering as orderings


+def _resolve_standard_gcp_region(bq_region: str):
+    """
+    Resolve bq regions to standardized
+    """
+    if bq_region.casefold() == "US":
+        return "us-central1"
+    elif bq_region.casefold() == "EU":
+        return "europe-west4"
+    return bq_region
+
+
+def is_irc_table(table_id: str):
+    """
+    Determines if a table id should be resolved through the iceberg rest catalog.
+    """
+    return len(table_id.split(".")) == 4
+
+
+def is_compatible(
+    data_region: Union[GcsRegion, BigQueryRegion], session_location: str
+) -> bool:
+    # based on https://docs.cloud.google.com/bigquery/docs/locations#storage-location-considerations
+    if isinstance(data_region, BigQueryRegion):
+        return data_region.name == session_location
+    else:
+        assert isinstance(data_region, GcsRegion)
+        # TODO(b/463675088): Multi-regions don't yet support rest catalog tables
+        if session_location in bigframes.constants.BIGQUERY_MULTIREGIONS:
+            return False
+        return _resolve_standard_gcp_region(session_location) in data_region.included
+
+
+def get_default_bq_region(data_region: Union[GcsRegion, BigQueryRegion]) -> str:
+    if isinstance(data_region, BigQueryRegion):
+        return data_region.name
+    elif isinstance(data_region, GcsRegion):
+        # should maybe try to track and prefer primary replica?
+        return data_region.included[0]
+
+
 @dataclasses.dataclass(frozen=True)
-class GbqTable:
+class BigQueryRegion:
+    name: str
+
+
+@dataclasses.dataclass(frozen=True)
+class GcsRegion:
+    # this is the name of gcs regions, which may be names for multi-regions, so shouldn't be compared with non-gcs locations
+    storage_regions: tuple[str, ...]
+    # this tracks all the included standard, specific regions (eg us-east1), and should be comparable to bq regions (except non-standard US, EU, omni regions)
+    included: tuple[str, ...]
+
+
+# what is the line between metadata and core fields? Mostly metadata fields are optional or unreliable, but its fuzzy
+@dataclasses.dataclass(frozen=True)
+class TableMetadata:
+    # this size metadata might be stale, don't use where strict correctness is needed
+    location: Union[BigQueryRegion, GcsRegion]
+    type: Literal["TABLE", "EXTERNAL", "VIEW", "MATERIALIZE_VIEW", "SNAPSHOT"]
+    numBytes: Optional[int] = None
+    numRows: Optional[int] = None
+    created_time: Optional[datetime.datetime] = None
+    modified_time: Optional[datetime.datetime] = None
+
+
+@dataclasses.dataclass(frozen=True)
+class GbqNativeTable:
     project_id: str = dataclasses.field()
     dataset_id: str = dataclasses.field()
     table_id: str = dataclasses.field()
     physical_schema: Tuple[bq.SchemaField, ...] = dataclasses.field()
-    is_physically_stored: bool = dataclasses.field()
-    cluster_cols: typing.Optional[Tuple[str, ...]]
+    metadata: TableMetadata = dataclasses.field()
+    partition_col: Optional[str] = None
+    cluster_cols: typing.Optional[Tuple[str, ...]] = None
+    primary_key: Optional[Tuple[str, ...]] = None

     @staticmethod
-    def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqTable:
+    def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqNativeTable:
         # Subsetting fields with columns can reduce cost of row-hash default ordering
         if columns:
             schema = tuple(item for item in table.schema if item.name in columns)
         else:
             schema = tuple(table.schema)
-        return GbqTable(
+
+        metadata = TableMetadata(
+            numBytes=table.num_bytes,
+            numRows=table.num_rows,
+            location=BigQueryRegion(table.location),  # type: ignore
+            type=table.table_type or "TABLE",  # type: ignore
+            created_time=table.created,
+            modified_time=table.modified,
+        )
+        partition_col = None
+        if table.range_partitioning:
+            partition_col = table.range_partitioning.field
+        elif table.time_partitioning:
+            partition_col = table.time_partitioning.field
+
+        return GbqNativeTable(
             project_id=table.project,
             dataset_id=table.dataset_id,
             table_id=table.table_id,
             physical_schema=schema,
-            is_physically_stored=(table.table_type in ["TABLE", "MATERIALIZED_VIEW"]),
+            partition_col=partition_col,
             cluster_cols=None
-            if table.clustering_fields is None
+            if (table.clustering_fields is None)
             else tuple(table.clustering_fields),
+            primary_key=tuple(_get_primary_keys(table)),
+            metadata=metadata,
         )

     @staticmethod
     def from_ref_and_schema(
         table_ref: bq.TableReference,
         schema: Sequence[bq.SchemaField],
+        location: str,
+        table_type: Literal["TABLE"] = "TABLE",
         cluster_cols: Optional[Sequence[str]] = None,
-    ) -> GbqTable:
-        return GbqTable(
+    ) -> GbqNativeTable:
+        return GbqNativeTable(
             project_id=table_ref.project,
             dataset_id=table_ref.dataset_id,
             table_id=table_ref.table_id,
+            metadata=TableMetadata(location=BigQueryRegion(location), type=table_type),
             physical_schema=tuple(schema),
-            is_physically_stored=True,
             cluster_cols=tuple(cluster_cols) if cluster_cols else None,
         )

+    @property
+    def is_physically_stored(self) -> bool:
+        return self.metadata.type in ["TABLE", "MATERIALIZED_VIEW"]
+
     def get_table_ref(self) -> bq.TableReference:
         return bq.TableReference(
             bq.DatasetReference(self.project_id, self.dataset_id), self.table_id
         )

+    def get_full_id(self, quoted: bool = False) -> str:
+        if quoted:
+            return f"`{self.project_id}`.`{self.dataset_id}`.`{self.table_id}`"
+        return f"{self.project_id}.{self.dataset_id}.{self.table_id}"
+
+    @property
+    @functools.cache
+    def schema_by_id(self):
+        return {col.name: col for col in self.physical_schema}
+
+
+@dataclasses.dataclass(frozen=True)
+class BiglakeIcebergTable:
+    project_id: str = dataclasses.field()
+    catalog_id: str = dataclasses.field()
+    namespace_id: str = dataclasses.field()
+    table_id: str = dataclasses.field()
+    physical_schema: Tuple[bq.SchemaField, ...] = dataclasses.field()
+    cluster_cols: typing.Optional[Tuple[str, ...]]
+    metadata: TableMetadata
+
+    def get_full_id(self, quoted: bool = False) -> str:
+        if quoted:
+            return f"`{self.project_id}`.`{self.catalog_id}`.`{self.namespace_id}`.`{self.table_id}`"
+        return (
+            f"{self.project_id}.{self.catalog_id}.{self.namespace_id}.{self.table_id}"
+        )
+
     @property
     @functools.cache
     def schema_by_id(self):
         return {col.name: col for col in self.physical_schema}

+    @property
+    def partition_col(self) -> Optional[str]:
+        return None
+
+    @property
+    def primary_key(self) -> Optional[Tuple[str, ...]]:
+        return None
+

 @dataclasses.dataclass(frozen=True)
 class BigqueryDataSource:
@@ -104,13 +231,13 @@ def __post_init__(self):
             self.schema.names
         )

-    table: GbqTable
+    table: Union[GbqNativeTable, BiglakeIcebergTable]
     schema: bigframes.core.schema.ArraySchema
     at_time: typing.Optional[datetime.datetime] = None
     # Added for backwards compatibility, not validated
     sql_predicate: typing.Optional[str] = None
     ordering: typing.Optional[orderings.RowOrdering] = None
-    # Optimization field
+    # Optimization field, must be correct if set, don't put maybe-stale number here
     n_rows: Optional[int] = None


@@ -188,6 +315,8 @@ def get_arrow_batches(
     project_id: str,
     sample_rate: Optional[float] = None,
 ) -> ReadResult:
+    assert isinstance(data.table, GbqNativeTable)
+
     table_mod_options = {}
     read_options_dict: dict[str, Any] = {"selected_fields": list(columns)}

@@ -245,3 +374,21 @@ def process_batch(pa_batch):
     return ReadResult(
         batches, session.estimated_row_count, session.estimated_total_bytes_scanned
     )
+
+
+def _get_primary_keys(
+    table: bq.Table,
+) -> List[str]:
+    """Get primary keys from table if they are set."""
+
+    primary_keys: List[str] = []
+    if (
+        (table_constraints := getattr(table, "table_constraints", None)) is not None
+        and (primary_key := table_constraints.primary_key) is not None
+        # This will be False for either None or empty list.
+        # We want primary_keys = None if no primary keys are set.
+        and (columns := primary_key.columns)
+    ):
+        primary_keys = columns if columns is not None else []
+
+    return primary_keys
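
The new helpers turn the location rules into data: a `BigQueryRegion` has to match the session location exactly, a `GcsRegion` matches any standard region in its `included` list (with multi-region sessions rejected for rest-catalog tables), and a four-part id marks a table that resolves through the Iceberg rest catalog. A minimal usage sketch, assuming the helpers land as shown above; project, catalog, and region names are illustrative:

```python
from bigframes.core import bq_data

native_loc = bq_data.BigQueryRegion("us-central1")
gcs_loc = bq_data.GcsRegion(
    storage_regions=("us-central1",), included=("us-central1",)
)

# A BigQuery-managed location must match the session location exactly.
assert bq_data.is_compatible(native_loc, "us-central1")
assert not bq_data.is_compatible(native_loc, "europe-west4")

# A GCS-backed location matches any session region it includes.
assert bq_data.is_compatible(gcs_loc, "us-central1")

# Four-part ids (project.catalog.namespace.table) resolve through the rest catalog.
assert bq_data.is_irc_table("my-project.my_catalog.my_namespace.orders")
assert not bq_data.is_irc_table("my-project.my_dataset.orders")

# Default BQ region when only the data location is known.
assert bq_data.get_default_bq_region(gcs_loc) == "us-central1"
```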

bigframes/core/compile/ibis_compiler/ibis_compiler.py

Lines changed: 1 addition & 3 deletions
@@ -207,9 +207,7 @@ def _table_to_ibis(
     source: bq_data.BigqueryDataSource,
     scan_cols: typing.Sequence[str],
 ) -> ibis_types.Table:
-    full_table_name = (
-        f"{source.table.project_id}.{source.table.dataset_id}.{source.table.table_id}"
-    )
+    full_table_name = source.table.get_full_id(quoted=False)
     # Physical schema might include unused columns, unsupported datatypes like JSON
     physical_schema = ibis_bigquery.BigQuerySchema.to_ibis(
         list(source.table.physical_schema)
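
`get_full_id` keeps the Ibis compiler agnostic to the table flavor: native tables render as three-part ids, Iceberg tables as four-part ids. A small sketch of the strings the compiler now receives, using hypothetical descriptors (metadata values are illustrative only):

```python
from bigframes.core import bq_data

meta = bq_data.TableMetadata(
    location=bq_data.BigQueryRegion("us-central1"), type="TABLE"
)
native = bq_data.GbqNativeTable(
    project_id="my-project",
    dataset_id="my_dataset",
    table_id="orders",
    physical_schema=(),
    metadata=meta,
)
iceberg = bq_data.BiglakeIcebergTable(
    project_id="my-project",
    catalog_id="my_catalog",
    namespace_id="my_namespace",
    table_id="orders",
    physical_schema=(),
    cluster_cols=None,
    metadata=meta,
)

print(native.get_full_id())              # my-project.my_dataset.orders
print(iceberg.get_full_id())             # my-project.my_catalog.my_namespace.orders
print(iceberg.get_full_id(quoted=True))  # `my-project`.`my_catalog`.`my_namespace`.`orders`
```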

bigframes/core/compile/sqlglot/compiler.py

Lines changed: 15 additions & 3 deletions
@@ -21,6 +21,7 @@

 from bigframes.core import (
     agg_expressions,
+    bq_data,
     expression,
     guid,
     identifiers,
@@ -173,10 +174,21 @@ def compile_readlocal(node: nodes.ReadLocalNode, child: ir.SQLGlotIR) -> ir.SQLG
 @_compile_node.register
 def compile_readtable(node: nodes.ReadTableNode, child: ir.SQLGlotIR):
     table = node.source.table
+    if isinstance(table, bq_data.GbqNativeTable):
+        project, dataset, table_id = table.project_id, table.dataset_id, table.table_id
+    elif isinstance(table, bq_data.BiglakeIcebergTable):
+        project, dataset, table_id = (
+            table.project_id,
+            table.catalog_id,
+            f"{table.namespace_id}.{table.table_id}",
+        )
+
+    else:
+        raise ValueError(f"Unrecognized table type: {table}")
     return ir.SQLGlotIR.from_table(
-        table.project_id,
-        table.dataset_id,
-        table.table_id,
+        project,
+        dataset,
+        table_id,
         col_names=[col.source_id for col in node.scan_list.items],
         alias_names=[col.id.sql for col in node.scan_list.items],
         uid_gen=child.uid_gen,
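
For SQLGlot, the same information is threaded through as a (project, dataset, table) triple, with an Iceberg table's namespace folded into the third component. A hypothetical mirror of the new dispatch, written as a standalone helper so the mapping is easy to eyeball:

```python
from typing import Tuple, Union

from bigframes.core import bq_data


def table_parts(
    table: Union[bq_data.GbqNativeTable, bq_data.BiglakeIcebergTable],
) -> Tuple[str, str, str]:
    # Same branching as compile_readtable above.
    if isinstance(table, bq_data.GbqNativeTable):
        return table.project_id, table.dataset_id, table.table_id
    if isinstance(table, bq_data.BiglakeIcebergTable):
        return (
            table.project_id,
            table.catalog_id,
            f"{table.namespace_id}.{table.table_id}",
        )
    raise ValueError(f"Unrecognized table type: {table}")

# For the Iceberg descriptor sketched earlier:
# table_parts(iceberg) == ("my-project", "my_catalog", "my_namespace.orders")
```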

bigframes/core/nodes.py

Lines changed: 1 addition & 3 deletions
@@ -825,9 +825,7 @@ def variables_introduced(self) -> int:

     @property
     def row_count(self) -> typing.Optional[int]:
-        if self.source.sql_predicate is None and self.source.table.is_physically_stored:
-            return self.source.n_rows
-        return None
+        return self.source.n_rows

     @property
     def node_defined_ids(self) -> Tuple[identifiers.ColumnId, ...]:
