Skip to content

Commit 2d36722

Browse files
cpcloud authored and kszucs committed
feat(dask): enable pyarrow conversion
1 parent 72aa573 commit 2d36722

File tree

5 files changed

+27
-29
lines changed

5 files changed

+27
-29
lines changed

ibis/backends/base/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -945,7 +945,7 @@ def _run_pre_execute_hooks(self, expr: ir.Expr) -> None:
945945
self._register_in_memory_tables(expr)
946946

947947
def _define_udf_translation_rules(self, expr):
948-
if self.supports_in_memory_tables:
948+
if self.supports_python_udfs:
949949
raise NotImplementedError(self.name)
950950

951951
def compile(

ibis/backends/dask/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@
2121

2222
if TYPE_CHECKING:
2323
from collections.abc import Mapping, MutableMapping
24+
from pathlib import Path
2425

2526
# Make sure that the pandas backend options have been loaded
2627
ibis.pandas # noqa: B018
@@ -29,6 +30,7 @@
2930
class Backend(BasePandasBackend):
3031
name = "dask"
3132
backend_table_type = dd.DataFrame
33+
supports_in_memory_tables = False
3234

3335
def do_connect(
3436
self,
@@ -133,3 +135,8 @@ def _convert_object(cls, obj: dd.DataFrame) -> dd.DataFrame:
133135

134136
def _load_into_cache(self, name, expr):
135137
self.create_table(name, self.compile(expr).persist())
138+
139+
def read_delta(
140+
self, source: str | Path, table_name: str | None = None, **kwargs: Any
141+
):
142+
raise NotImplementedError(self.name)

ibis/backends/pandas/__init__.py

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -229,10 +229,6 @@ def has_operation(cls, operation: type[ops.Value]) -> bool:
229229
def _clean_up_cached_table(self, op):
230230
del self.dictionary[op.name]
231231

232-
233-
class Backend(BasePandasBackend):
234-
name = "pandas"
235-
236232
def to_pyarrow(
237233
self,
238234
expr: ir.Expr,
@@ -264,6 +260,10 @@ def to_pyarrow_batches(
264260
pa_table.schema, pa_table.to_batches(max_chunksize=chunk_size)
265261
)
266262

263+
264+
class Backend(BasePandasBackend):
265+
name = "pandas"
266+
267267
def execute(self, query, params=None, limit="default", **kwargs):
268268
from ibis.backends.pandas.core import execute_and_reset
269269

ibis/backends/tests/test_dataframe_interchange.py

Lines changed: 2 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,7 @@
1313
]
1414

1515

16-
@pytest.mark.notimpl(["dask", "druid"])
16+
@pytest.mark.notimpl(["druid"])
1717
@pytest.mark.notimpl(
1818
["impala"], raises=AttributeError, reason="missing `fetchmany` on the cursor"
1919
)
@@ -60,7 +60,6 @@ def test_dataframe_interchange_no_execute(con, alltypes, mocker):
6060
assert not to_pyarrow.called
6161

6262

63-
@pytest.mark.notimpl(["dask"])
6463
@pytest.mark.notimpl(
6564
["impala"], raises=AttributeError, reason="missing `fetchmany` on the cursor"
6665
)
@@ -80,7 +79,7 @@ def test_dataframe_interchange_dataframe_methods_execute(con, alltypes, mocker):
8079
assert to_pyarrow.call_count == 1
8180

8281

83-
@pytest.mark.notimpl(["dask", "druid"])
82+
@pytest.mark.notimpl(["druid"])
8483
@pytest.mark.notimpl(
8584
["impala"], raises=AttributeError, reason="missing `fetchmany` on the cursor"
8685
)
@@ -112,7 +111,6 @@ def test_dataframe_interchange_column_methods_execute(con, alltypes, mocker):
112111
assert col2.size() == pa_col2.size()
113112

114113

115-
@pytest.mark.notimpl(["dask"])
116114
@pytest.mark.notimpl(
117115
["impala"], raises=AttributeError, reason="missing `fetchmany` on the cursor"
118116
)

ibis/backends/tests/test_export.py

Lines changed: 13 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -43,9 +43,7 @@
4343
]
4444

4545
no_limit = [
46-
param(
47-
None, id="nolimit", marks=[pytest.mark.notimpl(["dask", "impala", "pyspark"])]
48-
)
46+
param(None, id="nolimit", marks=[pytest.mark.notimpl(["impala", "pyspark"])])
4947
]
5048

5149
limit_no_limit = limit + no_limit
@@ -117,7 +115,7 @@ def test_scalar_to_pyarrow_scalar(limit, awards_players):
117115
}
118116

