Skip to content

Commit 96d10a3

Browse files
committed
Add table_kwargs context manager to make pandas/Dask support dialect
Unlock SQLAlchemy ORM's `__table_args__` on the pandas/Dask `to_sql()` interface, in order to support CrateDB's special SQL DDL options.
1 parent 5e39bbf commit 96d10a3

File tree

5 files changed

+158
-1
lines changed

5 files changed

+158
-1
lines changed

CHANGES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
## Unreleased
55
- Added/reactivated documentation as `sqlalchemy-cratedb`
66
- Added re-usable patches and polyfills from application adapters
7+
- Added `table_kwargs` context manager to make pandas/Dask support
8+
CrateDB dialect table options.
79

810
## 2024/06/13 0.37.0
911
- Added support for CrateDB's [FLOAT_VECTOR] data type and its accompanying

docs/support.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,36 @@ df.to_sql(
5454
)
5555
```
5656

57+
58+
(support-table-kwargs)=
59+
## Context Manager `table_kwargs`
60+
61+
:::{rubric} Background
62+
:::
63+
CrateDB's special SQL DDL options to support [](inv:crate-reference#partitioned-tables),
64+
[](inv:crate-reference#ddl-sharding), or [](inv:crate-reference#ddl-replication)
65+
sometimes can't be configured easily when SQLAlchemy is wrapped into a 3rd-party
66+
framework like pandas or Dask.
67+
68+
:::{rubric} Utility
69+
:::
70+
The `table_kwargs` utility is a context manager that is able to forward CrateDB's
71+
dialect-specific table creation options to the `sa.Table()` constructor call sites
72+
at runtime.
73+
74+
:::{rubric} Synopsis
75+
:::
76+
Using the context manager as outlined below will render a
77+
`PARTITIONED BY ("time")` SQL clause, without touching the call site of
78+
`sa.Table(...)`.
79+
```python
80+
from sqlalchemy_cratedb.support import table_kwargs
81+
82+
with table_kwargs(crate_partitioned_by="time"):
83+
return df.to_sql(...)
84+
```
85+
86+
5787
(support-autoincrement)=
5888
## Synthetic Autoincrement using Timestamps
5989

src/sqlalchemy_cratedb/support/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from sqlalchemy_cratedb.support.pandas import insert_bulk
1+
from sqlalchemy_cratedb.support.pandas import insert_bulk, table_kwargs
22
from sqlalchemy_cratedb.support.polyfill import check_uniqueness_factory, refresh_after_dml, \
33
patch_autoincrement_timestamp
44
from sqlalchemy_cratedb.support.util import refresh_table, refresh_dirty
@@ -10,4 +10,5 @@
1010
refresh_after_dml,
1111
refresh_dirty,
1212
refresh_table,
13+
table_kwargs,
1314
]

src/sqlalchemy_cratedb/support/pandas.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,15 @@
1818
# However, if you have executed another commercial license agreement
1919
# with Crate these terms will supersede the license and you may use the
2020
# software solely pursuant to the terms of the relevant commercial agreement.
21+
from contextlib import contextmanager
22+
from typing import Any
23+
from unittest.mock import patch
24+
2125
import logging
2226

27+
import sqlalchemy as sa
28+
29+
from sqlalchemy_cratedb import SA_VERSION, SA_2_0
2330

2431
logger = logging.getLogger(__name__)
2532

@@ -60,3 +67,45 @@ def insert_bulk(pd_table, conn, keys, data_iter):
6067
cursor = conn._dbapi_connection.cursor()
6168
cursor.execute(sql=sql, bulk_parameters=data)
6269
cursor.close()
70+
71+
72+
@contextmanager
def table_kwargs(**kwargs):
    """
    Context manager for adding SQLAlchemy dialect-specific table options at runtime.

    In certain cases where SQLAlchemy orchestration is implemented within a
    framework, like at this spot [1] in pandas' `SQLTable._create_table_setup`,
    it is not easily possible to forward SQLAlchemy dialect options at table
    creation time.

    In order to augment the SQL DDL statement to make it honor database-specific
    dialect options, the only way to work around the unfortunate situation is by
    monkey-patching the call to `sa.Table()` at runtime, relaying additional
    dialect options through corresponding keyword arguments in their original
    `<dialect>_<kwarg>` format [2].

    The given keyword arguments are merged into the keyword arguments of every
    table construction that happens while the context is active. On a name
    clash, the values given here take precedence over the ones supplied at the
    call site. The patch is applied to the `Table` class itself, and it is
    reverted when the context exits.

    :param kwargs: Table options to inject, e.g. `crate_partitioned_by="time"`.

    [1] https://github.com/pandas-dev/pandas/blob/v2.2.2/pandas/io/sql.py#L1282-L1285
    [2] https://docs.sqlalchemy.org/en/20/core/foundation.html#sqlalchemy.sql.base.DialectKWArgs.dialect_kwargs
    """

    if SA_VERSION < SA_2_0:
        # SQLAlchemy 1.x: `Table._init` is a regular instance method.
        _init_dist = sa.sql.schema.Table._init

        def _init(self, name, metadata, *args, **kwargs_effective):
            kwargs_effective.update(kwargs)
            return _init_dist(self, name, metadata, *args, **kwargs_effective)

        with patch("sqlalchemy.sql.schema.Table._init", _init):
            yield

    else:
        # SQLAlchemy 2.x: `Table._new` is a classmethod. Unwrap the underlying
        # function and install the replacement as a classmethod as well, so
        # that `cls` is the class actually being constructed (including
        # subclasses of `Table`), instead of always delegating to `Table`.
        new_dist = sa.sql.schema.Table._new.__func__

        def _new(cls, *args: Any, **kw: Any) -> Any:
            kw.update(kwargs)
            return new_dist(cls, *args, **kw)

        with patch("sqlalchemy.sql.schema.Table._new", classmethod(_new)):
            yield

tests/test_support_pandas.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import re
2+
import sys
3+
4+
import pytest
5+
import sqlalchemy as sa
6+
from sqlalchemy.exc import ProgrammingError
7+
from sqlalchemy.orm import sessionmaker
8+
9+
from pueblo.testing.pandas import makeTimeDataFrame
10+
11+
from sqlalchemy_cratedb import SA_VERSION, SA_1_4
12+
from sqlalchemy_cratedb.support.pandas import table_kwargs
13+
14+
TABLE_NAME = "foobar"
INSERT_RECORDS = 42

# Create dataframe, to be used as input data.
# Use the lowercase "s" (seconds) frequency alias: the uppercase "S" spelling
# is deprecated since pandas 2.2.
df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="s")
# Expose the index as a regular column, so it can be used as a table column.
df["time"] = df.index
20+
21+
22+
@pytest.mark.skipif(sys.version_info < (3, 8), reason="Feature not supported on Python 3.7 and earlier")
@pytest.mark.skipif(SA_VERSION < SA_1_4, reason="Feature not supported on SQLAlchemy 1.3 and earlier")
def test_table_kwargs_partitioned_by(cratedb_service):
    """
    Validate adding CrateDB dialect table option `PARTITIONED BY` at runtime.
    """

    engine = cratedb_service.database.engine

    # Insert records from pandas dataframe.
    with table_kwargs(crate_partitioned_by="time"):
        with engine.connect() as con:
            df.to_sql(
                TABLE_NAME,
                con,
                if_exists="replace",
                index=False,
            )

    # Synchronize writes.
    cratedb_service.database.refresh_table(TABLE_NAME)

    # Inquire table cardinality.
    # `Engine.execute()` has been removed in SQLAlchemy 2.0, so run the query
    # through a session, which works on both SQLAlchemy 1.4 and 2.x.
    metadata = sa.MetaData()
    query = sa.select(sa.func.count()).select_from(sa.Table(TABLE_NAME, metadata))
    session = sessionmaker(bind=engine)()
    try:
        count = session.execute(query).scalar()
    finally:
        session.close()

    # Compare outcome.
    assert count == INSERT_RECORDS

    # Validate SQL DDL.
    ddl = cratedb_service.database.run_sql(f"SHOW CREATE TABLE {TABLE_NAME}")
    assert 'PARTITIONED BY ("time")' in ddl[0][0]
57+
58+
59+
@pytest.mark.skipif(sys.version_info < (3, 8), reason="Feature not supported on Python 3.7 and earlier")
@pytest.mark.skipif(SA_VERSION < SA_1_4, reason="Feature not supported on SQLAlchemy 1.3 and earlier")
def test_table_kwargs_unknown(cratedb_service):
    """
    Verify that an unknown CrateDB dialect table option is rejected.

    The bogus option is forwarded to the table constructor through
    `table_kwargs`, and `to_sql()` is expected to fail with a
    `ProgrammingError` carrying a `ColumnUnknownException` message.
    """
    expected_message = re.escape("ColumnUnknownException[Column bazqux unknown]")
    db_engine = cratedb_service.database.engine

    with table_kwargs(crate_unknown_option="bazqux"), db_engine.connect() as connection:
        with pytest.raises(ProgrammingError) as excinfo:
            df.to_sql(TABLE_NAME, connection, if_exists="replace", index=False)
        assert excinfo.match(expected_message)

0 commit comments

Comments
 (0)