fix: Issue 13956 #13980

Merged
merged 1 commit into from
Apr 9, 2021
2 changes: 2 additions & 0 deletions UPDATING.md
@@ -25,6 +25,8 @@ assists people when migrating to a new version.
## Next
- [13772](https://github.com/apache/superset/pull/13772): Row level security (RLS) is now enabled by default. To activate the feature, please run `superset init` to expose the RLS menus to Admin users.

- [13980](https://github.com/apache/superset/pull/13980): Data health checks no longer use the metadata database as an interim cache. Though non-breaking, deployments which implement complex logic should likely memoize the callback function. Refer to documentation in the config.py file for more detail.

### Breaking Changes
### Potential Downtime
### Deprecations
36 changes: 33 additions & 3 deletions superset/config.py
@@ -53,6 +53,9 @@
if TYPE_CHECKING:
from flask_appbuilder.security.sqla import models # pylint: disable=unused-import

from superset.connectors.sqla.models import ( # pylint: disable=unused-import
SqlaTable,
)
from superset.models.core import Database # pylint: disable=unused-import

# Realtime stats logger, a StatsD implementation exists
@@ -1146,9 +1149,36 @@ class CeleryConfig: # pylint: disable=too-few-public-methods
GLOBAL_ASYNC_QUERIES_POLLING_DELAY = 500
GLOBAL_ASYNC_QUERIES_WEBSOCKET_URL = "ws://127.0.0.1:8080/"

# It's possible to add a dataset health check logic which is specific to your system.
# It will get executed each time when user open a chart's explore view.
DATASET_HEALTH_CHECK = None
# A SQL dataset health check. Note if enabled it is strongly advised that the callable
# be memoized to aid with performance, i.e.,
#
#    @cache_manager.cache.memoize(timeout=0)
#    def DATASET_HEALTH_CHECK(datasource: SqlaTable) -> Optional[str]:
#        if (
#            datasource.sql and
#            len(sql_parse.ParsedQuery(datasource.sql, strip_comments=True).tables) == 1
#        ):
#            return (
#                "This virtual dataset queries only one table and therefore could be "
#                "replaced by querying the table directly."
#            )
#
#        return None
#
# Within the FLASK_APP_MUTATOR callable, i.e., once the application and thus cache have
# been initialized it is also necessary to add the following logic to blow the cache for
# all datasources if the callback function changed.
#
#    def FLASK_APP_MUTATOR(app: Flask) -> None:
#        name = "DATASET_HEALTH_CHECK"
#        func = app.config[name]
#        code = func.uncached.__code__.co_code
#
#        if cache_manager.cache.get(name) != code:
#            cache_manager.cache.delete_memoized(func)
#            cache_manager.cache.set(name, code, timeout=0)
Member

Nice example!

#
DATASET_HEALTH_CHECK: Optional[Callable[["SqlaTable"], str]] = None
Member

this should probably be a BaseDatasource not a SqlaTable?

Member Author

@etr2460 this functionality is only provided for the SQLA connector.

Member

Would be nice to add a health check callable example here, or add it to the docs

Member Author

@dpgaspar I added an example.
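
For readers who want to see the invalidation idea from the config comment in isolation, here is a minimal sketch that runs without Flask or Flask-Caching. The `_cache` dict, `memoize`, `refresh_if_changed`, and `example_check` are hypothetical stand-ins written for this illustration; only the `.uncached` attribute and the `__code__.co_code` comparison mirror what the config comment describes.

```python
from typing import Any, Callable, Dict, Optional

# Hypothetical in-memory stand-in for a shared cache backend.
_cache: Dict[Any, Any] = {}


def memoize(func: Callable[[str], Optional[str]]) -> Callable[[str], Optional[str]]:
    """Cache results per argument and expose the wrapped function as `.uncached`."""

    def wrapper(sql: str) -> Optional[str]:
        key = ("memo", func.__name__, sql)
        if key not in _cache:
            _cache[key] = func(sql)
        return _cache[key]

    wrapper.uncached = func  # type: ignore[attr-defined]
    return wrapper


def refresh_if_changed(name: str, func: Callable[..., Any]) -> bool:
    """Drop memoized results when the callback's bytecode differs from the stored copy."""
    code = func.uncached.__code__.co_code
    if _cache.get(name) == code:
        return False  # same bytecode as last start-up, keep cached results

    prefix = ("memo", func.uncached.__name__)
    for key in [k for k in _cache if isinstance(k, tuple) and k[:2] == prefix]:
        del _cache[key]
    _cache[name] = code
    return True


calls = []  # records every real (uncached) execution of the check


@memoize
def example_check(sql: str) -> Optional[str]:
    calls.append(sql)
    return "Dataset SQL is empty." if not sql.strip() else None
```

One caveat worth keeping in mind: `co_code` holds only the bytecode, while string literals live in `co_consts`, so editing just the warning text of a callback may not be detected by this comparison.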


# SQLalchemy link doc reference
SQLALCHEMY_DOCS_URL = "https://docs.sqlalchemy.org/en/13/core/engines.html"
34 changes: 4 additions & 30 deletions superset/connectors/sqla/models.py
@@ -60,7 +60,6 @@
from superset.db_engine_specs.base import TimestampExpression
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.exceptions import QueryObjectValidationError, SupersetSecurityException
from superset.extensions import event_logger
from superset.jinja_context import (
BaseTemplateProcessor,
ExtraCache,
@@ -687,9 +686,10 @@ def select_star(self) -> Optional[str]:
self.table_name, schema=self.schema, show_cols=False, latest_partition=False
)

@property
@property # type: ignore
def health_check_message(self) -> Optional[str]:
return self.extra_dict.get("health_check", {}).get("message")
check = config["DATASET_HEALTH_CHECK"]
return check(self) if check else None
Comment on lines +691 to +692
Member

what's more pythonic? this or:

Suggested change
check = config["DATASET_HEALTH_CHECK"]
return check(self) if check else None
return (config["DATASET_HEALTH_CHECK"] or lambda: None)(self)

is this even valid python? 🤷
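
On the question of validity: as written, the one-liner does not parse, because an unparenthesized `lambda` cannot be the right-hand operand of `or`, and a zero-argument lambda would also fail when called with `self`. The sketch below checks both points; `is_valid_expression` and `run_check` are hypothetical helpers for illustration, not part of the PR.

```python
from typing import Callable, Optional

HealthCheck = Callable[[object], Optional[str]]


def is_valid_expression(source: str) -> bool:
    """Return True if `source` compiles as a Python expression."""
    try:
        compile(source, "<suggestion>", "eval")
    except SyntaxError:
        return False
    return True


def run_check(check: Optional[HealthCheck], datasource: object) -> Optional[str]:
    # The fallback lambda is parenthesized and accepts the datasource argument.
    return (check or (lambda _datasource: None))(datasource)
```

The two-line form in the diff avoids both pitfalls and is arguably easier to read.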


@property
def data(self) -> Dict[str, Any]:
@@ -703,13 +703,7 @@ def data(self) -> Dict[str, Any]:
data_["fetch_values_predicate"] = self.fetch_values_predicate
data_["template_params"] = self.template_params
data_["is_sqllab_view"] = self.is_sqllab_view
# Don't return previously populated health check message in case
# the health check feature is turned off
data_["health_check_message"] = (
self.health_check_message
if config.get("DATASET_HEALTH_CHECK")
else None
)
data_["health_check_message"] = self.health_check_message
Member

Suggested change
data_["health_check_message"] = self.health_check_message
data_["health_check_message"] = self.check_health()

If this will run an external function every time this line is visited, maybe it's better to change this property to an actual function call so it "looks" more expensive...

Member Author

@ktmud I thought about that. I think we could also just use @functools.lru_cache for the property so at least the property is cached locally.
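
A rough sketch of what stacking `functools.lru_cache` under `@property` could look like, assuming a hashable model instance. The `Dataset` class and its `check_runs` counter are hypothetical and exist only to make the caching visible; this is not the code that was merged.

```python
import functools
from typing import Optional


class Dataset:
    """Hypothetical stand-in for a dataset model; not the real SqlaTable."""

    def __init__(self, sql: str) -> None:
        self.sql = sql
        self.check_runs = 0  # counts how often the expensive check executes

    @property
    @functools.lru_cache(maxsize=None)
    def health_check_message(self) -> Optional[str]:
        # Runs once per instance; later reads are served from the lru_cache.
        self.check_runs += 1
        return "Query is empty." if not self.sql.strip() else None
```

Two trade-offs apply to this approach: the cache is keyed on the instance and holds a strong reference to it, and it does not notice later changes to `sql`, so a stale message can be returned until the process restarts or the cache is cleared.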

data_["extra"] = self.extra
return data_

@@ -1608,26 +1602,6 @@ class and any keys added via `ExtraCache`.
extra_cache_keys += sqla_query.extra_cache_keys
return extra_cache_keys

def health_check(self, commit: bool = False, force: bool = False) -> None:
check = config.get("DATASET_HEALTH_CHECK")
if check is None:
return

extra = self.extra_dict
# force re-run health check, or health check is updated
if force or extra.get("health_check", {}).get("version") != check.version:
with event_logger.log_context(action="dataset_health_check"):
message = check(self)
extra["health_check"] = {
"version": check.version,
"message": message,
}
self.extra = json.dumps(extra)

db.session.merge(self)
if commit:
db.session.commit()


sa.event.listen(SqlaTable, "after_insert", security_manager.set_perm)
sa.event.listen(SqlaTable, "after_update", security_manager.set_perm)
4 changes: 1 addition & 3 deletions superset/datasets/dao.py
@@ -169,9 +169,7 @@ def update( # pylint: disable=W:279
super().update(model, properties, commit=commit)
properties["columns"] = original_properties

updated_model = super().update(model, properties, commit=False)
model.health_check(force=True, commit=False)
return updated_model
return super().update(model, properties, commit=False)

@classmethod
def update_columns(
@@ -0,0 +1,68 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""remove dataset health check message

Revision ID: 134cea61c5e7
Revises: 301362411006
Create Date: 2021-04-07 07:21:27.324983

"""

# revision identifiers, used by Alembic.
revision = "134cea61c5e7"
down_revision = "301362411006"

import json
import logging

from alembic import op
from sqlalchemy import Column, Integer, Text
from sqlalchemy.ext.declarative import declarative_base

from superset import db

Base = declarative_base()


class SqlaTable(Base):
    __tablename__ = "tables"

    id = Column(Integer, primary_key=True)
    extra = Column(Text)


def upgrade():
    bind = op.get_bind()
    session = db.Session(bind=bind)

    for datasource in session.query(SqlaTable):
        if datasource.extra:
Member

Suggested change
if datasource.extra:
if datasource.extra and "health_check" in datasource.extra:

This should limit following ops to a smaller subset.

Member Author

This is risky as you're checking for the existence of a substring in a JSON encoded string rather than the existence of a key.

Member

I don't think it is. It at least saves a json.loads call for datasources without health_check info, and it stays correct as long as you keep all the following steps.

To really save some database IO, we can do the pre-filtering in the SQLA query step:

    for datasource in session.query(SqlaTable).filter(
        SqlaTable.extra.like('%"health_check"%')
    ):

Member Author

Granted the downstream logic ensures that the string is actually from a JSON key (I misspoke earlier). That said this migration only takes a few seconds and thus I think the existing logic is fine.

Member

Fine by me as well. Just thought it was a good practice to always do pre-filtering when running db migrations.
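
The point both reviewers converge on can be sketched in plain Python: the substring test is only a cheap pre-filter, and the parsed-key test that follows is what keeps the result correct. `strip_health_check` is a hypothetical helper operating on the raw `extra` text, written for illustration rather than taken from the migration.

```python
import json
from typing import Optional


def strip_health_check(extra: Optional[str]) -> Optional[str]:
    """Return `extra` with any top-level "health_check" key removed."""
    # Cheap pre-filter: skip parsing when the key name cannot possibly be present.
    if not extra or '"health_check"' not in extra:
        return extra

    try:
        data = json.loads(extra)
    except ValueError:
        return extra  # leave malformed JSON untouched

    # Authoritative test: the substring may have matched a value, not a key.
    if isinstance(data, dict) and "health_check" in data:
        del data["health_check"]
        return json.dumps(data) if data else None
    return extra
```

For example, `{"note": "health_check"}` passes the substring pre-filter but is returned unchanged, because `health_check` appears there as a value rather than a key.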

            try:
                extra = json.loads(datasource.extra)

                if extra and "health_check" in extra:
                    del extra["health_check"]
                    datasource.extra = json.dumps(extra) if extra else None
            except Exception as ex:
                logging.exception(ex)

    session.commit()
    session.close()


def downgrade():
    pass
Member

Can we impl a down migration here?

Member Author

There's nothing to downgrade to, i.e., the upgrade removes the key and its associated values from the extra blob.

Member

Right, but if one needed to revert, would that even make sense to put the keys back?

Member Author

@craig-rueda the key is optional and thus unnecessary to re-add if a downgrade was performed.

4 changes: 0 additions & 4 deletions superset/views/core.py
@@ -732,10 +732,6 @@ def explore( # pylint: disable=too-many-locals,too-many-return-statements,too-m
f"datasource_id={datasource_id}&"
)

# if feature enabled, run some health check rules for sqla datasource
if hasattr(datasource, "health_check"):
datasource.health_check()

viz_type = form_data.get("viz_type")
if not viz_type and datasource and datasource.default_endpoint:
return redirect(datasource.default_endpoint)
2 changes: 0 additions & 2 deletions superset/views/datasource.py
@@ -83,8 +83,6 @@ def save(self) -> FlaskResponse:
status=409,
)
orm_datasource.update_from_object(datasource_dict)
if hasattr(orm_datasource, "health_check"):
orm_datasource.health_check(force=True, commit=False)
data = orm_datasource.data
db.session.commit()

16 changes: 3 additions & 13 deletions tests/datasource_tests.py
@@ -221,21 +221,11 @@ def my_check(datasource):
return "Warning message!"

app.config["DATASET_HEALTH_CHECK"] = my_check
my_check.version = 0.1

self.login(username="admin")
tbl = self.get_table_by_name("birth_names")
self.datasource = ConnectorRegistry.get_datasource("table", tbl.id, db.session)

for key in self.datasource.export_fields:
self.original_attrs[key] = getattr(self.datasource, key)

url = f"/datasource/get/{tbl.type}/{tbl.id}/"
tbl.health_check(commit=True, force=True)
resp = self.get_json_resp(url)
self.assertEqual(resp["health_check_message"], "Warning message!")

del app.config["DATASET_HEALTH_CHECK"]
datasource = ConnectorRegistry.get_datasource("table", tbl.id, db.session)
assert datasource.health_check_message == "Warning message!"
app.config["DATASET_HEALTH_CHECK"] = None

def test_get_datasource_failed(self):
pytest.raises(