Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements/base.in
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ numexpr>=2.9.0
# 5.0.0 has a sensitive deprecation used in other libs
# -> https://github.com/aio-libs/async-timeout/blob/master/CHANGES.rst#500-2024-10-31
async_timeout>=4.0.0,<5.0.0

pyodps>=0.12.2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh very few people use odps AFAIK, this dependency should be treated like the other analytics databases here: https://github.com/apache/superset/blob/master/pyproject.toml#L160-L188

# Known issue with 6.7.0 breaking a unit test, probably easy to fix, but will require
# a bit of attention to bump.
apispec>=6.0.0,<6.7.0
1 change: 1 addition & 0 deletions requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -451,3 +451,4 @@ zipp==3.21.0
# via importlib-metadata
zstandard==0.23.0
# via flask-compress
pyodps==0.12.2
1 change: 1 addition & 0 deletions requirements/development.txt
Original file line number Diff line number Diff line change
Expand Up @@ -937,3 +937,4 @@ zstandard==0.23.0
# via
# -c requirements/base.txt
# flask-compress
pyodps==0.12.2
44 changes: 43 additions & 1 deletion superset/daos/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
from __future__ import annotations

import logging
from typing import Any
import re
from typing import Any, List, Tuple
from urllib.parse import unquote

from odps import ODPS

from superset.connectors.sqla.models import SqlaTable
from superset.daos.base import BaseDAO
Expand Down Expand Up @@ -166,6 +170,44 @@

return ssh_tunnel

@classmethod
def is_odps_partitioned_table(
    cls, database: Database, table_name: str
) -> Tuple[bool, List[str]]:
    """
    Determine whether an ODPS (MaxCompute) table is partitioned.

    Parses the ODPS SQLAlchemy URI stored on the database model, connects
    with pyodps, and inspects the table schema.

    :param database: Database model holding the ODPS SQLAlchemy URI
    :param table_name: name of the table to inspect
    :returns: tuple ``(is_partitioned, partition_column_names)``; the second
        element is empty when the table is not partitioned or the URI cannot
        be parsed
    :raises ValueError: if no database is given
    """
    if not database:
        raise ValueError("Database not found")

    uri = database.sqlalchemy_uri
    access_key = database.password
    # Expected URI shape: odps://<user>:<password>@<project>/?endpoint=<endpoint>
    pattern = re.compile(
        r"odps://(?P<username>[^:]+):(?P<password>[^@]+)@(?P<project>[^/]+)/(?:\?"
        r"endpoint=(?P<endpoint>[^&]+))"
    )
    if not uri or not isinstance(uri, str):
        logger.warning(
            "Invalid or missing sqlalchemy URI, Please provide a "
            "correct sqlalchemy URI"
        )
        return False, []

    match = pattern.match(unquote(uri))
    if not match:
        return False, []

    odps_client = ODPS(
        match.group("username"),
        access_key,
        match.group("project"),
        endpoint=match.group("endpoint"),
    )
    table = odps_client.get_table(table_name)
    # BUG FIX: the original tested ``if table.exist_partition:`` — on pyodps
    # tables ``exist_partition`` is a *method*, so the bound-method object is
    # always truthy and the check never reported "not partitioned" for an
    # existing table. Inspect the schema's partition definitions directly.
    partitions = table.table_schema.partitions
    if partitions:
        return True, [partition.name for partition in partitions]
    return False, []


class SSHTunnelDAO(BaseDAO[SSHTunnel]):
@classmethod
Expand Down
16 changes: 12 additions & 4 deletions superset/databases/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
)
from superset.databases.utils import get_table_metadata
from superset.db_engine_specs import get_available_engine_specs
from superset.db_engine_specs.odps import OdpsEngineSpec
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.exceptions import (
DatabaseNotFoundException,
Expand All @@ -118,6 +119,7 @@
)
from superset.extensions import security_manager
from superset.models.core import Database
from superset.sql.parse import Partition
from superset.sql_parse import Table
from superset.superset_typing import FlaskResponse
from superset.utils import json
Expand Down Expand Up @@ -1072,15 +1074,21 @@
parameters = QualifiedTableSchema().load(request.args)
except ValidationError as ex:
raise InvalidPayloadSchemaError(ex) from ex

table = Table(parameters["name"], parameters["schema"], parameters["catalog"])
table_name = str(parameters["name"])
table = Table(table_name, parameters["schema"], parameters["catalog"])
is_partitioned_table, partition_fields = DatabaseDAO.is_odps_partitioned_table(
database, table_name
)
try:
security_manager.raise_for_access(database=database, table=table)
except SupersetSecurityException as ex:
# instead of raising 403, raise 404 to hide table existence
raise TableNotFoundException("No such table") from ex

payload = database.db_engine_spec.get_table_metadata(database, table)
partition = Partition(is_partitioned_table, partition_fields)
if is_partitioned_table:
payload = OdpsEngineSpec.get_table_metadata(database, table, partition)

Check warning on line 1089 in superset/databases/api.py

