Skip to content

Feat: Update partitioning by DATE, DATETIME, TIMESTAMP, _PARTITIONDATE #1113

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 47 additions & 7 deletions sqlalchemy_bigquery/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,9 @@ def _raise_for_type(self, option, value, expected_type):
)

def _process_time_partitioning(
self, table: Table, time_partitioning: TimePartitioning
self,
table: Table,
time_partitioning: TimePartitioning,
):
"""
Generates a SQL 'PARTITION BY' clause for partitioning a table by a date or timestamp.
Expand All @@ -830,23 +832,61 @@ def _process_time_partitioning(
- Given a table with a TIMESTAMP type column 'event_timestamp' and setting
'time_partitioning.field' to 'event_timestamp', the function returns
"PARTITION BY TIMESTAMP_TRUNC(event_timestamp, DAY)".

Current inputs allowed by BQ and covered by this function include:
* _PARTITIONDATE
* DATETIME_TRUNC(<datetime_column>, DAY/HOUR/MONTH/YEAR)
* TIMESTAMP_TRUNC(<timestamp_column>, DAY/HOUR/MONTH/YEAR)
* DATE_TRUNC(<date_column>, MONTH/YEAR)

Additional allowed options not explicitly covered by this function
include:
* DATE(_PARTITIONTIME)
* DATE(<timestamp_column>)
* DATE(<datetime_column>)
* DATE column
"""

field = "_PARTITIONDATE"
trunc_fn = "DATE_TRUNC"

# Format used with _PARTITIONDATE which can only be used for
# DAY / MONTH / YEAR
if time_partitioning.field is None and field == "_PARTITIONDATE":
if time_partitioning.type_ in {"DAY", "MONTH", "YEAR"}:
return f"PARTITION BY {trunc_fn}({field})"
else:
raise ValueError(
f"_PARTITIONDATE can only be used with TimePartitioningTypes {{DAY, MONTH, YEAR}} received {time_partitioning.type_}"
)

if time_partitioning.field is not None:
field = time_partitioning.field

if isinstance(
table.columns[time_partitioning.field].type,
sqlalchemy.sql.sqltypes.DATE,
table.columns[field].type,
(sqlalchemy.sql.sqltypes.TIMESTAMP),
):
return f"PARTITION BY {field}"
trunc_fn = "TIMESTAMP_TRUNC"
elif isinstance(
table.columns[time_partitioning.field].type,
sqlalchemy.sql.sqltypes.TIMESTAMP,
table.columns[field].type,
sqlalchemy.sql.sqltypes.DATETIME,
):
trunc_fn = "TIMESTAMP_TRUNC"
trunc_fn = "DATETIME_TRUNC"

if isinstance(
table.columns[field].type,
sqlalchemy.sql.sqltypes.DATE,
):
if time_partitioning.type_ in {"DAY", "MONTH", "YEAR"}:
# CHECK for type: DAY/MONTH/YEAR
trunc_fn = "DATE_TRUNC"
else:
raise ValueError(
f"DATE_TRUNC can only be used with TimePartitioningTypes {{DAY, MONTH, YEAR}} received {time_partitioning.type_}"
)

# Format used with generically with DATE, TIMESTAMP, DATETIME
return f"PARTITION BY {trunc_fn}({field}, {time_partitioning.type_})"

def _process_range_partitioning(
Expand Down
134 changes: 84 additions & 50 deletions tests/unit/test_table_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,20 +104,59 @@ def test_table_clustering_fields_dialect_option_type_error(faux_conn):
)


def test_table_time_partitioning_dialect_option(faux_conn):
# expect table creation to fail as SQLite does not support partitioned tables
# DATETIME with type and field
@pytest.mark.parametrize(
"column_dtype,time_partitioning_type,func_name",
[
# DATE dtype
pytest.param(
sqlalchemy.DATE,
TimePartitioningType.HOUR,
"DATE_TRUNC",
marks=pytest.mark.xfail,
),
(sqlalchemy.DATE, TimePartitioningType.DAY, "DATE_TRUNC"),
(sqlalchemy.DATE, TimePartitioningType.MONTH, "DATE_TRUNC"),
(sqlalchemy.DATE, TimePartitioningType.YEAR, "DATE_TRUNC"),
# TIMESTAMP dtype
(sqlalchemy.TIMESTAMP, TimePartitioningType.HOUR, "TIMESTAMP_TRUNC"),
(sqlalchemy.TIMESTAMP, TimePartitioningType.DAY, "TIMESTAMP_TRUNC"),
(sqlalchemy.TIMESTAMP, TimePartitioningType.MONTH, "TIMESTAMP_TRUNC"),
(sqlalchemy.TIMESTAMP, TimePartitioningType.YEAR, "TIMESTAMP_TRUNC"),
# DATETIME dtype
(sqlalchemy.DATETIME, TimePartitioningType.HOUR, "DATETIME_TRUNC"),
(sqlalchemy.DATETIME, TimePartitioningType.DAY, "DATETIME_TRUNC"),
(sqlalchemy.DATETIME, TimePartitioningType.MONTH, "DATETIME_TRUNC"),
(sqlalchemy.DATETIME, TimePartitioningType.YEAR, "DATETIME_TRUNC"),
],
)
def test_table_time_partitioning_date_timestamp_and_datetime_dialect_option(
faux_conn, column_dtype, time_partitioning_type, func_name
):
"""Expect table creation to fail as SQLite does not support partitioned tables

Each parametrization ensures that the appropriate function is generated
depending on whether the column datatype is DATE, TIMESTAMP, DATETIME and
whether the TimePartitioningType is HOUR, DAY, MONTH, YEAR.

Notes:
* BigQuery will not partition on DATE by HOUR, so that is expected to xfail.
"""

