Add support for liquid clustering
RadekBuczkowski committed Nov 10, 2024
1 parent b7ac334 commit a48ae22
Showing 13 changed files with 108 additions and 3 deletions.
4 changes: 4 additions & 0 deletions docs/configuration/README.md
@@ -185,6 +185,7 @@ CREATE TABLE [ IF NOT EXISTS ] table_identifier
USING data_source
[ OPTIONS ( key1=val1, key2=val2, ... ) ]
[ PARTITIONED BY ( col_name1, col_name2, ... ) ]
[ CLUSTER BY ( col_name1, col_name2, ... ) ]
[ CLUSTERED BY ( col_name3, col_name4, ... )
[ SORTED BY ( col_name [ ASC | DESC ], ... ) ]
INTO num_buckets BUCKETS ]
@@ -218,6 +219,9 @@ MySpetlrTableReference:
partitioned_by:
- col_name1
- col_name2
cluster_by:
- col_name1
- col_name2
clustered_by:
cols:
- col_name3
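For illustration, registering a table that uses the new cluster_by key could look as follows. This is a minimal sketch, assuming the usual spetlr Configurator registration flow; the table id, database, and column names are hypothetical:

from spetlr import Configurator

tc = Configurator()
tc.register(
    "MyClusteredTbl",  # hypothetical table id
    {
        "name": "my_db.my_clustered_tbl",
        "schema": {"sql": "col_name1 STRING, col_name2 INT, payload STRING"},
        # the new key introduced by this commit:
        "cluster_by": ["col_name1", "col_name2"],
    },
)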
4 changes: 4 additions & 0 deletions src/spetlr/configurator/sql/StatementBlocks.py
@@ -24,6 +24,7 @@ class StatementBlocks:
using: str = "DELTA"
options: Dict[str, str] = None
partitioned_by: List[str] = None
cluster_by: List[str] = None
clustered_by: ClusteredBy = None
location: str = None
comment: str = None
@@ -48,6 +49,9 @@ def get_simple_structure(self):
object_details["schema"] = {"sql": self.schema}
object_details["_raw_sql_schema"] = self.schema

if self.cluster_by:
object_details["cluster_by"] = self.cluster_by

if self.clustered_by:
object_details["clustered_by"] = {
"cols": self.clustered_by.clustering_cols,
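As a sketch of the effect: for a statement that contains both clauses, the simple structure now carries a cluster_by key next to the existing clustered_by one. The values below are illustrative, mirroring the test SQL further down:

# Illustrative fragment of the dict built by get_simple_structure()
# for a statement with CLUSTER BY (a, b) and CLUSTERED BY (c, d):
object_details = {
    # ... existing keys such as "schema" and "partitioned_by" ...
    "cluster_by": ["a", "b"],
    "clustered_by": {"cols": ["c", "d"]},  # plus sorting/bucketing details
}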
1 change: 1 addition & 0 deletions src/spetlr/configurator/sql/init_sqlparse.py
@@ -11,6 +11,7 @@
(r"ZORDER\s+BY\b", tokens.Keyword),
(r"PARTITIONED\s+BY\b", tokens.Keyword),
(r"SORTED\s+BY\b", tokens.Keyword),
(r"CLUSTER\s+BY\b", tokens.Keyword),
(r"CLUSTERED\s+BY\b", tokens.Keyword),
(r"WITH\s+DBPROPERTIES\b", tokens.Keyword),
(r"BLOOMFILTER\s+INDEX\b", tokens.Keyword),
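The registration above makes sqlparse treat the two-word clause as a single keyword, which is what the parser in table.py (next file) matches against. A quick, self-contained check of the pattern itself:

import re

# The same pattern that init_sqlparse registers and table.py matches on;
# \s+ tolerates any whitespace between the two words.
assert re.match(r"CLUSTER\s+BY\b", "CLUSTER BY (a, b)")
assert re.match(r"CLUSTER\s+BY\b", "CLUSTER   BY (a, b)")
assert not re.match(r"CLUSTER\s+BY\b", "CLUSTERED BY (c, d)")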
11 changes: 11 additions & 0 deletions src/spetlr/configurator/sql/table.py
@@ -85,6 +85,17 @@ def _extract_table_blocks(stmt: _PeekableTokenList) -> StatementBlocks:
)
continue

if re.match(r"CLUSTER\s+BY", val):
try:
blocks.cluster_by = [
token.value for token in _unpack_list_of_single_variables(stmt)
]
except SpetlrConfiguratorInvalidSqlException:
raise SpetlrConfiguratorInvalidSqlException(
"Please add your cluster columns to the schema."
)
continue

if re.match(r"CLUSTERED\s+BY", val):
blocks.clustered_by = ClusteredBy(
[token.value for token in _unpack_list_of_single_variables(stmt)]
20 changes: 20 additions & 0 deletions src/spetlr/delta/delta_handle.py
@@ -61,6 +61,7 @@ def __init__(
self._data_format = data_format

self._partitioning: Optional[List[str]] = None
self._cluster: Optional[List[str]] = None
self._validate()

if options_dict is None or options_dict == "":
@@ -209,6 +210,25 @@ def get_partitioning(self):
)
return self._partitioning

def get_cluster(self):
"""The result of DESCRIBE DETAIL tablename is like this:
+------+--------------------+--------------------+-----------------+-----+
|format|                  id|                name|clusteringColumns| ... |
+------+--------------------+--------------------+-----------------+-----+
| delta|c96a1e94-314b-427...|spark_catalog.tes...|     [colB, colA]| ... |
+------+--------------------+--------------------+-----------------+-----+
but this method returns the clustering columns in the form
['colB', 'colA']; if there are no clustering columns, an empty list is returned.
"""
if self._cluster is None:
self._cluster = (
Spark.get()
.sql(f"DESCRIBE DETAIL {self.get_tablename()}")
.select("clusteringColumns")
.collect()[0][0]
)
return self._cluster

def get_tablename(self) -> str:
return self._name

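A usage sketch for the new accessor. The import path and the table id are assumptions based on the tests further down:

from spetlr.delta import DeltaHandle

dh = DeltaHandle.from_tc("MyTbl6")  # registered via the Configurator
clustering = dh.get_cluster()  # e.g. ["colB", "colA"]; [] for an unclustered table
print(clustering)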
4 changes: 4 additions & 0 deletions src/spetlr/deltaspec/DeltaTableSpec.py
@@ -44,6 +44,7 @@ def from_sql(cls, sql: str) -> "DeltaTableSpec":
schema=schema,
options=details.get("options", {}),
partitioned_by=details.get("partitioned_by", []),
cluster_by=details.get("cluster_by", []),
tblproperties=details.get("tblproperties", {}),
)

@@ -77,6 +78,7 @@ def from_tc(cls, id: str) -> "DeltaTableSpec":
schema=schema,
options=item.get("options", {}),
partitioned_by=item.get("partitioned_by", []),
cluster_by=item.get("cluster_by", []),
tblproperties=item.get("tblproperties", {}),
)

@@ -110,6 +112,7 @@ def from_name(cls, in_name: str) -> "DeltaTableSpec":
name=details["name"],
schema=spark.table(in_name).schema,
partitioned_by=details["partitionColumns"],
cluster_by=details["clusteringColumns"],
tblproperties=tblproperties,
location=details["location"],
comment=details["description"],
@@ -130,6 +133,7 @@ def __repr__(self):
if self.partitioned_by
else ""
),
(f"cluster_by={repr(self.cluster_by)}" if self.cluster_by else ""),
(f"tblproperties={repr(self.tblproperties)}" if self.tblproperties else ""),
(f"comment={repr(self.comment)}" if self.comment else ""),
(f"location={repr(self.location)}" if self.location else ""),
10 changes: 10 additions & 0 deletions src/spetlr/deltaspec/DeltaTableSpecBase.py
@@ -27,6 +27,7 @@ class DeltaTableSpecBase:
schema: StructType
options: Dict[str, str] = field(default_factory=dict)
partitioned_by: List[str] = field(default_factory=list)
cluster_by: List[str] = field(default_factory=list)
tblproperties: Dict[str, str] = field(default_factory=dict)
location: Optional[str] = None
comment: str = None
@@ -65,6 +66,12 @@ def __post_init__(self):
"Supply the partitioning columns in the schema."
)

for col in self.cluster_by:
if col not in self.schema.names:
raise InvalidSpecificationError(
"Supply the cluster columns in the schema."
)

self.location = standard_databricks_location(self.location)

# Experiments have shown that the statement
@@ -197,6 +204,9 @@ def get_sql_create(self) -> str:
if self.partitioned_by:
sql += f"PARTITIONED BY ({', '.join(self.partitioned_by)})\n"

if self.cluster_by:
sql += f"CLUSTER BY ({', '.join(self.cluster_by)})\n"

if self.location:
sql += f"LOCATION {json.dumps(self.location)}\n"

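Putting the pieces together, a spec that carries cluster_by emits the new clause from get_sql_create(). A minimal sketch, assuming the import path and the dataclass fields shown in this file; the table name and schema are illustrative:

from pyspark.sql.types import IntegerType, StringType, StructField, StructType

from spetlr.deltaspec import DeltaTableSpec

# Both clustering columns must appear in the schema; otherwise
# __post_init__ raises InvalidSpecificationError, as shown above.
spec = DeltaTableSpec(
    name="my_db.my_tbl",
    schema=StructType(
        [
            StructField("colA", StringType()),
            StructField("colB", IntegerType()),
        ]
    ),
    cluster_by=["colB", "colA"],
)

# The generated statement now includes the line: CLUSTER BY (colB, colA)
print(spec.get_sql_create())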
1 change: 1 addition & 0 deletions src/spetlr/deltaspec/DeltaTableSpecDifference.py
@@ -105,6 +105,7 @@ def metadata_match(self):
return (
(self.base.options == self.target.options)
and (self.base.partitioned_by == self.target.partitioned_by)
and (self.base.cluster_by == self.target.cluster_by)
and (self.base.comment == self.target.comment)
and (self.base.tblproperties == self.target.tblproperties)
)
3 changes: 2 additions & 1 deletion src/spetlr/power_bi/PowerBi.py
@@ -949,7 +949,7 @@ def _verify_last_refresh(self) -> bool:
return True
elif self.last_status == "Unknown":
self._raise_error("Refresh is still in progress!")
- elif self.last_status == "Canceled":
+ elif self.last_status in ["Canceled", "Cancelled"]:
self._raise_error("Refresh has been canceled!")
elif self.last_status == "Disabled":
self._raise_error("Refresh is disabled!")
@@ -1037,6 +1037,7 @@ def _trigger_new_refresh(self, *, with_wait: bool = True) -> bool:
elif self.last_status is not None and self.last_status not in [
"Completed",
"Canceled",
"Cancelled",
]:
print(
f"Unknown refresh status: {self.last_status}! {self.last_exception}"
50 changes: 48 additions & 2 deletions tests/cluster/delta/test_delta_class.py
@@ -54,6 +54,20 @@ def test_01_configure(self):
},
)

tc.register(
"MyTbl6",
{
"name": "TestDb{ID}.TestTbl6",
},
)

tc.register(
"MyTbl7",
{
"name": "TestDb{ID}.TestTbl7",
},
)

tc.register(
"MyTblWithSchema",
{
@@ -68,6 +82,8 @@
DeltaHandle.from_tc("MyTbl2")
DeltaHandle.from_tc("MyTbl4")
DeltaHandle.from_tc("MyTbl5")
DeltaHandle.from_tc("MyTbl6")
DeltaHandle.from_tc("MyTbl7")
DeltaHandle.from_tc("MyTblWithSchema")

db.create()
@@ -174,13 +190,43 @@ def test_09_partitioning(self):

self.assertEqual(dh2.get_partitioning(), [])

- def test_10_get_schema(self):
+ def test_10_cluster(self):
dh = DeltaHandle.from_tc("MyTbl6")
Spark.get().sql(
f"""
CREATE TABLE {dh.get_tablename()}
(
colA string,
colB int,
payload string
)
CLUSTER BY (colB,colA)
"""
)

self.assertEqual(dh.get_cluster(), ["colB", "colA"])

dh2 = DeltaHandle.from_tc("MyTbl7")
Spark.get().sql(
f"""
CREATE TABLE {dh2.get_tablename()}
(
colA string,
colB int,
payload string
)
"""
)

self.assertEqual(dh2.get_cluster(), [])

def test_11_get_schema(self):
# test instantiation without error
dh = DeltaHandle.from_tc("MyTblWithSchema")

self.assertEqualSchema(test_schema, dh.get_schema())

- def test_11_set_schema(self):
+ def test_12_set_schema(self):
# test instantiation without error
dh = DeltaHandle.from_tc("MyTblWithSchema")

1 change: 1 addition & 0 deletions tests/local/configurator/sql/test1.sql
@@ -35,6 +35,7 @@ CREATE TABLE IF NOT EXISTS {MySparkDb}.tbl1
USING DELTA
OPTIONS (key1='val1', key2="val2")
PARTITIONED BY ( a, b )
CLUSTER BY ( a, b )
CLUSTERED BY ( c,d )
SORTED BY ( a, b DESC )
INTO 5 BUCKETS
1 change: 1 addition & 0 deletions tests/local/configurator/test_configurator.py
@@ -131,6 +131,7 @@ def test_09_configure_from_sql(self):
c.get("MySqlTable", "options"), {"key1": "val1", "key2": "val2"}
)
self.assertEqual(c.get("MySqlTable", "partitioned_by"), ["a", "b"])
self.assertEqual(c.get("MySqlTable", "cluster_by"), ["a", "b"])
self.assertEqual(
c.get("MySqlTable", "clustered_by"),
dict(
1 change: 1 addition & 0 deletions tests/local/sql/sql/test1.sql
@@ -34,6 +34,7 @@ CREATE TABLE IF NOT EXISTS my_db1.tbl1
USING DELTA
OPTIONS (key1='val1', key2="val2")
PARTITIONED BY ( a, b )
CLUSTER BY ( a, b )
CLUSTERED BY ( c,d )
SORTED BY ( a, b DESC )
INTO 5 BUCKETS