View check run for this annotation

Codecov / codecov/patch

superset/databases/api.py#L1089

Added line #L1089 was not covered by tests
else:
payload = database.db_engine_spec.get_table_metadata(database, table)

return self.response(200, **payload)

Expand Down
191 changes: 191 additions & 0 deletions superset/db_engine_specs/odps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging
from typing import Any, Optional, TYPE_CHECKING

from sqlalchemy import select, text
from sqlalchemy.engine.base import Engine

from superset.databases.schemas import (
TableMetadataColumnsResponse,
TableMetadataResponse,
)
from superset.databases.utils import (
get_col_type,
get_foreign_keys_metadata,
get_indexes_metadata,
)
from superset.db_engine_specs.base import BaseEngineSpec, BasicParametersMixin
from superset.sql.parse import Partition, SQLScript
from superset.sql_parse import Table
from superset.superset_typing import ResultSetColumnType

if TYPE_CHECKING:
from superset.models.core import Database

Check warning on line 40 in superset/db_engine_specs/odps.py

View check run for this annotation

Codecov / codecov/patch

superset/db_engine_specs/odps.py#L40

Added line #L40 was not covered by tests

logger = logging.getLogger(__name__)


class OdpsBaseEngineSpec(BaseEngineSpec):
    """Shared base for ODPS (MaxCompute) engine specs."""

    @classmethod
    def get_table_metadata(
        cls,
        database: Database,
        table: Table,
        partition: Optional[Partition] = None,
    ) -> TableMetadataResponse:
        """
        Return basic table metadata.

        :param database: Database instance
        :param table: A Table instance
        :param partition: Optional table partition info (ignored here;
            honoured by partition-aware subclasses such as OdpsEngineSpec)
        :return: Basic table metadata
        """
        # BUG FIX: the original body was ``return cls.get_table_metadata(
        # database, table, partition)`` — an unconditional call to itself,
        # which recurses forever (RecursionError) whenever this base-class
        # implementation is reached. Delegate to the generic BaseEngineSpec
        # implementation instead.
        return super().get_table_metadata(database, table)

class OdpsEngineSpec(BasicParametersMixin, OdpsBaseEngineSpec):
    """Engine spec for Alibaba ODPS (MaxCompute) with partition-aware metadata."""

    default_driver = "odps"

    @classmethod
    def get_table_metadata(
        cls, database: Database, table: Table, partition: Optional[Partition] = None
    ) -> TableMetadataResponse:
        """
        Get table metadata information, including type, pk, fks.
        This function raises SQLAlchemyError when a schema is not found.

        :param partition: The table's partition info, forwarded to select_star
        :param database: The database model
        :param table: Table instance
        :return: Dict table metadata ready for API response
        """
        keys = []
        columns = database.get_columns(table)
        primary_key = database.get_pk_constraint(table)
        # Normalize the SQLAlchemy pk-constraint dict into the key shape the
        # frontend expects ("column_names" + "type": "pk").
        if primary_key and primary_key.get("constrained_columns"):
            primary_key["column_names"] = primary_key.pop("constrained_columns")
            primary_key["type"] = "pk"
            keys += [primary_key]
        foreign_keys = get_foreign_keys_metadata(database, table)
        indexes = get_indexes_metadata(database, table)
        keys += foreign_keys + indexes
        payload_columns: list[TableMetadataColumnsResponse] = []
        table_comment = database.get_table_comment(table)
        for col in columns:
            dtype = get_col_type(col)
            payload_columns.append(
                {
                    "name": col["column_name"],
                    # Short type name (e.g. "VARCHAR" from "VARCHAR(255)");
                    # the full type string goes into "longType".
                    "type": dtype.split("(")[0] if "(" in dtype else dtype,
                    "longType": dtype,
                    # Any pk/fk/index entries that reference this column.
                    "keys": [
                        k for k in keys if col["column_name"] in k["column_names"]
                    ],
                    "comment": col.get("comment"),
                }
            )

        # The engine is only needed to quote/compile the SELECT * preview query.
        with database.get_sqla_engine(
            catalog=table.catalog, schema=table.schema
        ) as engine:
            return {
                "name": table.table,
                "columns": payload_columns,
                "selectStar": cls.select_star(
                    database=database,
                    table=table,
                    engine=engine,
                    limit=100,
                    show_cols=False,
                    indent=True,
                    latest_partition=True,
                    cols=columns,
                    partition=partition,
                ),
                "primaryKey": primary_key,
                "foreignKeys": foreign_keys,
                # NOTE: "indexes" intentionally includes pk and fk entries as
                # well, matching the shape of the generic metadata helper.
                "indexes": keys,
                "comment": table_comment,
            }

    @classmethod
    def select_star(  # pylint: disable=too-many-arguments
        cls,
        database: Database,
        table: Table,
        engine: Engine,
        limit: int = 100,
        show_cols: bool = False,
        indent: bool = True,
        latest_partition: bool = True,
        cols: list[ResultSetColumnType] | None = None,
        partition: Optional[Partition] = None,
    ) -> str:
        """
        Generate a "SELECT * from [schema.]table_name" query with appropriate limit.

        WARNING: expects only unquoted table and schema names.

        :param partition: The table's partition info
        :param database: Database instance
        :param table: Table instance
        :param engine: SqlAlchemy Engine instance
        :param limit: limit to impose on query
        :param show_cols: Show columns in query; otherwise use "*"
        :param indent: Add indentation to query
        :param latest_partition: Only query the latest partition
        :param cols: Columns to include in query
        :return: SQL query
        """
        # pylint: disable=redefined-outer-name
        fields: str | list[Any] = "*"
        cols = cols or []
        # Column metadata is needed both to render the column list and to
        # resolve the latest partition, so fetch it if either path wants it.
        if (show_cols or latest_partition) and not cols:
            cols = database.get_columns(table)

        if show_cols:
            fields = cls._get_fields(cols)
        full_table_name = cls.quote_table(table, engine.dialect)
        qry = select(fields).select_from(text(full_table_name))
        if database.backend == "odps":
            if (
                partition is not None
                and partition.is_partitioned_table
                and partition.partition_column is not None
                and len(partition.partition_column) > 0
            ):
                # Only the first partition column is used for the filter.
                partition_str = partition.partition_column[0]
                # NOTE(review): LIKE '%' matches any non-NULL value —
                # presumably this satisfies MaxCompute's requirement that
                # queries on partitioned tables carry a partition predicate
                # without actually restricting rows; confirm against ODPS docs.
                partition_str_where = f"CAST({partition_str} AS STRING) LIKE '%'"
                qry = qry.where(text(partition_str_where))
        if limit and cls.allow_limit_clause:
            qry = qry.limit(limit)
        if latest_partition:
            partition_query = cls.where_latest_partition(
                database,
                table,
                qry,
                columns=cols,
            )
            # where_latest_partition returns None when no partition filter
            # could be derived; keep the original query in that case.
            if partition_query is not None:
                qry = partition_query
        sql = database.compile_sqla_query(qry, table.catalog, table.schema)
        if indent:
            sql = SQLScript(sql, engine=cls.engine).format()
        return sql
