Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions superset/daos/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
from __future__ import annotations

import logging
import re
from typing import Any
from urllib.parse import unquote

from sqlalchemy.orm import joinedload

Expand Down Expand Up @@ -239,6 +241,49 @@ def get_datasets(
.all()
)

@classmethod
def is_odps_partitioned_table(
    cls, database: Database, table_name: str
) -> tuple[bool, list[str]]:
    """
    Determine whether an ODPS (MaxCompute) table is partitioned.

    :param database: Database model; only inspected when its backend is
        ``odps``
    :param table_name: name of the table to inspect
    :returns: ``(is_partitioned, partition_field_names)``; ``(False, [])``
        for non-ODPS backends, a missing ``pyodps`` dependency, or an
        unparseable SQLAlchemy URI
    :raises ValueError: if ``database`` is falsy
    """
    if not database:
        raise ValueError("Database not found")
    if database.backend != "odps":
        return False, []
    try:
        from odps import ODPS
    except ImportError:
        logger.warning("pyodps is not installed, cannot check ODPS partition info")
        return False, []

    # Validate the URI before doing any parsing work.
    uri = database.sqlalchemy_uri
    if not uri or not isinstance(uri, str):
        logger.warning(
            "Invalid or missing sqlalchemy URI, please provide a correct URI"
        )
        return False, []

    pattern = re.compile(
        r"odps://(?P<username>[^:]+):(?P<password>[^@]+)@(?P<project>[^/]+)/(?:\?"
        r"endpoint=(?P<endpoint>[^&]+))"
    )
    if match := pattern.match(unquote(uri)):
        odps_client = ODPS(
            match.group("username"),
            database.password,
            match.group("project"),
            endpoint=match.group("endpoint"),
        )
        table = odps_client.get_table(table_name)
        # NOTE: ``table.exist_partition`` is a *method*; referencing it
        # without calling it is always truthy. Decide based on the actual
        # partition schema instead, so a non-partitioned table never
        # reports ``(True, [])``.
        partition_fields = [
            partition.name for partition in table.table_schema.partitions
        ]
        return bool(partition_fields), partition_fields
    return False, []