with pytest.raises(sqlite3.OperationalError):
setup_table(
faux_conn,
"some_table",
sqlalchemy.Column("id", sqlalchemy.Integer),
sqlalchemy.Column("createdAt", sqlalchemy.DateTime),
bigquery_time_partitioning=TimePartitioning(),
sqlalchemy.Column("createdAt", column_dtype),
bigquery_time_partitioning=TimePartitioning(
field="createdAt", type_=time_partitioning_type
),
)

assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
"CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )"
" PARTITION BY DATE_TRUNC(_PARTITIONDATE, DAY)"
f"CREATE TABLE `some_table` ( `id` INT64, `createdAt` {column_dtype.__visit_name__} )"
f" PARTITION BY {func_name}(createdAt, {time_partitioning_type})"
)


Expand All @@ -139,75 +178,70 @@ def test_table_require_partition_filter_dialect_option(faux_conn):
)


# DATETIME WITH FIELD but no TYPE: defaults to DAY
def test_table_time_partitioning_with_field_dialect_option(faux_conn):
# expect table creation to fail as SQLite does not support partitioned tables
with pytest.raises(sqlite3.OperationalError):
setup_table(
faux_conn,
"some_table",
sqlalchemy.Column("id", sqlalchemy.Integer),
sqlalchemy.Column("createdAt", sqlalchemy.DateTime),
bigquery_time_partitioning=TimePartitioning(field="createdAt"),
)

assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
"CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )"
" PARTITION BY DATE_TRUNC(createdAt, DAY)"
)
"""Expect table creation to fail as SQLite does not support partitioned tables

Confirms that if the column datatype is DATETIME but no TIMEPARTITIONINGTYPE
has been supplied, the system will default to DAY.
"""

def test_table_time_partitioning_by_month_dialect_option(faux_conn):
# expect table creation to fail as SQLite does not support partitioned tables
with pytest.raises(sqlite3.OperationalError):
setup_table(
faux_conn,
"some_table",
sqlalchemy.Column("id", sqlalchemy.Integer),
sqlalchemy.Column("createdAt", sqlalchemy.DateTime),
bigquery_time_partitioning=TimePartitioning(
field="createdAt",
type_=TimePartitioningType.MONTH,
),
)

assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
"CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )"
" PARTITION BY DATE_TRUNC(createdAt, MONTH)"
)


def test_table_time_partitioning_with_timestamp_dialect_option(faux_conn):
# expect table creation to fail as SQLite does not support partitioned tables
with pytest.raises(sqlite3.OperationalError):
setup_table(
faux_conn,
"some_table",
sqlalchemy.Column("id", sqlalchemy.Integer),
sqlalchemy.Column("createdAt", sqlalchemy.TIMESTAMP),
bigquery_time_partitioning=TimePartitioning(field="createdAt"),
)

assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
"CREATE TABLE `some_table` ( `id` INT64, `createdAt` TIMESTAMP )"
" PARTITION BY TIMESTAMP_TRUNC(createdAt, DAY)"
"CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )"
" PARTITION BY DATE_TRUNC(createdAt, DAY)"
)


def test_table_time_partitioning_with_date_dialect_option(faux_conn):
# expect table creation to fail as SQLite does not support partitioned tables
@pytest.mark.parametrize(
"column_dtype,time_partitioning_type,func_name",
[
pytest.param(
sqlalchemy.DATE,
TimePartitioningType.HOUR,
"DATE_TRUNC",
marks=pytest.mark.xfail,
),
(sqlalchemy.DATE, TimePartitioningType.DAY, "DATE_TRUNC"),
(sqlalchemy.DATE, TimePartitioningType.MONTH, "DATE_TRUNC"),
(sqlalchemy.DATE, TimePartitioningType.YEAR, "DATE_TRUNC"),
],
)
def test_table_time_partitioning_with_partitiondate_option(
faux_conn, column_dtype, time_partitioning_type, func_name
):
"""Expect table creation to fail as SQLite does not support partitioned tables

Each parametrization ensures that the appropriate function is generated
depending on the pseudocolumn datatype is _PARTITIONDATE and whether the
TimePartitioningType is HOUR, DAY, MONTH, YEAR.

Notes:
* BigQuery will not partition on _PARTITIONDATE by HOUR, so that is expected
to xfail.
"""
with pytest.raises(sqlite3.OperationalError):
setup_table(
faux_conn,
"some_table_2",
# schema=str([id_, createdAt]),
sqlalchemy.Column("id", sqlalchemy.Integer),
sqlalchemy.Column("createdAt", sqlalchemy.DATE),
bigquery_time_partitioning=TimePartitioning(field="createdAt"),
sqlalchemy.Column("createdAt", column_dtype),
bigquery_time_partitioning=TimePartitioning(type_=time_partitioning_type),
)

# confirm that the following code creates the correct SQL string
assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
"CREATE TABLE `some_table_2` ( `id` INT64, `createdAt` DATE )"
" PARTITION BY createdAt"
f"CREATE TABLE `some_table_2` ( `id` INT64, `createdAt` {column_dtype.__visit_name__} )"
f" PARTITION BY {func_name}(_PARTITIONDATE)"
)


Expand Down