Commit ff83110

feat(bigquery): add read_csv, read_json, read_parquet support
1 parent c3cf316 commit ff83110

2 files changed (+155, -62 lines)

ibis/backends/bigquery/__init__.py

Lines changed: 139 additions & 0 deletions
@@ -2,7 +2,10 @@
 
 from __future__ import annotations
 
+import concurrent.futures
 import contextlib
+import glob
+import os
 import re
 import warnings
 from functools import partial
@@ -40,6 +43,7 @@
 
 if TYPE_CHECKING:
     from collections.abc import Iterable, Mapping
+    from pathlib import Path
 
     import pyarrow as pa
     from google.cloud.bigquery.table import RowIterator
@@ -147,6 +151,141 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
         )
         load_job.result()
 
+    def _read_file(
+        self,
+        path: str | Path,
+        *,
+        table_name: str | None = None,
+        job_config: bq.LoadJobConfig,
+    ) -> ir.Table:
+        self._make_session()
+
+        if table_name is None:
+            table_name = util.gen_name(f"bq_read_{job_config.source_format}")
+
+        table_ref = self._session_dataset.table(table_name)
+
+        schema = self._session_dataset.dataset_id
+        database = self._session_dataset.project
+
+        # drop the table if it exists
+        #
+        # we could do this with write_disposition = WRITE_TRUNCATE but then the
+        # concurrent append jobs aren't possible
+        #
+        # dropping the table first means all write_dispositions can be
+        # WRITE_APPEND
+        self.drop_table(table_name, schema=schema, database=database, force=True)
+
+        if os.path.isdir(path):
+            raise NotImplementedError("Reading from a directory is not supported.")
+        elif str(path).startswith("gs://"):
+            load_job = self.client.load_table_from_uri(
+                path, table_ref, job_config=job_config
+            )
+            load_job.result()
+        else:
+
+            def load(file: str) -> None:
+                with open(file, mode="rb") as f:
+                    load_job = self.client.load_table_from_file(
+                        f, table_ref, job_config=job_config
+                    )
+                    load_job.result()
+
+            job_config.write_disposition = bq.WriteDisposition.WRITE_APPEND
+
+            with concurrent.futures.ThreadPoolExecutor() as executor:
+                for fut in concurrent.futures.as_completed(
+                    executor.submit(load, file) for file in glob.glob(str(path))
+                ):
+                    fut.result()
+
+        return self.table(table_name, schema=schema, database=database)
+
+    def read_parquet(
+        self, path: str | Path, table_name: str | None = None, **kwargs: Any
+    ):
+        """Read Parquet data into a BigQuery table.
+
+        Parameters
+        ----------
+        path
+            Path to a Parquet file on GCS or the local filesystem. Globs are supported.
+        table_name
+            Optional table name
+        kwargs
+            Additional keyword arguments passed to `google.cloud.bigquery.LoadJobConfig`.
+
+        Returns
+        -------
+        Table
+            An Ibis table expression
+        """
+        return self._read_file(
+            path,
+            table_name=table_name,
+            job_config=bq.LoadJobConfig(
+                source_format=bq.SourceFormat.PARQUET, **kwargs
+            ),
+        )
+
+    def read_csv(
+        self, path: str | Path, table_name: str | None = None, **kwargs: Any
+    ) -> ir.Table:
+        """Read CSV data into a BigQuery table.
+
+        Parameters
+        ----------
+        path
+            Path to a CSV file on GCS or the local filesystem. Globs are supported.
+        table_name
+            Optional table name
+        kwargs
+            Additional keyword arguments passed to
+            `google.cloud.bigquery.LoadJobConfig`.
+
+        Returns
+        -------
+        Table
+            An Ibis table expression
+        """
+        job_config = bq.LoadJobConfig(
+            source_format=bq.SourceFormat.CSV,
+            autodetect=True,
+            skip_leading_rows=1,
+            **kwargs,
+        )
+        return self._read_file(path, table_name=table_name, job_config=job_config)
+
+    def read_json(
+        self, path: str | Path, table_name: str | None = None, **kwargs: Any
+    ) -> ir.Table:
+        """Read newline-delimited JSON data into a BigQuery table.
+
+        Parameters
+        ----------
+        path
+            Path to a newline-delimited JSON file on GCS or the local
+            filesystem. Globs are supported.
+        table_name
+            Optional table name
+        kwargs
+            Additional keyword arguments passed to
+            `google.cloud.bigquery.LoadJobConfig`.
+
+        Returns
+        -------
+        Table
+            An Ibis table expression
+        """
+        job_config = bq.LoadJobConfig(
+            source_format=bq.SourceFormat.NEWLINE_DELIMITED_JSON,
+            autodetect=True,
+            **kwargs,
+        )
+        return self._read_file(path, table_name=table_name, job_config=job_config)
+
     def _from_url(self, url: str, **kwargs):
         result = urlparse(url)
         params = parse_qs(result.query)
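
Taken together, these methods give the BigQuery backend a local/GCS file-loading API. A minimal usage sketch, assuming a connection made with ibis.bigquery.connect and using hypothetical project, dataset, bucket, and file names that are not part of this commit:

    import ibis

    # Hypothetical connection parameters; substitute your own project and dataset.
    con = ibis.bigquery.connect(project_id="my-project", dataset_id="my_dataset")

    # Local paths (globs included) go through load_table_from_file; multiple
    # matches are loaded as concurrent WRITE_APPEND jobs into one session table.
    parquet_t = con.read_parquet("data/functional_alltypes_*.parquet")

    # gs:// URIs are passed straight to load_table_from_uri.
    csv_t = con.read_csv("gs://my-bucket/diamonds.csv", table_name="diamonds")

    # Newline-delimited JSON uses the NEWLINE_DELIMITED_JSON source format.
    json_t = con.read_json("data/events.ndjson")

    print(parquet_t.count().execute())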

ibis/backends/tests/test_register.py

Lines changed: 16 additions & 62 deletions
@@ -389,29 +389,16 @@ def test_register_garbage(con, monkeypatch):
 
 
 @pytest.mark.parametrize(
-    ("fname", "in_table_name", "out_table_name"),
+    ("fname", "in_table_name"),
     [
-        (
-            "functional_alltypes.parquet",
-            None,
-            "ibis_read_parquet",
-        ),
-        ("functional_alltypes.parquet", "funk_all", "funk_all"),
+        ("functional_alltypes.parquet", None),
+        ("functional_alltypes.parquet", "funk_all"),
     ],
 )
 @pytest.mark.notyet(
-    [
-        "bigquery",
-        "flink",
-        "impala",
-        "mssql",
-        "mysql",
-        "postgres",
-        "sqlite",
-        "trino",
-    ]
+    ["flink", "impala", "mssql", "mysql", "postgres", "sqlite", "trino"]
 )
-def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name, out_table_name):
+def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name):
     pq = pytest.importorskip("pyarrow.parquet")
 
     fname = Path(fname)
@@ -426,10 +413,9 @@ def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name, out_table_name):
             fname = str(Path(fname).absolute())
         table = con.read_parquet(fname, table_name=in_table_name)
 
-    assert any(out_table_name in t for t in con.list_tables())
-
-    if con.name != "datafusion":
-        table.count().execute()
+    if in_table_name is not None:
+        assert table.op().name == in_table_name
+    assert table.count().execute()
 
 
 @pytest.fixture(scope="module")
@@ -441,17 +427,7 @@ def ft_data(data_dir):
 
 
 @pytest.mark.notyet(
-    [
-        "bigquery",
-        "flink",
-        "impala",
-        "mssql",
-        "mysql",
-        "pandas",
-        "postgres",
-        "sqlite",
-        "trino",
-    ]
+    ["flink", "impala", "mssql", "mysql", "pandas", "postgres", "sqlite", "trino"]
 )
 def test_read_parquet_glob(con, tmp_path, ft_data):
     pq = pytest.importorskip("pyarrow.parquet")
@@ -470,17 +446,7 @@ def test_read_parquet_glob(con, tmp_path, ft_data):
 
 
 @pytest.mark.notyet(
-    [
-        "bigquery",
-        "flink",
-        "impala",
-        "mssql",
-        "mysql",
-        "pandas",
-        "postgres",
-        "sqlite",
-        "trino",
-    ]
+    ["flink", "impala", "mssql", "mysql", "pandas", "postgres", "sqlite", "trino"]
 )
 def test_read_csv_glob(con, tmp_path, ft_data):
     pc = pytest.importorskip("pyarrow.csv")
@@ -500,7 +466,6 @@ def test_read_csv_glob(con, tmp_path, ft_data):
 
 @pytest.mark.notyet(
     [
-        "bigquery",
         "clickhouse",
         "dask",
         "datafusion",
@@ -554,33 +519,22 @@ def num_diamonds(data_dir):
 
 
 @pytest.mark.parametrize(
-    ("in_table_name", "out_table_name"),
-    [
-        param(None, "ibis_read_csv_", id="default"),
-        param("fancy_stones", "fancy_stones", id="file_name"),
-    ],
+    "in_table_name",
+    [param(None, id="default"), param("fancy_stones", id="file_name")],
 )
 @pytest.mark.notyet(
-    [
-        "bigquery",
-        "flink",
-        "impala",
-        "mssql",
-        "mysql",
-        "postgres",
-        "sqlite",
-        "trino",
-    ]
+    ["flink", "impala", "mssql", "mysql", "postgres", "sqlite", "trino"]
 )
-def test_read_csv(con, data_dir, in_table_name, out_table_name, num_diamonds):
+def test_read_csv(con, data_dir, in_table_name, num_diamonds):
     fname = "diamonds.csv"
     with pushd(data_dir / "csv"):
         if con.name == "pyspark":
            # pyspark doesn't respect CWD
             fname = str(Path(fname).absolute())
        table = con.read_csv(fname, table_name=in_table_name)
 
-    assert any(out_table_name in t for t in con.list_tables())
+    if in_table_name is not None:
+        assert table.op().name == in_table_name
 
     special_types = DIAMONDS_COLUMN_TYPES.get(con.name, {})
 
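
The reworked tests assert on the name bound to the returned expression instead of scanning con.list_tables(). A minimal sketch of the same check, reusing the hypothetical con from the sketch above and an illustrative file name:

    # A caller-supplied table_name should be reflected on the expression's op,
    # and the loaded table should be non-empty.
    t = con.read_csv("diamonds.csv", table_name="fancy_stones")
    assert t.op().name == "fancy_stones"
    assert t.count().execute()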