119117

120-
@pytest.mark.notimpl(["dask", "impala", "pyspark", "druid"])
118+
@pytest.mark.notimpl(["impala", "pyspark", "druid"])
121119
def test_table_to_pyarrow_table_schema(con, awards_players):
122120
table = awards_players.to_pyarrow()
123121
assert isinstance(table, pa.Table)
@@ -136,7 +134,7 @@ def test_table_to_pyarrow_table_schema(con, awards_players):
136134
assert table.schema == expected_schema
137135

138136

139-
@pytest.mark.notimpl(["dask", "impala", "pyspark"])
137+
@pytest.mark.notimpl(["impala", "pyspark"])
140138
def test_column_to_pyarrow_table_schema(awards_players):
141139
expr = awards_players.awardID
142140
array = expr.to_pyarrow()
@@ -193,15 +191,15 @@ def test_to_pyarrow_batches_borked_types(batting):
193191
util.consume(batch_reader)
194192

195193

196-
@pytest.mark.notimpl(["dask", "impala", "pyspark"])
194+
@pytest.mark.notimpl(["impala", "pyspark"])
197195
def test_to_pyarrow_memtable(con):
198196
expr = ibis.memtable({"x": [1, 2, 3]})
199197
table = con.to_pyarrow(expr)
200198
assert isinstance(table, pa.Table)
201199
assert len(table) == 3
202200

203201

204-
@pytest.mark.notimpl(["dask", "impala", "pyspark"])
202+
@pytest.mark.notimpl(["impala", "pyspark"])
205203
def test_to_pyarrow_batches_memtable(con):
206204
expr = ibis.memtable({"x": [1, 2, 3]})
207205
n = 0
@@ -212,7 +210,7 @@ def test_to_pyarrow_batches_memtable(con):
212210
assert n == 3
213211

214212

215-
@pytest.mark.notimpl(["dask", "impala", "pyspark"])
213+
@pytest.mark.notimpl(["impala", "pyspark"])
216214
def test_table_to_parquet(tmp_path, backend, awards_players):
217215
outparquet = tmp_path / "out.parquet"
218216
awards_players.to_parquet(outparquet)
@@ -265,9 +263,7 @@ def test_roundtrip_partitioned_parquet(tmp_path, con, backend, awards_players):
265263
backend.assert_frame_equal(reingest.to_pandas(), awards_players.to_pandas())
266264

267265

