Skip to content

Feat: Update partitioning by DATE, DATETIME, TIMESTAMP, _PARTITIONDATE #1113

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 77 additions & 20 deletions sqlalchemy_bigquery/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -812,42 +812,99 @@ def _raise_for_type(self, option, value, expected_type):
)

def _process_time_partitioning(
self, table: Table, time_partitioning: TimePartitioning
self,
table: Table,
time_partitioning: TimePartitioning,
):
"""
Generates a SQL 'PARTITION BY' clause for partitioning a table by a date or timestamp.
Generates a SQL 'PARTITION BY' clause for partitioning a table,

Args:
- table (Table): The SQLAlchemy table object representing the BigQuery table to be partitioned.
- table (Table): The SQLAlchemy table object representing the BigQuery
table to be partitioned.
- time_partitioning (TimePartitioning): The time partitioning details,
including the field to be used for partitioning.

Returns:
- str: A SQL 'PARTITION BY' clause that uses either TIMESTAMP_TRUNC or DATE_TRUNC to
partition data on the specified field.
- str: A SQL 'PARTITION BY' clause.

Example:
- Given a table with a TIMESTAMP type column 'event_timestamp' and setting
'time_partitioning.field' to 'event_timestamp', the function returns
- Given a table with an 'event_timestamp' and setting time_partitioning.type
as DAY and by setting 'time_partitioning.field' as 'event_timestamp', the
function returns:
"PARTITION BY TIMESTAMP_TRUNC(event_timestamp, DAY)".

Current inputs allowed by BQ and covered by this function include:
* _PARTITIONDATE
* DATETIME_TRUNC(<datetime_column>, DAY/HOUR/MONTH/YEAR)
* TIMESTAMP_TRUNC(<timestamp_column>, DAY/HOUR/MONTH/YEAR)
* DATE_TRUNC(<date_column>, MONTH/YEAR)

Additional options allowed by BQ but not explicitly covered by this
function include:
* DATE(_PARTITIONTIME)
* DATE(<timestamp_column>)
* DATE(<datetime_column>)
* DATE column
"""
field = "_PARTITIONDATE"
trunc_fn = "DATE_TRUNC"

sqltypes = {
"_PARTITIONDATE": ("_PARTITIONDATE", None),
"TIMESTAMP": ("TIMESTAMP_TRUNC", {"DAY", "HOUR", "MONTH", "YEAR"}),
"DATETIME": ("DATETIME_TRUNC", {"DAY", "HOUR", "MONTH", "YEAR"}),
"DATE": ("DATE_TRUNC", {"MONTH", "YEAR"}),
}

# Extract field (i.e <column_name> or _PARTITIONDATE)
# AND extract the name of the column_type (i.e. "TIMESTAMP", "DATE",
# "DATETIME", "_PARTITIONDATE")
if time_partitioning.field is not None:
field = time_partitioning.field
if isinstance(
table.columns[time_partitioning.field].type,
sqlalchemy.sql.sqltypes.DATE,
):
return f"PARTITION BY {field}"
elif isinstance(
table.columns[time_partitioning.field].type,
sqlalchemy.sql.sqltypes.TIMESTAMP,
):
trunc_fn = "TIMESTAMP_TRUNC"
column_type = table.columns[field].type.__visit_name__.upper()

else:
field = "_PARTITIONDATE"
column_type = "_PARTITIONDATE"

# Extract time_partitioning.type_ (DAY, HOUR, MONTH, YEAR)
# i.e. generates one partition per type (1/DAY, 1/HOUR)
# NOTE: if time_partitioning.type_ == None, it gets
# immediately overwritten by python-bigquery to a default of DAY.
partitioning_period = time_partitioning.type_

# Extract the truncation_function (i.e. DATE_TRUNC)
# and the set of allowable partition_periods
# that can be used in that function
trunc_fn, allowed_partitions = sqltypes[column_type]

# Create output:
# Special Case: _PARTITIONDATE does NOT use a function or partitioning_period
if trunc_fn == "_PARTITIONDATE":
return f"PARTITION BY {field}"

# Special Case: BigQuery will not accept DAY as partitioning_period for
# DATE_TRUNC.
# However, the default argument in python-bigquery for TimePartioning
# is DAY. This case overwrites that to avoid making a breaking change in
# python-bigquery.
# https://github.com/googleapis/python-bigquery/blob/a4d9534a900f13ae7355904cda05097d781f27e3/google/cloud/bigquery/table.py#L2916
if trunc_fn == "DATE_TRUNC" and partitioning_period == "DAY":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this would be added when you complete the draft, I think we need to set the default value of partitioning_period somewhere

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Linchin