33 changes: 32 additions & 1 deletion superset/sql/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import urllib.parse
from collections.abc import Iterable
from dataclasses import dataclass
from typing import Any, Generic, TypeVar
from typing import Any, Generic, List, Optional, TypeVar

import sqlglot
import sqlparse
Expand Down Expand Up @@ -125,6 +125,37 @@
return str(self) == str(other)


@dataclass(eq=True, frozen=True)
class Partition:
    """
    Partition metadata for a table.

    Attributes:
        is_partitioned_table: whether the table is partitioned.
        partition_column: names of the partition columns, for example
            ``Partition(True, ["month", "day"])``.
    """

    is_partitioned_table: bool
    partition_column: Optional[List[str]] = None

    def __str__(self) -> str:
        """Render the partition flag and the partition column names."""
        if self.partition_column:
            columns = ", ".join(str(column) for column in self.partition_column)
        else:
            columns = "None"
        return (
            f"Partition(is_partitioned_table={self.is_partitioned_table}, "
            f"partition_column=[{columns}])"
        )

    def __eq__(self, other: Any) -> bool:
        # Mirror the string-based equality used by Table in this module.
        return str(self) == str(other)


# To avoid unnecessary parsing/formatting of queries, the statement has the concept of
# an "internal representation", which is the AST of the SQL statement. For most of the
# engines supported by Superset this is `sqlglot.exp.Expression`, but there is a special
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
LimitMethod,
)
from superset.db_engine_specs.mysql import MySQLEngineSpec
from superset.db_engine_specs.odps import OdpsBaseEngineSpec, OdpsEngineSpec
from superset.db_engine_specs.sqlite import SqliteEngineSpec
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.sql_parse import Table
Expand Down Expand Up @@ -196,7 +197,11 @@ def test_engine_time_grain_validity(self):
time_grains = set(builtin_time_grains.keys())
# loop over all subclasses of BaseEngineSpec
for engine in load_engine_specs():
if engine is not BaseEngineSpec:
if (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for unit tests, if they depend on external drivers (pyodps in this case), there's a decorator for conditional unit tests that run only when the dependency is there. Currently we only install some of the database drivers in our CI builds (the ones that end up in development.txt). Most likely we don't want to install all possible drivers and run unit tests against it because there's just too many databases and drivers for the core maintainers to manage... Many drivers are finicky, and some drivers/integrations only few people care about.

Anyhow, not a bad thing to have unit tests in the repo, but won't run as part of our CI. Maybe in your fork/CI you can run tests that are odps-specific.

engine is not BaseEngineSpec
and engine is not OdpsBaseEngineSpec
and engine is not OdpsEngineSpec
):
# make sure time grain functions have been defined
assert len(engine.get_time_grain_expressions()) > 0
# make sure all defined time grains are supported
Expand Down
Loading