268-
@pytest.mark.notimpl(
269-
["dask", "impala", "pyspark"], reason="No support for exporting files"
270-
)
266+
@pytest.mark.notimpl(["impala", "pyspark"], reason="No support for exporting files")
271267
@pytest.mark.parametrize("ftype", ["csv", "parquet"])
272268
def test_memtable_to_file(tmp_path, con, ftype, monkeypatch):
273269
"""
@@ -288,7 +284,7 @@ def test_memtable_to_file(tmp_path, con, ftype, monkeypatch):
288284
assert outfile.is_file()
289285

290286

291-
@pytest.mark.notimpl(["dask", "impala", "pyspark"])
287+
@pytest.mark.notimpl(["impala", "pyspark"])
292288
def test_table_to_csv(tmp_path, backend, awards_players):
293289
outcsv = tmp_path / "out.csv"
294290

@@ -314,7 +310,6 @@ def test_table_to_csv(tmp_path, backend, awards_players):
314310
["impala"], raises=AttributeError, reason="fetchmany doesn't exist"
315311
),
316312
pytest.mark.notyet(["druid"], raises=sa.exc.ProgrammingError),
317-
pytest.mark.notyet(["dask"], raises=NotImplementedError),
318313
pytest.mark.notyet(["pyspark"], raises=NotImplementedError),
319314
],
320315
),
@@ -329,7 +324,6 @@ def test_table_to_csv(tmp_path, backend, awards_players):
329324
["druid", "snowflake", "trino"], raises=sa.exc.ProgrammingError
330325
),
331326
pytest.mark.notyet(["oracle"], raises=sa.exc.DatabaseError),
332-
pytest.mark.notyet(["dask"], raises=NotImplementedError),
333327
pytest.mark.notyet(["mssql", "mysql"], raises=sa.exc.OperationalError),
334328
pytest.mark.notyet(["pyspark"], raises=ParseException),
335329
],
@@ -390,7 +384,6 @@ def test_roundtrip_delta(con, alltypes, tmp_path, monkeypatch):
390384
@pytest.mark.xfail_version(
391385
duckdb=["duckdb<0.8.1"], raises=AssertionError, reason="bug in duckdb"
392386
)
393-
@pytest.mark.notimpl(["dask"], raises=NotImplementedError)
394387
@pytest.mark.notimpl(
395388
["druid"], raises=AttributeError, reason="string type is used for timestamp_col"
396389
)
@@ -419,7 +412,7 @@ def test_arrow_timestamp_with_time_zone(alltypes):
419412
assert batch.schema.types == expected
420413

421414

422-
@pytest.mark.notimpl(["dask", "druid"])
415+
@pytest.mark.notimpl(["druid"])
423416
@pytest.mark.notimpl(
424417
["impala"], raises=AttributeError, reason="missing `fetchmany` on the cursor"
425418
)
@@ -447,7 +440,7 @@ def test_empty_memtable(backend, con):
447440
backend.assert_frame_equal(result, expected)
448441

449442

450-
@pytest.mark.notimpl(["dask", "flink", "impala", "pyspark"])
443+
@pytest.mark.notimpl(["flink", "impala", "pyspark"])
451444
def test_to_pandas_batches_empty_table(backend, con):
452445
t = backend.functional_alltypes.limit(0)
453446
n = t.count().execute()
@@ -456,7 +449,7 @@ def test_to_pandas_batches_empty_table(backend, con):
456449
assert sum(map(len, t.to_pandas_batches())) == n
457450

458451

459-
@pytest.mark.notimpl(["dask", "druid", "flink", "impala", "pyspark"])
452+
@pytest.mark.notimpl(["druid", "flink", "impala", "pyspark"])
460453
@pytest.mark.parametrize("n", [None, 1])
461454
def test_to_pandas_batches_nonempty_table(backend, con, n):
462455
t = backend.functional_alltypes.limit(n)
@@ -466,7 +459,7 @@ def test_to_pandas_batches_nonempty_table(backend, con, n):
466459
assert sum(map(len, t.to_pandas_batches())) == n
467460

468461

469-
@pytest.mark.notimpl(["dask", "flink", "impala", "pyspark"])
462+
@pytest.mark.notimpl(["flink", "impala", "pyspark"])
470463
@pytest.mark.parametrize("n", [None, 0, 1, 2])
471464
def test_to_pandas_batches_column(backend, con, n):
472465
t = backend.functional_alltypes.limit(n).timestamp_col
@@ -476,7 +469,7 @@ def test_to_pandas_batches_column(backend, con, n):
476469
assert sum(map(len, t.to_pandas_batches())) == n
477470

478471

479-
@pytest.mark.notimpl(["dask", "druid", "flink", "impala", "pyspark"])
472+
@pytest.mark.notimpl(["druid", "flink", "impala", "pyspark"])
480473
def test_to_pandas_batches_scalar(backend, con):
481474
t = backend.functional_alltypes.timestamp_col.max()
482475
expected = t.execute()

0 commit comments

Comments (0)