I am unclear on why we might need to create a default value of partitioning_period. It is set based on the value of time_partitioning.type_ on line 873.

time_partitioning.type_ is set in the python-bigquery library in the TimePartitioning class.

When that class is instantiated, the default for type_ in the __init__ is None but it is then processed and immediately overwritten by DAY as shown at the link above.

So by the time .type_ gets to us, it will either be DAY or will be set to something else that we use to set the variable named partitioning_period.

NOTE: I rename the variable to partitioning_period simply to make this code more readable. type_ seemed less intuitive than partitioning_period.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, I missed the part in __init__() that sets the default value.

raise ValueError(
"The TimePartitioning.type_ must be one of: "
f"{allowed_partitions}, received {partitioning_period}."
"NOTE: the `default` value for TimePartioning.type_ as set in "
"python-bigquery is 'DAY', if you wish to use 'DATE_TRUNC' "
"ensure that you overwrite the default TimePartitioning.type_. "
)

# Generic Case
if partitioning_period not in allowed_partitions:
raise ValueError(
"The TimePartitioning.type_ must be one of: "
f"{allowed_partitions}, received {partitioning_period}."
)

return f"PARTITION BY {trunc_fn}({field}, {time_partitioning.type_})"
return f"PARTITION BY {trunc_fn}({field}, {partitioning_period})"

