Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add post processing to QueryObject #9427

Merged
merged 11 commits into from
Apr 10, 2020
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ marshmallow==2.19.5 # via flask-appbuilder, marshmallow-enum, marshmallow-
more-itertools==8.1.0 # via zipp
msgpack==0.6.2 # via apache-superset (setup.py)
numpy==1.18.1 # via pandas, pyarrow
pandas==0.25.3 # via apache-superset (setup.py)
pandas==1.0.3 # via apache-superset (setup.py)
parsedatetime==2.5 # via apache-superset (setup.py)
pathlib2==2.3.5 # via apache-superset (setup.py)
polyline==1.4.0 # via apache-superset (setup.py)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def get_git_sha():
"isodate",
"markdown>=3.0",
"msgpack>=0.6.1, <0.7.0",
"pandas>=0.25.3, <1.0",
"pandas>=1.0.3, <1.1",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feature requires functionality that was only recently introduced in Pandas 1.0.

"parsedatetime",
"pathlib2",
"polyline",
Expand Down
9 changes: 6 additions & 3 deletions superset/common/query_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class QueryContext:
custom_cache_timeout: Optional[int]

# TODO: Type datasource and query_object dictionary with TypedDict when it becomes
# a vanilla python type https://github.com/python/mypy/issues/5288
# a vanilla python type https://github.com/python/mypy/issues/5288
def __init__(
self,
datasource: Dict[str, Any],
Expand All @@ -70,8 +70,8 @@ def get_query_result(self, query_object: QueryObject) -> Dict[str, Any]:
"""Returns a pandas dataframe based on the query object"""

# Here, we assume that all the queries will use the same datasource, which is
# is a valid assumption for current setting. In a long term, we may or maynot
# support multiple queries from different data source.
# a valid assumption for current setting. In the long term, we may
# support multiple queries from different data sources.

timestamp_format = None
if self.datasource.type == "table":
Expand Down Expand Up @@ -105,6 +105,9 @@ def get_query_result(self, query_object: QueryObject) -> Dict[str, Any]:
self.df_metrics_to_num(df, query_object)

df.replace([np.inf, -np.inf], np.nan)

df = query_object.exec_post_processing(df)

return {
"query": result.query,
"status": result.status,
Expand Down
40 changes: 37 additions & 3 deletions superset/common/query_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
from typing import Any, Dict, List, Optional, Union

import simplejson as json
from flask_babel import gettext as _
from pandas import DataFrame

from superset import app
from superset.utils import core as utils
from superset.exceptions import QueryObjectValidationError
from superset.utils import core as utils, pandas_postprocessing
from superset.views.utils import get_time_range_endpoints

# TODO: Type Metrics dictionary with TypedDict when it becomes a vanilla python type
# https://github.com/python/mypy/issues/5288
# https://github.com/python/mypy/issues/5288


class QueryObject:
Expand All @@ -50,6 +53,7 @@ class QueryObject:
extras: Dict
columns: List[str]
orderby: List[List]
post_processing: List[Dict[str, Any]]

def __init__(
self,
Expand All @@ -67,6 +71,7 @@ def __init__(
extras: Optional[Dict] = None,
columns: Optional[List[str]] = None,
orderby: Optional[List[List]] = None,
post_processing: Optional[List[Dict[str, Any]]] = None,
relative_start: str = app.config["DEFAULT_RELATIVE_START_TIME"],
relative_end: str = app.config["DEFAULT_RELATIVE_END_TIME"],
):
Expand All @@ -81,8 +86,9 @@ def __init__(
self.time_range = time_range
self.time_shift = utils.parse_human_timedelta(time_shift)
self.groupby = groupby or []
self.post_processing = post_processing or []

# Temporal solution for backward compatability issue due the new format of
# Temporary solution for backward compatibility issue due the new format of
# non-ad-hoc metric which needs to adhere to superset-ui per
# https://git.io/Jvm7P.
self.metrics = [
Expand Down Expand Up @@ -138,9 +144,37 @@ def cache_key(self, **extra: Any) -> str:
if self.time_range:
cache_dict["time_range"] = self.time_range
json_data = self.json_dumps(cache_dict, sort_keys=True)
if self.post_processing:
cache_dict["post_processing"] = self.post_processing
return hashlib.md5(json_data.encode("utf-8")).hexdigest()

def json_dumps(self, obj: Any, sort_keys: bool = False) -> str:
return json.dumps(
obj, default=utils.json_int_dttm_ser, ignore_nan=True, sort_keys=sort_keys
)

def exec_post_processing(self, df: DataFrame) -> DataFrame:
"""
Perform post processing operations on DataFrame.

:param df: DataFrame returned from database model.
:return: new DataFrame to which all post processing operations have been
applied
villebro marked this conversation as resolved.
Show resolved Hide resolved
:raises ChartDataValidationError: If the post processing operation in incorrect
"""
for post_process in self.post_processing:
operation = post_process.get("operation")
villebro marked this conversation as resolved.
Show resolved Hide resolved
if not operation:
raise QueryObjectValidationError(
_("`operation` property of post processing object undefined")
)
villebro marked this conversation as resolved.
Show resolved Hide resolved
if not hasattr(pandas_postprocessing, operation):
villebro marked this conversation as resolved.
Show resolved Hide resolved
raise QueryObjectValidationError(
_(
"Unsupported post processing operation: %(operation)s",
type=operation,
)
)
options = post_process.get("options", {})
df = getattr(pandas_postprocessing, operation)(df, **options)
villebro marked this conversation as resolved.
Show resolved Hide resolved
return df
4 changes: 4 additions & 0 deletions superset/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,7 @@ class CertificateException(SupersetException):

class DatabaseNotFound(SupersetException):
status = 400


class QueryObjectValidationError(SupersetException):
status = 400
Loading