class DatabaseUserOAuth2TokensDAO(BaseDAO[DatabaseUserOAuth2Tokens]):
"""
Expand Down
16 changes: 12 additions & 4 deletions superset/databases/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@
)
from superset.extensions import security_manager
from superset.models.core import Database
from superset.sql.parse import Table
from superset.sql.parse import Partition, Table
from superset.superset_typing import FlaskResponse
from superset.utils import json
from superset.utils.core import (
Expand Down Expand Up @@ -1079,15 +1079,23 @@ def table_metadata(self, pk: int) -> FlaskResponse:
parameters = QualifiedTableSchema().load(request.args)
except ValidationError as ex:
raise InvalidPayloadSchemaError(ex) from ex

table = Table(parameters["name"], parameters["schema"], parameters["catalog"])
table_name = str(parameters["name"])
table = Table(table_name, parameters["schema"], parameters["catalog"])
is_partitioned_table, partition_fields = DatabaseDAO.is_odps_partitioned_table(
database, table_name
)
try:
security_manager.raise_for_access(database=database, table=table)
except SupersetSecurityException as ex:
# instead of raising 403, raise 404 to hide table existence
raise TableNotFoundException("No such table") from ex
Comment on lines +1084 to 1091
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: The ODPS partition detection is performed before checking the user's table access, meaning an unauthorized caller can still trigger ODPS API calls (and potentially learn about table existence or cause backend errors) before a permission check is enforced; move the partition lookup after the security check so that only authorized users can hit the ODPS backend for metadata. [security]

Severity Level: Major ⚠️
- ❌ Unauthorized users can trigger ODPS table metadata lookups.
- ⚠️ Table existence may leak via ODPS error behavior or timing.
- ⚠️ Extra ODPS calls for denied requests increase backend load.
Suggested change
is_partitioned_table, partition_fields = DatabaseDAO.is_odps_partitioned_table(
database, table_name
)
try:
security_manager.raise_for_access(database=database, table=table)
except SupersetSecurityException as ex:
# instead of raising 403, raise 404 to hide table existence
raise TableNotFoundException("No such table") from ex
try:
security_manager.raise_for_access(database=database, table=table)
except SupersetSecurityException as ex:
# instead of raising 403, raise 404 to hide table existence
raise TableNotFoundException("No such table") from ex
is_partitioned_table, partition_fields = DatabaseDAO.is_odps_partitioned_table(
database, table_name
)
Steps of Reproduction ✅
1. Configure a Superset Database record pointing to an ODPS (MaxCompute) instance so that
`Database.backend == "odps"` (used in `DatabaseDAO.is_odps_partitioned_table` at
`superset/daos/database.py:245-257`).

2. Start Superset and authenticate as a user that has API access to the Database REST API
but lacks table-level access for a specific ODPS table (table access is enforced via
`security_manager.raise_for_access(database=database, table=table)` at
`superset/databases/api.py:1087-1091` and `SupersetSecurityManager.can_access_table` at
`superset/security/manager.py:906-920`).

3. Call the table metadata endpoint implemented by `DatabaseRestApi.table_metadata` in
`superset/databases/api.py:1023-1100` with `GET
/<int:pk>/table_metadata/?name=<table_name>&schema=<schema>&catalog=<catalog>` where `pk`
is the ODPS database id and `<table_name>` is any table name (including ones the user is
not allowed to access).

4. Observe that before the permission check runs, `table_metadata` invokes
`DatabaseDAO.is_odps_partitioned_table(database, table_name)` at
`superset/databases/api.py:1084-1085`, which in turn creates an ODPS client and calls
`odps_client.get_table(table_name)` at `superset/daos/database.py:278-280`, hitting the
external ODPS API with Superset's credentials; only after this remote call does
`security_manager.raise_for_access` run and potentially raise `TableNotFoundException` for
unauthorized users, meaning unauthorized callers can still trigger ODPS metadata lookups
and, if `get_table` raises (e.g. non-existent table), cause backend errors before access
is denied.
Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** superset/databases/api.py
**Line:** 1084:1091
**Comment:**
	*Security: The ODPS partition detection is performed before checking the user's table access, meaning an unauthorized caller can still trigger ODPS API calls (and potentially learn about table existence or cause backend errors) before a permission check is enforced; move the partition lookup after the security check so that only authorized users can hit the ODPS backend for metadata.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.
👍 | 👎

partition = Partition(is_partitioned_table, partition_fields)
if is_partitioned_table:
from superset.db_engine_specs.odps import OdpsEngineSpec

payload = database.db_engine_spec.get_table_metadata(database, table)
payload = OdpsEngineSpec.get_table_metadata(database, table, partition)
else:
payload = database.db_engine_spec.get_table_metadata(database, table)

return self.response(200, **payload)

Expand Down
8 changes: 8 additions & 0 deletions superset/db_engine_specs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ def load_engine_specs() -> list[type[BaseEngineSpec]]:
except Exception: # pylint: disable=broad-except
logger.warning("Unable to load Superset DB engine spec: %s", ep.name)
continue
# Validate that the engine spec is a proper subclass of BaseEngineSpec
if not is_engine_spec(engine_spec):
logger.warning(
"Skipping invalid DB engine spec %s: "
"not a valid BaseEngineSpec subclass",
ep.name,
)
continue
engine_specs.append(engine_spec)

return engine_specs
Expand Down
192 changes: 192 additions & 0 deletions superset/db_engine_specs/odps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging
from typing import Any, Optional, TYPE_CHECKING

from sqlalchemy import select, text
from sqlalchemy.engine.base import Engine

from superset.databases.schemas import (
TableMetadataColumnsResponse,
TableMetadataResponse,
)
from superset.databases.utils import (
get_col_type,
get_foreign_keys_metadata,
get_indexes_metadata,
)
from superset.db_engine_specs.base import BaseEngineSpec, BasicParametersMixin
from superset.sql.parse import Partition, SQLScript, Table
from superset.superset_typing import ResultSetColumnType

if TYPE_CHECKING:
from superset.models.core import Database

logger = logging.getLogger(__name__)


class OdpsBaseEngineSpec(BaseEngineSpec):
    """Shared base class for ODPS (MaxCompute) engine specs."""

    @classmethod
    def get_table_metadata(
        cls,
        database: Database,
        table: Table,
        partition: Optional[Partition] = None,
    ) -> TableMetadataResponse:
        """
        Return basic table metadata.

        :param database: Database instance
        :param table: A Table instance
        :param partition: Table partition info; ignored by this base
            implementation (subclasses may use it)
        :return: Basic table metadata
        """
        # Delegate to BaseEngineSpec: calling ``cls.get_table_metadata``
        # here would recurse infinitely (RecursionError) since ``cls``
        # resolves back to this very method.
        return super().get_table_metadata(database, table)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Infinite Recursion Bug

The method calls itself recursively, leading to infinite recursion. It should call the base implementation from utils instead.

Code suggestion
Check the AI-generated fix before applying
 - from superset.databases.utils import (
 -     get_col_type,
 -     get_foreign_keys_metadata,
 -     get_indexes_metadata,
 - )
 + from superset.databases.utils import (
 +     get_col_type,
 +     get_foreign_keys_metadata,
 +     get_indexes_metadata,
 +     get_table_metadata,
 + )
 @@ -60,1 +60,1 @@
 -        return cls.get_table_metadata(database, table, partition)
 +        return get_table_metadata(database, table)

Code Review Run #3ebe85


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: The OdpsBaseEngineSpec.get_table_metadata classmethod calls cls.get_table_metadata with the same signature it defines, which for OdpsBaseEngineSpec itself results in infinite recursion and a RecursionError whenever it is invoked; it should instead delegate to the base implementation on BaseEngineSpec and ignore the extra partition argument. [logic error]

Severity Level: Major ⚠️
- ❌ Any direct OdpsBaseEngineSpec.get_table_metadata call crashes.
- ⚠️ Blocks reuse of OdpsBaseEngineSpec in future subclasses.
- ⚠️ Risk for test code importing OdpsBaseEngineSpec directly.
Suggested change
return cls.get_table_metadata(database, table, partition)
# Delegate to the base implementation; the partition argument is currently unused.
return super().get_table_metadata(database, table)
Steps of Reproduction ✅
1. Import the ODPS engine spec base class from the PR code: `from
superset.db_engine_specs.odps import OdpsBaseEngineSpec` (defined at
`superset/db_engine_specs/odps.py:44`).

2. In any Python context (tests, shell, or application code), call
`OdpsBaseEngineSpec.get_table_metadata(database=None, table=None)`; the arguments are not
used before the recursion (`superset/db_engine_specs/odps.py:46-59`).

3. The classmethod body executes `return cls.get_table_metadata(database, table,
partition)` which re-invokes the same `OdpsBaseEngineSpec.get_table_metadata`
implementation with identical arguments (line 59).

4. This self-call repeats with no termination condition until Python's recursion limit is
exceeded, raising a `RecursionError` before any actual metadata lookup can occur.
Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** superset/db_engine_specs/odps.py
**Line:** 59:59
**Comment:**
	*Logic Error: The `OdpsBaseEngineSpec.get_table_metadata` classmethod calls `cls.get_table_metadata` with the same signature it defines, which for `OdpsBaseEngineSpec` itself results in infinite recursion and a `RecursionError` whenever it is invoked; it should instead delegate to the base implementation on `BaseEngineSpec` and ignore the extra `partition` argument.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.
👍 | 👎



class OdpsEngineSpec(BasicParametersMixin, OdpsBaseEngineSpec):
    """Engine spec for Alibaba ODPS (MaxCompute) with partition-aware metadata."""

    engine = "odps"
    engine_name = "ODPS (MaxCompute)"
    default_driver = "odps"

    @classmethod
    def get_table_metadata(
        cls, database: Any, table: Table, partition: Optional[Partition] = None
    ) -> TableMetadataResponse:
        """
        Get table metadata information, including type, pk, fks.

        This function raises SQLAlchemyError when a schema is not found.

        :param database: The database model
        :param table: Table instance
        :param partition: The table's partition info, forwarded to
            ``select_star`` so the sample query can filter on a partition
        :return: Dict table metadata ready for API response
        """
        keys = []
        columns = database.get_columns(table)
        primary_key = database.get_pk_constraint(table)
        if primary_key and primary_key.get("constrained_columns"):
            primary_key["column_names"] = primary_key.pop("constrained_columns")
            primary_key["type"] = "pk"
            keys += [primary_key]
        foreign_keys = get_foreign_keys_metadata(database, table)
        indexes = get_indexes_metadata(database, table)
        keys += foreign_keys + indexes
        payload_columns: list[TableMetadataColumnsResponse] = []
        table_comment = database.get_table_comment(table)
        for col in columns:
            dtype = get_col_type(col)
            payload_columns.append(
                {
                    "name": col["column_name"],
                    # strip the size/precision suffix, e.g. VARCHAR(255) -> VARCHAR
                    "type": dtype.split("(")[0] if "(" in dtype else dtype,
                    "longType": dtype,
                    "keys": [
                        k for k in keys if col["column_name"] in k["column_names"]
                    ],
                    "comment": col.get("comment"),
                }
            )

        with database.get_sqla_engine(
            catalog=table.catalog, schema=table.schema
        ) as engine:
            return {
                "name": table.table,
                "columns": payload_columns,
                "selectStar": cls.select_star(
                    database=database,
                    table=table,
                    engine=engine,
                    limit=100,
                    show_cols=False,
                    indent=True,
                    latest_partition=True,
                    cols=columns,
                    partition=partition,
                ),
                "primaryKey": primary_key,
                "foreignKeys": foreign_keys,
                "indexes": keys,
                "comment": table_comment,
            }

    @classmethod
    def select_star(  # pylint: disable=too-many-arguments
        cls,
        database: Database,
        table: Table,
        engine: Engine,
        limit: int = 100,
        show_cols: bool = False,
        indent: bool = True,
        latest_partition: bool = True,
        cols: list[ResultSetColumnType] | None = None,
        partition: Optional[Partition] = None,
    ) -> str:
        """
        Generate a "SELECT * from [schema.]table_name" query with appropriate limit.

        WARNING: expects only unquoted table and schema names.

        :param database: Database instance
        :param table: Table instance
        :param engine: SqlAlchemy Engine (or Dialect) instance
        :param limit: limit to impose on query
        :param show_cols: Show columns in query; otherwise use "*"
        :param indent: Add indentation to query
        :param latest_partition: Only query the latest partition
        :param cols: Columns to include in query
        :param partition: The table's partition info
        :return: SQL query
        """
        # pylint: disable=redefined-outer-name
        fields: str | list[Any] = "*"
        cols = cols or []
        if (show_cols or latest_partition) and not cols:
            cols = database.get_columns(table)

        if show_cols:
            fields = cls._get_fields(cols)
        # NOTE(review): the generic ``Database.select_star`` path passes a
        # SQLAlchemy ``Dialect`` (not an ``Engine``) for this argument, so
        # normalize before quoting instead of assuming ``.dialect`` exists.
        dialect = getattr(engine, "dialect", engine)
        full_table_name = cls.quote_table(table, dialect)
        qry = select(fields).select_from(text(full_table_name))
        if database.backend == "odps":
            if (
                partition is not None
                and partition.is_partitioned_table
                and partition.partition_column is not None
                and len(partition.partition_column) > 0
            ):
                # Filter on the first partition column; the LIKE '%' predicate
                # matches every partition value but forces partition pruning
                # metadata into the generated SQL.
                partition_str = partition.partition_column[0]
                partition_str_where = f"CAST({partition_str} AS STRING) LIKE '%'"
                qry = qry.where(text(partition_str_where))
        if limit:
            qry = qry.limit(limit)
        if latest_partition:
            partition_query = cls.where_latest_partition(
                database,
                table,
                qry,
                columns=cols,
            )
            if partition_query is not None:
                qry = partition_query
        sql = database.compile_sqla_query(qry, table.catalog, table.schema)
        if indent:
            sql = SQLScript(sql, engine=cls.engine).format()
        return sql
31 changes: 31 additions & 0 deletions superset/sql/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,37 @@ def qualify(
)


@dataclass(eq=True, frozen=True)
class Partition:
    """
    Table partition information.

    Carries whether a table is partitioned and, if so, the names of its
    partition columns, e.g.::

        Partition(is_partitioned_table=True, partition_column=["month", "day"])

    Equality is field-based (generated by ``dataclass(eq=True)``); two
    Partition objects are equal iff both attributes match.
    """

    # Whether the table is partitioned.
    is_partitioned_table: bool
    # Ordered partition column names; None when the table has no partitions.
    partition_column: list[str] | None = None

    def __str__(self) -> str:
        """
        Return a readable representation listing the partition columns.
        """
        partition_column_str = (
            ", ".join(map(str, self.partition_column))
            if self.partition_column
            else "None"
        )
        return (
            f"Partition(is_partitioned_table={self.is_partitioned_table}, "
            f"partition_column=[{partition_column_str}])"
        )
Comment on lines +351 to +352
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect equality implementation

The custom eq compares string representations, which is incorrect and violates hash consistency for the frozen dataclass.

Code suggestion
Check the AI-generated fix before applying
Suggested change
def __eq__(self, other: Any) -> bool:
return str(self) == str(other)

Code Review Run #60811b


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them



# To avoid unnecessary parsing/formatting of queries, the statement has the concept of
# an "internal representation", which is the AST of the SQL statement. For most of the
# engines supported by Superset this is `sqlglot.exp.Expression`, but there is a special
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
builtin_time_grains,
)
from superset.db_engine_specs.mysql import MySQLEngineSpec
from superset.db_engine_specs.odps import OdpsBaseEngineSpec, OdpsEngineSpec
from superset.db_engine_specs.sqlite import SqliteEngineSpec
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.sql.parse import Table
Expand Down Expand Up @@ -80,7 +81,11 @@ def test_engine_time_grain_validity(self):
time_grains = set(builtin_time_grains.keys())
# loop over all subclasses of BaseEngineSpec
for engine in load_engine_specs():
if engine is not BaseEngineSpec:
if (
engine is not BaseEngineSpec
and engine is not OdpsBaseEngineSpec
and engine is not OdpsEngineSpec
):
# make sure time grain functions have been defined
assert len(engine.get_time_grain_expressions()) > 0
# make sure all defined time grains are supported
Expand Down
Loading
Loading