def _process_range_partitioning(
self, table: Table, range_partitioning: RangePartitioning
Expand Down
11 changes: 10 additions & 1 deletion tests/system/test_sqlalchemy_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,16 @@ def test_dml(engine, session, table_dml):
assert len(result) == 0


@pytest.mark.parametrize("time_partitioning_field", ["timestamp_c", "date_c"])
@pytest.mark.parametrize(
"time_partitioning_field",
[
("timestamp_c"),
("datetime_c"),
# Fails because python-bigquery TimePartitioning.type_ defaults to "DAY", but
# the DATE_TRUNC() function only allows "MONTH"/"YEAR"
pytest.param("date_c", marks=[pytest.mark.xfail]),
],
)
def test_create_table(engine, bigquery_dataset, time_partitioning_field):
meta = MetaData()
Table(
Expand Down
193 changes: 123 additions & 70 deletions tests/unit/test_table_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,111 +104,161 @@ def test_table_clustering_fields_dialect_option_type_error(faux_conn):
)


def test_table_time_partitioning_dialect_option(faux_conn):
# expect table creation to fail as SQLite does not support partitioned tables
with pytest.raises(sqlite3.OperationalError):
setup_table(
faux_conn,
"some_table",
sqlalchemy.Column("id", sqlalchemy.Integer),
sqlalchemy.Column("createdAt", sqlalchemy.DateTime),
bigquery_time_partitioning=TimePartitioning(),
)
@pytest.mark.parametrize(
"column_dtype,time_partitioning_type,func_name",
[
# DATE dtype
pytest.param(
sqlalchemy.DATE,
TimePartitioningType.HOUR, # Only MONTH/YEAR are permitted in BigQuery
"DATE_TRUNC",
marks=pytest.mark.xfail,
),
pytest.param(
sqlalchemy.DATE,
TimePartitioningType.DAY, # Only MONTH/YEAR are permitted in BigQuery
"DATE_TRUNC",
marks=pytest.mark.xfail,
),
(sqlalchemy.DATE, TimePartitioningType.MONTH, "DATE_TRUNC"),
(sqlalchemy.DATE, TimePartitioningType.YEAR, "DATE_TRUNC"),
# TIMESTAMP dtype
(sqlalchemy.TIMESTAMP, TimePartitioningType.HOUR, "TIMESTAMP_TRUNC"),
(sqlalchemy.TIMESTAMP, TimePartitioningType.DAY, "TIMESTAMP_TRUNC"),
(sqlalchemy.TIMESTAMP, TimePartitioningType.MONTH, "TIMESTAMP_TRUNC"),
(sqlalchemy.TIMESTAMP, TimePartitioningType.YEAR, "TIMESTAMP_TRUNC"),
# DATETIME dtype
(sqlalchemy.DATETIME, TimePartitioningType.HOUR, "DATETIME_TRUNC"),
(sqlalchemy.DATETIME, TimePartitioningType.DAY, "DATETIME_TRUNC"),
(sqlalchemy.DATETIME, TimePartitioningType.MONTH, "DATETIME_TRUNC"),
(sqlalchemy.DATETIME, TimePartitioningType.YEAR, "DATETIME_TRUNC"),
# TimePartitioning.type_ == None
(sqlalchemy.DATETIME, None, "DATETIME_TRUNC"),
],
)
def test_table_time_partitioning_given_field_and_type__dialect_options(
faux_conn, column_dtype, time_partitioning_type, func_name
):
"""NOTE: Expect table creation to fail as SQLite does not support
partitioned tables, despite that, we are still able to test the generation
of SQL statements.

assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
"CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )"
" PARTITION BY DATE_TRUNC(_PARTITIONDATE, DAY)"
)
Each parametrization ensures that the appropriate function is generated
depending on whether the column datatype is DATE, TIMESTAMP, DATETIME and
whether the TimePartitioningType is HOUR, DAY, MONTH, YEAR.

`DATE_TRUNC` only returns a result if TimePartitioningType is DAY, MONTH,
YEAR. BigQuery cannot partition on DATE by HOUR, so that is expected to
xfail.

def test_table_require_partition_filter_dialect_option(faux_conn):
# expect table creation to fail as SQLite does not support partitioned tables
with pytest.raises(sqlite3.OperationalError):
setup_table(
faux_conn,
"some_table",
sqlalchemy.Column("createdAt", sqlalchemy.DateTime),
bigquery_time_partitioning=TimePartitioning(field="createdAt"),
bigquery_require_partition_filter=True,
)
A distinguishing characteristic of this test is we provide an argument to
the TimePartitioning class for both field and type_.

assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
"CREATE TABLE `some_table` ( `createdAt` DATETIME )"
" PARTITION BY DATE_TRUNC(createdAt, DAY)"
" OPTIONS(require_partition_filter=true)"
)
Special case: IF time_partitioning_type is None, the __init__() in the
TimePartitioning class will overwrite it with TimePartitioningType.DAY as
the default.
"""

if time_partitioning_type is None:
time_partitioning_type = TimePartitioningType.DAY

def test_table_time_partitioning_with_field_dialect_option(faux_conn):
# expect table creation to fail as SQLite does not support partitioned tables
with pytest.raises(sqlite3.OperationalError):
setup_table(
faux_conn,
"some_table",
sqlalchemy.Column("id", sqlalchemy.Integer),
sqlalchemy.Column("createdAt", sqlalchemy.DateTime),
bigquery_time_partitioning=TimePartitioning(field="createdAt"),
sqlalchemy.Column("createdAt", column_dtype),
bigquery_time_partitioning=TimePartitioning(
field="createdAt", type_=time_partitioning_type
),
)

assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
"CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )"
" PARTITION BY DATE_TRUNC(createdAt, DAY)"
result = " ".join(faux_conn.test_data["execute"][-1][0].strip().split())
expected = (
f"CREATE TABLE `some_table` ( `id` INT64, `createdAt` {column_dtype.__visit_name__} )"
f" PARTITION BY {func_name}(createdAt, {time_partitioning_type})"
)
assert result == expected


def test_table_time_partitioning_by_month_dialect_option(faux_conn):
# expect table creation to fail as SQLite does not support partitioned tables
with pytest.raises(sqlite3.OperationalError):
setup_table(
faux_conn,
"some_table",
sqlalchemy.Column("id", sqlalchemy.Integer),
sqlalchemy.Column("createdAt", sqlalchemy.DateTime),
bigquery_time_partitioning=TimePartitioning(
field="createdAt",
type_=TimePartitioningType.MONTH,
),
)
def test_table_time_partitioning_given_field_but_no_type__dialect_option(faux_conn):
"""Expect table creation to fail as SQLite does not support partitioned tables

assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
"CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )"
" PARTITION BY DATE_TRUNC(createdAt, MONTH)"
)
Confirms that if the column datatype is DATETIME but no TimePartitioning.type_
has been supplied, the system will default to DAY.

A distinguishing characteristic of this test is we provide an argument to
the TimePartitioning class for field but not type_.
"""

def test_table_time_partitioning_with_timestamp_dialect_option(faux_conn):
# expect table creation to fail as SQLite does not support partitioned tables
with pytest.raises(sqlite3.OperationalError):
setup_table(
faux_conn,
"some_table",
sqlalchemy.Column("id", sqlalchemy.Integer),
sqlalchemy.Column("createdAt", sqlalchemy.TIMESTAMP),
sqlalchemy.Column("createdAt", sqlalchemy.DateTime),
bigquery_time_partitioning=TimePartitioning(field="createdAt"),
)

assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
"CREATE TABLE `some_table` ( `id` INT64, `createdAt` TIMESTAMP )"
" PARTITION BY TIMESTAMP_TRUNC(createdAt, DAY)"
result = " ".join(faux_conn.test_data["execute"][-1][0].strip().split())
Copy link
Contributor

@Linchin Linchin Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems the second half of this test does the following, as the docstring suggests:

Confirms that if the column datatype is DATETIME but no TimePartitioning.type_
    has been supplied, the system will default to DAY.

Maybe I'm missing something, but I think it's a duplicate of the test case at line 136.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The difference between this test and the parametrized test above is minor.

Essentially, in the parameters on line 136 we explicitly provide a value of None as a type_ argument to the TimePartitioning class.

We know the default is None in python-bigquery but we set the value deliberately.

In line 193, we do not set the value of type_ explicitly, we allow the default value (which should be None) come through and thus confirm that everything works as expected whether we provide a value to type_ or not.

expected = (
"CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )"
" PARTITION BY DATETIME_TRUNC(createdAt, DAY)"
)
assert result == expected


def test_table_time_partitioning_with_date_dialect_option(faux_conn):
# expect table creation to fail as SQLite does not support partitioned tables
@pytest.mark.parametrize(
"column_dtype,time_partitioning_type",
[
pytest.param(
sqlalchemy.DATE,
TimePartitioningType.HOUR,
marks=pytest.mark.xfail,
),
(sqlalchemy.DATE, TimePartitioningType.DAY),
(sqlalchemy.DATE, TimePartitioningType.MONTH),
(sqlalchemy.DATE, TimePartitioningType.YEAR),
],
)
def test_table_time_partitioning_given_type__but_no_field_dialect_option(
faux_conn,
column_dtype,
time_partitioning_type,
):
"""NOTE: Expect table creation to fail as SQLite does not support
partitioned tables, despite that, we are still able to test the generation
of SQL statements

If the `field` argument to TimePartitioning() is not provided, it defaults to
None. That causes the pseudocolumn "_PARTITIONDATE" to be used by default as
the column to partition by.

_PARTITIONTIME only returns a result if TimePartitioningType is DAY, MONTH,
YEAR. BigQuery cannot partition on _PARTITIONDATE by HOUR, so that is
expected to xfail.

A distinguishing characteristic of this test is we provide an argument to
the TimePartitioning class for type_ but not field.
"""

with pytest.raises(sqlite3.OperationalError):
setup_table(
faux_conn,
"some_table_2",
sqlalchemy.Column("id", sqlalchemy.Integer),
sqlalchemy.Column("createdAt", sqlalchemy.DATE),
bigquery_time_partitioning=TimePartitioning(field="createdAt"),
sqlalchemy.Column("createdAt", column_dtype),
bigquery_time_partitioning=TimePartitioning(type_=time_partitioning_type),
)

# confirm that the following code creates the correct SQL string
assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
"CREATE TABLE `some_table_2` ( `id` INT64, `createdAt` DATE )"
" PARTITION BY createdAt"
result = " ".join(faux_conn.test_data["execute"][-1][0].strip().split())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this test, we do NOT provide a field argument to the TimePartitioning class whereas in the tests above, we do.

This is needed to confirm that in the absence of a field argument the SQL statement will not use a function, etc but will instead use _PARTITIONDATE

Both the tests above end up including a truncation function but this on does not (and should not).


# We need two versions of expected depending on whether we use _PARTITIONDATE
expected = (
f"CREATE TABLE `some_table_2` ( `id` INT64, `createdAt` {column_dtype.__visit_name__} )"
f" PARTITION BY _PARTITIONDATE"
)
assert result == expected


def test_table_time_partitioning_dialect_option_partition_expiration_days(faux_conn):
Expand All @@ -227,7 +277,7 @@ def test_table_time_partitioning_dialect_option_partition_expiration_days(faux_c

assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
"CREATE TABLE `some_table` ( `createdAt` DATETIME )"
" PARTITION BY DATE_TRUNC(createdAt, DAY)"
" PARTITION BY DATETIME_TRUNC(createdAt, DAY)"
" OPTIONS(partition_expiration_days=0.25)"
)

Expand Down Expand Up @@ -400,13 +450,16 @@ def test_table_all_dialect_option(faux_conn):
),
)

assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
result = " ".join(faux_conn.test_data["execute"][-1][0].strip().split())
expected = (
"CREATE TABLE `some_table` ( `id` INT64, `country` STRING, `town` STRING, `createdAt` DATETIME )"
" PARTITION BY DATE_TRUNC(createdAt, DAY)"
" PARTITION BY DATETIME_TRUNC(createdAt, DAY)"
" CLUSTER BY country, town"
" OPTIONS(partition_expiration_days=30.0, expiration_timestamp=TIMESTAMP '2038-01-01 00:00:00+00:00', require_partition_filter=true, default_rounding_mode='ROUND_HALF_EVEN')"
)

assert result == expected


def test_validate_friendly_name_value_type(ddl_compiler):
# expect option value to be transformed as a string expression
Expand Down