Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(elasticsearch): time_zone setting does not work for cast datetime expressions #17048

Merged
merged 13 commits into from
Nov 25, 2021
20 changes: 20 additions & 0 deletions docs/src/pages/docs/Connecting to Databases/elasticsearch.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,23 @@ POST /_aliases
```

Then register your table with the alias name logstasg_all

**Time zone**

By default, Superset uses UTC time zone for elasticsearch query. If you need to specify a time zone,
please edit your Database and enter the settings of your specified time zone in the Other > ENGINE PARAMETERS:


```
{
"connect_args": {
"time_zone": "Asia/Shanghai"
}
}
```

Another issue to note about the time zone problem is that before elasticsearch7.8, if you want to convert a string into a `DATETIME` object,
you need to use the `CAST` function,but this function does not support our `time_zone` setting. So it is recommended to upgrade to the version after elasticsearch7.8.
After elasticsearch7.8, you can use the `DATETIME_PARSE` function to solve this problem.
The DATETIME_PARSE function is to support our `time_zone` setting, and here you need to fill in your elasticsearch version number in the Other > VERSION setting.
the superset will use the `DATETIME_PARSE` function for conversion.
35 changes: 25 additions & 10 deletions superset/connectors/sqla/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,17 +247,23 @@ def is_temporal(self) -> bool:
def db_engine_spec(self) -> Type[BaseEngineSpec]:
return self.table.db_engine_spec

@property
def db_extra(self) -> Dict[str, Any]:
return self.table.database.get_extra()

@property
def type_generic(self) -> Optional[utils.GenericDataType]:
if self.is_dttm:
return GenericDataType.TEMPORAL
column_spec = self.db_engine_spec.get_column_spec(self.type)
column_spec = self.db_engine_spec.get_column_spec(
self.type, db_extra=self.db_extra
)
return column_spec.generic_type if column_spec else None

def get_sqla_col(self, label: Optional[str] = None) -> Column:
label = label or self.column_name
db_engine_spec = self.db_engine_spec
column_spec = db_engine_spec.get_column_spec(self.type)
column_spec = db_engine_spec.get_column_spec(self.type, db_extra=self.db_extra)
type_ = column_spec.sqla_type if column_spec else None
if self.expression:
tp = self.table.get_template_processor()
Expand Down Expand Up @@ -312,7 +318,9 @@ def get_timestamp_expression(

pdf = self.python_date_format
is_epoch = pdf in ("epoch_s", "epoch_ms")
column_spec = self.db_engine_spec.get_column_spec(self.type)
column_spec = self.db_engine_spec.get_column_spec(
self.type, db_extra=self.db_extra
)
type_ = column_spec.sqla_type if column_spec else DateTime
if not self.expression and not time_grain and not is_epoch:
sqla_col = column(self.column_name, type_=type_)
Expand All @@ -335,7 +343,11 @@ def dttm_sql_literal(
) -> str:
"""Convert datetime object to a SQL expression string"""
dttm_type = self.type or ("DATETIME" if self.is_dttm else None)
sql = self.db_engine_spec.convert_dttm(dttm_type, dttm) if dttm_type else None
sql = (
self.db_engine_spec.convert_dttm(dttm_type, dttm, **self.db_extra)
if dttm_type
else None
)

if sql:
return sql
Expand All @@ -348,10 +360,8 @@ def dttm_sql_literal(
utils.TimeRangeEndpoint.INCLUSIVE,
utils.TimeRangeEndpoint.EXCLUSIVE,
):
tf = (
self.table.database.get_extra()
.get("python_date_format_by_column_name", {})
.get(self.column_name)
tf = self.db_extra.get("python_date_format_by_column_name", {}).get(
self.column_name
)

if tf:
Expand Down Expand Up @@ -1174,7 +1184,9 @@ def get_sqla_query( # pylint: disable=too-many-arguments,too-many-locals,too-ma
sqla_col = col_obj.get_timestamp_expression(filter_grain)
else:
sqla_col = col_obj.get_sqla_col()
col_spec = db_engine_spec.get_column_spec(col_obj.type)
col_spec = db_engine_spec.get_column_spec(
col_obj.type, db_extra=self.database.get_extra()
)
is_list_target = op in (
utils.FilterOperator.IN.value,
utils.FilterOperator.NOT_IN.value,
Expand Down Expand Up @@ -1421,6 +1433,7 @@ def _get_series_orderby(
def _get_top_groups(
self, df: pd.DataFrame, dimensions: List[str], groupby_exprs: Dict[str, Any],
) -> ColumnElement:
db_extra: Dict[str, Any] = self.database.get_extra()
column_map = {column.column_name: column for column in self.columns}
groups = []
for _unused, row in df.iterrows():
Expand All @@ -1434,7 +1447,9 @@ def _get_top_groups(
# string into a timestamp.
if column_map[dimension].is_temporal and isinstance(value, str):
dttm = dateutil.parser.parse(value)
value = text(self.db_engine_spec.convert_dttm("TIMESTAMP", dttm))
value = text(
self.db_engine_spec.convert_dttm("TIMESTAMP", dttm, **db_extra)
)

group.append(groupby_exprs[dimension] == value)
groups.append(and_(*group))
Expand Down
4 changes: 3 additions & 1 deletion superset/connectors/sqla/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ def get_physical_table_metadata(
db_type = db_engine_spec.column_datatype_to_string(
col["type"], db_dialect
)
type_spec = db_engine_spec.get_column_spec(db_type)
type_spec = db_engine_spec.get_column_spec(
db_type, db_extra=database.get_extra()
)
col.update(
{
"type": db_type,
Expand Down
4 changes: 3 additions & 1 deletion superset/db_engine_specs/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ class AthenaEngineSpec(BaseEngineSpec):
}

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, **kwargs: Any
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.DATE:
return f"from_iso8601_date('{dttm.date().isoformat()}')"
Expand Down
9 changes: 5 additions & 4 deletions superset/db_engine_specs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
from superset.utils import core as utils
from superset.utils.core import ColumnSpec, GenericDataType
from superset.utils.hashing import md5_sha_from_str
from superset.utils.memoized import memoized
from superset.utils.network import is_hostname_valid, is_port_open

if TYPE_CHECKING:
Expand Down Expand Up @@ -686,13 +685,14 @@ def df_to_sql(

@classmethod
def convert_dttm( # pylint: disable=unused-argument
cls, target_type: str, dttm: datetime,
cls, target_type: str, dttm: datetime, **kwargs: Any,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason this is **kwargs: Any and not db_extra: Optional[Dict[str, Any]] like has been done in get_column_spec? Unpacking into kwargs will make more difficult to add new parameters to this method going forward

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, the writing here is not strict enough, for db_extra, it is better to show the declaration type, I think, the method signature can be like this, def convert_dttm(cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]], **kwargs: Any) also continue to retain kwargs, so that later if a data source needs non-db_extra information, this way is also compatible, what do you think

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aniaan I'd prefer not to add **kwargs unless it's currently needed (it's easy enough to add later when the need comes up). And thinking more closely, I'd personally prefer to keep all arguments named so that all parameters in the method are explicit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can, then we will not add it for now, and we will consider it later if we have this situation.
I'll correct the PR later

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@villebro I have updated it, you can review it when you have time

) -> Optional[str]:
"""
Convert Python datetime object to a SQL expression

:param target_type: The target type of expression
:param dttm: The datetime object
:param kwargs: The database extra object
:return: The SQL expression
"""
return None
Expand Down Expand Up @@ -1280,10 +1280,10 @@ def is_select_query(cls, parsed_query: ParsedQuery) -> bool:
return parsed_query.is_select()

@classmethod
@memoized
def get_column_spec( # pylint: disable=unused-argument
cls,
native_type: Optional[str],
db_extra: Optional[Dict[str, Any]] = None,
source: utils.ColumnTypeSource = utils.ColumnTypeSource.GET_TABLE,
column_type_mappings: Tuple[
Tuple[
Expand All @@ -1298,6 +1298,7 @@ def get_column_spec( # pylint: disable=unused-argument
Converts native database type to sqlalchemy column type.
:param native_type: Native database typee
:param source: Type coming from the database table or cursor description
:param db_extra: The database extra object
:return: ColumnSpec object
"""
col_types = cls.get_sqla_column_type(
Expand All @@ -1309,7 +1310,7 @@ def get_column_spec( # pylint: disable=unused-argument
# using datetimes
if generic_type == GenericDataType.TEMPORAL:
column_type = literal_dttm_type_factory(
column_type, cls, native_type or ""
column_type, cls, native_type or "", db_extra=db_extra or {}
)
is_dttm = generic_type == GenericDataType.TEMPORAL
return ColumnSpec(
Expand Down
4 changes: 3 additions & 1 deletion superset/db_engine_specs/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ class BigQueryEngineSpec(BaseEngineSpec):
}

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, **kwargs: Any
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.DATE:
return f"CAST('{dttm.date().isoformat()}' AS DATE)"
Expand Down
6 changes: 4 additions & 2 deletions superset/db_engine_specs/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
import logging
from datetime import datetime
from typing import Dict, List, Optional, Type, TYPE_CHECKING
from typing import Any, Dict, List, Optional, Type, TYPE_CHECKING

from urllib3.exceptions import NewConnectionError

Expand Down Expand Up @@ -72,7 +72,9 @@ def get_dbapi_mapped_exception(cls, exception: Exception) -> Exception:
return new_exception(str(exception))

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, **kwargs: Any
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.DATE:
return f"toDate('{dttm.date().isoformat()}')"
Expand Down
6 changes: 4 additions & 2 deletions superset/db_engine_specs/crate.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from typing import Optional, TYPE_CHECKING
from typing import Any, Optional, TYPE_CHECKING

from superset.db_engine_specs.base import BaseEngineSpec
from superset.utils import core as utils
Expand Down Expand Up @@ -50,7 +50,9 @@ def epoch_ms_to_dttm(cls) -> str:
return "{col}"

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, **kwargs: Any
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.TIMESTAMP:
return f"{dttm.timestamp() * 1000}"
Expand Down
6 changes: 4 additions & 2 deletions superset/db_engine_specs/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.o

from datetime import datetime
from typing import Optional
from typing import Any, Optional

from superset.db_engine_specs.base import BaseEngineSpec
from superset.db_engine_specs.hive import HiveEngineSpec
Expand All @@ -40,7 +40,9 @@ class DatabricksODBCEngineSpec(BaseEngineSpec):
_time_grain_expressions = HiveEngineSpec._time_grain_expressions

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, **kwargs: Any
) -> Optional[str]:
return HiveEngineSpec.convert_dttm(target_type, dttm)

@classmethod
Expand Down
6 changes: 4 additions & 2 deletions superset/db_engine_specs/dremio.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from typing import Optional
from typing import Any, Optional

from superset.db_engine_specs.base import BaseEngineSpec
from superset.utils import core as utils
Expand Down Expand Up @@ -43,7 +43,9 @@ def epoch_to_dttm(cls) -> str:
return "TO_DATE({col})"

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, **kwargs: Any
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.DATE:
return f"TO_DATE('{dttm.date().isoformat()}', 'YYYY-MM-DD')"
Expand Down
6 changes: 4 additions & 2 deletions superset/db_engine_specs/drill.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from typing import Optional
from typing import Any, Optional
from urllib import parse

from sqlalchemy.engine.url import URL
Expand Down Expand Up @@ -55,7 +55,9 @@ def epoch_ms_to_dttm(cls) -> str:
return "TO_DATE({col})"

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, **kwargs: Any
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.DATE:
return f"TO_DATE('{dttm.date().isoformat()}', 'yyyy-MM-dd')"
Expand Down
4 changes: 3 additions & 1 deletion superset/db_engine_specs/druid.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ def get_extra_params(database: "Database") -> Dict[str, Any]:
return extra

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, **kwargs: Any
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.DATE:
return f"CAST(TIME_PARSE('{dttm.date().isoformat()}') AS DATE)"
Expand Down
23 changes: 20 additions & 3 deletions superset/db_engine_specs/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from typing import Dict, Optional, Type
from distutils.version import StrictVersion
from typing import Any, Dict, Optional, Type

from superset.db_engine_specs.base import BaseEngineSpec
from superset.db_engine_specs.exceptions import (
Expand Down Expand Up @@ -59,9 +60,23 @@ def get_dbapi_exception_mapping(cls) -> Dict[Type[Exception], Type[Exception]]:
}

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, **kwargs: Any
) -> Optional[str]:

if target_type.upper() == utils.TemporalType.DATETIME:
es_version = kwargs.get("version")
# The elasticsearch CAST function does not take effect for the time zone
# setting. In elasticsearch7.8 and above, we can use the DATETIME_PARSE
# function to solve this problem.
if es_version and StrictVersion(es_version) >= StrictVersion("7.8"):
datetime_formatted = dttm.isoformat(sep=" ", timespec="seconds")
return (
f"""DATETIME_PARSE('{datetime_formatted}', 'yyyy-MM-dd HH:mm:ss')"""
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should have a fallback/more clear error message if the version isn't parseable by StrictVersion (Since this is to be populated by the user, we can expect to bump into unparseable values here):

>>> from distutils.version import StrictVersion
>>> StrictVersion("7.8.0.0")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/ville/.pyenv/versions/3.8-dev/lib/python3.8/distutils/version.py", line 40, in __init__
    self.parse(vstring)
  File "/Users/ville/.pyenv/versions/3.8-dev/lib/python3.8/distutils/version.py", line 137, in parse
    raise ValueError("invalid version number '%s'" % vstring)
ValueError: invalid version number '7.8.0.0'

Same for non-string values (I expect someone may try to enter it as a number, not string):

>>> StrictVersion(7.8)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/ville/.pyenv/versions/3.8-dev/lib/python3.8/distutils/version.py", line 40, in __init__
    self.parse(vstring)
  File "/Users/ville/.pyenv/versions/3.8-dev/lib/python3.8/distutils/version.py", line 135, in parse
    match = self.version_re.match(vstring)
TypeError: expected string or bytes-like object

Just in case, perhaps we could do something as simple as

supports_dttm_parse = False
try:
    if es_version:
        supports_dttm_parse = StrictVersion(es_version) >= StrictVersion("7.8")
    except ValueError:
        ...

Copy link
Contributor Author

@aniaan aniaan Nov 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@villebro I have updated it, you can review it when you have time


return f"""CAST('{dttm.isoformat(timespec="seconds")}' AS DATETIME)"""

return None


Expand All @@ -87,7 +102,9 @@ class OpenDistroEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method
engine_name = "ElasticSearch (OpenDistro SQL)"

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, **kwargs: Any
) -> Optional[str]:
if target_type.upper() == utils.TemporalType.DATETIME:
return f"""'{dttm.isoformat(timespec="seconds")}'"""
return None
Expand Down
6 changes: 4 additions & 2 deletions superset/db_engine_specs/firebird.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from typing import Optional
from typing import Any, Optional

from superset.db_engine_specs.base import BaseEngineSpec, LimitMethod
from superset.utils import core as utils
Expand Down Expand Up @@ -70,7 +70,9 @@ def epoch_to_dttm(cls) -> str:
return "DATEADD(second, {col}, CAST('00:00:00' AS TIMESTAMP))"

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, **kwargs: Any
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.TIMESTAMP:
dttm_formatted = dttm.isoformat(sep=" ")
Expand Down
6 changes: 4 additions & 2 deletions superset/db_engine_specs/firebolt.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from typing import Optional
from typing import Any, Optional

from superset.db_engine_specs.base import BaseEngineSpec
from superset.utils import core as utils
Expand All @@ -41,7 +41,9 @@ class FireboltEngineSpec(BaseEngineSpec):
}

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, **kwargs: Any
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.DATE:
return f"CAST('{dttm.date().isoformat()}' AS DATE)"
Expand Down
Loading