feat: Add support for Azure Data Explorer (Kusto) db engine spec (apa…
…che#17898)

* Add two Kusto engine specs: KQL and SQL. Some minor changes in core code to support Kusto engine specs.

* Remove redundant imports and logging.

* docs: Kusto sqlalchemy docs

* fix: Fix mypy and linting errors

* fix: Handle Black vs Pylint checks

* fix: isort problem

* refactor: Merge kustosql and kustokql in the single kusto module

* test: Add tests for Kusto db spec

* feat: Schema override is no longer required in KQL

* Removed redundant imports.

* Added ".show" queries to readonly query determination.

* Fixed some bugs.
Added tests for convert_dttm.

* Fixed major sqlalchemy-kusto version.

* Fixed by isort.

Co-authored-by: Eugene Bikkinin <xnegxneg@gmail.com>
Co-authored-by: k.tomak <k.tomak@dodopizza.com>
Co-authored-by: Eugene Bikkinin <e.bikkinin@dodopizza.com>
4 people authored and bwang221 committed Feb 10, 2022
1 parent 0434bdd commit b17d79d
Showing 9 changed files with 390 additions and 3 deletions.
17 changes: 17 additions & 0 deletions docs/installation.rst
@@ -789,6 +789,23 @@ If you are using JDBC to connect to Drill, the connection string looks like this
For a complete tutorial about how to use Apache Drill with Superset, see this tutorial:
`Visualize Anything with Superset and Drill <http://thedataist.com/visualize-anything-with-superset-and-drill/>`_

Kusto
---------

The recommended connector library for Kusto is
`sqlalchemy-kusto <https://pypi.org/project/sqlalchemy-kusto/1.0.1/>`_, version 1.0.1 or later.

The connection string for Kusto looks like this::

    kustosql+https://{cluster_url}/{database}?azure_ad_client_id={azure_ad_client_id}&azure_ad_client_secret={azure_ad_client_secret}&azure_ad_tenant_id={azure_ad_tenant_id}&msi=False

Make sure the user has privileges to access and use all required
databases/tables/views.

See `Azure Data Explorer (Kusto) dialect for SQLAlchemy <https://github.com/dodopizza/sqlalchemy-kusto/>`_.

Deeper SQLAlchemy integration
-----------------------------

1 change: 1 addition & 0 deletions docs/src/pages/docs/Connecting to Databases/index.mdx
@@ -46,6 +46,7 @@ A list of some of the recommended packages.
|[Hologres](/docs/databases/hologres)|```pip install psycopg2```|```postgresql+psycopg2://<UserName>:<DBPassword>@<Database Host>/<Database Name>```|
|[IBM Db2](/docs/databases/ibm-db2)|```pip install ibm_db_sa```|```db2+ibm_db://```|
|[IBM Netezza Performance Server](/docs/databases/netezza)|```pip install nzalchemy```|```netezza+nzpy://<UserName>:<DBPassword>@<Database Host>/<Database Name>```|
|[Kusto](/docs/databases/kusto)|```pip install sqlalchemy-kusto```|```kustosql+https://{cluster_url}/{database}?azure_ad_client_id={azure_ad_client_id}&azure_ad_client_secret={azure_ad_client_secret}&azure_ad_tenant_id={azure_ad_tenant_id}&msi=False```|
|[MySQL](/docs/databases/mysql)|```pip install mysqlclient```|```mysql://<UserName>:<DBPassword>@<Database Host>/<Database Name>```|
|[Oracle](/docs/databases/oracle)|```pip install cx_Oracle```|```oracle://```|
|[PostgreSQL](/docs/databases/postgres)|```pip install psycopg2```|```postgresql://<UserName>:<DBPassword>@<Database Host>/<Database Name>```|
21 changes: 21 additions & 0 deletions docs/src/pages/docs/Connecting to Databases/kusto.mdx
@@ -0,0 +1,21 @@
---
name: Kusto
menu: Connecting to Databases
route: /docs/databases/kusto
index: 32
version: 1
---

## Kusto

The recommended connector library for Kusto is
[sqlalchemy-kusto](https://pypi.org/project/sqlalchemy-kusto/1.0.1/), version 1.0.1 or later.

The connection string for Kusto looks like this:

```
kustosql+https://{cluster_url}/{database}?azure_ad_client_id={azure_ad_client_id}&azure_ad_client_secret={azure_ad_client_secret}&azure_ad_tenant_id={azure_ad_tenant_id}&msi=False
```

Make sure the user has privileges to access and use all required
databases/tables/views.
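
As a rough illustration of how the connection string above fits together, the following sketch assembles one from its parts in Python. The helper name `build_kusto_uri` and the sample cluster, database, and credential values are made up for this example; only the URI shape and the query parameter names come from the documentation above. Percent-encoding the query values is a precaution taken by this sketch, not something the docs state.

```python
from urllib.parse import urlencode


def build_kusto_uri(
    cluster_url: str,
    database: str,
    client_id: str,
    client_secret: str,
    tenant_id: str,
    scheme: str = "kustosql+https",
) -> str:
    """Assemble a URI in the documented shape (hypothetical helper)."""
    # urlencode percent-encodes characters such as "&" that would otherwise
    # be read as parameter separators.
    query = urlencode(
        {
            "azure_ad_client_id": client_id,
            "azure_ad_client_secret": client_secret,
            "azure_ad_tenant_id": tenant_id,
            "msi": "False",
        }
    )
    return f"{scheme}://{cluster_url}/{database}?{query}"


# All values below are placeholders, not real credentials.
uri = build_kusto_uri(
    cluster_url="mycluster.example.net",
    database="mydb",
    client_id="my-client-id",
    client_secret="s3cr&t",
    tenant_id="my-tenant-id",
)
print(uri)
```

The `scheme` parameter is left configurable because the commit registers two engine names, `kustosql` and `kustokql`; whether the same query parameters apply to both is an assumption to check against the sqlalchemy-kusto documentation.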
1 change: 1 addition & 0 deletions setup.py
@@ -140,6 +140,7 @@ def get_git_sha() -> str:
        "hana": ["hdbcli==2.4.162", "sqlalchemy_hana==0.4.0"],
        "hive": ["pyhive[hive]>=0.6.1", "tableschema", "thrift>=0.11.0, <1.0.0"],
        "impala": ["impyla>0.16.2, <0.17"],
        "kusto": ["sqlalchemy-kusto>=1.0.1, <2"],
        "kylin": ["kylinpy>=2.8.1, <2.9"],
        "mmsql": ["pymssql>=2.1.4, <2.2"],
        "mysql": ["mysqlclient>=2.1.0, <3"],
4 changes: 4 additions & 0 deletions superset/db_engine_specs/base.py
@@ -1381,6 +1381,10 @@ def cancel_query(  # pylint: disable=unused-argument

        return False

    @classmethod
    def parse_sql(cls, sql: str) -> List[str]:
        return [str(s).strip(" ;") for s in sqlparse.parse(sql)]


# schema for adding a database by providing parameters instead of the
# full SQLAlchemy URI
164 changes: 164 additions & 0 deletions superset/db_engine_specs/kusto.py
@@ -0,0 +1,164 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from typing import Any, Dict, List, Optional, Type

from superset.db_engine_specs.base import BaseEngineSpec, LimitMethod
from superset.db_engine_specs.exceptions import (
    SupersetDBAPIDatabaseError,
    SupersetDBAPIOperationalError,
    SupersetDBAPIProgrammingError,
)
from superset.sql_parse import ParsedQuery
from superset.utils import core as utils


class KustoSqlEngineSpec(BaseEngineSpec):  # pylint: disable=abstract-method
    limit_method = LimitMethod.WRAP_SQL
    engine = "kustosql"
    engine_name = "KustoSQL"
    time_groupby_inline = True
    time_secondary_columns = True
    allows_joins = True
    allows_subqueries = True
    allows_sql_comments = False

    _time_grain_expressions = {
        None: "{col}",
        "PT1S": "DATEADD(second, DATEDIFF(second, '2000-01-01', {col}), '2000-01-01')",
        "PT1M": "DATEADD(minute, DATEDIFF(minute, 0, {col}), 0)",
        "PT5M": "DATEADD(minute, DATEDIFF(minute, 0, {col}) / 5 * 5, 0)",
        "PT10M": "DATEADD(minute, DATEDIFF(minute, 0, {col}) / 10 * 10, 0)",
        "PT15M": "DATEADD(minute, DATEDIFF(minute, 0, {col}) / 15 * 15, 0)",
        "PT0.5H": "DATEADD(minute, DATEDIFF(minute, 0, {col}) / 30 * 30, 0)",
        "PT1H": "DATEADD(hour, DATEDIFF(hour, 0, {col}), 0)",
        "P1D": "DATEADD(day, DATEDIFF(day, 0, {col}), 0)",
        "P1W": "DATEADD(day, -1, DATEADD(week, DATEDIFF(week, 0, {col}), 0))",
        "P1M": "DATEADD(month, DATEDIFF(month, 0, {col}), 0)",
        "P0.25Y": "DATEADD(quarter, DATEDIFF(quarter, 0, {col}), 0)",
        "P1Y": "DATEADD(year, DATEDIFF(year, 0, {col}), 0)",
        "1969-12-28T00:00:00Z/P1W": "DATEADD(day, -1,"
        " DATEADD(week, DATEDIFF(week, 0, {col}), 0))",
        "1969-12-29T00:00:00Z/P1W": "DATEADD(week,"
        " DATEDIFF(week, 0, DATEADD(day, -1, {col})), 0)",
    }

    type_code_map: Dict[int, str] = {}  # loaded from get_datatype only if needed

    @classmethod
    def get_dbapi_exception_mapping(cls) -> Dict[Type[Exception], Type[Exception]]:
        # pylint: disable=import-outside-toplevel,import-error
        import sqlalchemy_kusto.errors as kusto_exceptions

        return {
            kusto_exceptions.DatabaseError: SupersetDBAPIDatabaseError,
            kusto_exceptions.OperationalError: SupersetDBAPIOperationalError,
            kusto_exceptions.ProgrammingError: SupersetDBAPIProgrammingError,
        }

    @classmethod
    def convert_dttm(
        cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
    ) -> Optional[str]:
        tt = target_type.upper()
        if tt == utils.TemporalType.DATE:
            return f"CONVERT(DATE, '{dttm.date().isoformat()}', 23)"
        if tt == utils.TemporalType.DATETIME:
            datetime_formatted = dttm.isoformat(timespec="milliseconds")
            return f"""CONVERT(DATETIME, '{datetime_formatted}', 126)"""
        if tt == utils.TemporalType.SMALLDATETIME:
            datetime_formatted = dttm.isoformat(sep=" ", timespec="seconds")
            return f"""CONVERT(SMALLDATETIME, '{datetime_formatted}', 20)"""
        if tt == utils.TemporalType.TIMESTAMP:
            datetime_formatted = dttm.isoformat(sep=" ", timespec="seconds")
            return f"""CONVERT(TIMESTAMP, '{datetime_formatted}', 20)"""
        return None

    @classmethod
    def is_readonly_query(cls, parsed_query: ParsedQuery) -> bool:
        """Pessimistic readonly, 100% sure statement won't mutate anything"""
        return parsed_query.sql.lower().startswith("select")


class KustoKqlEngineSpec(BaseEngineSpec):  # pylint: disable=abstract-method
    limit_method = LimitMethod.WRAP_SQL
    engine = "kustokql"
    engine_name = "KustoKQL"
    time_groupby_inline = True
    time_secondary_columns = True
    allows_joins = True
    allows_subqueries = True
    allows_sql_comments = False
    run_multiple_statements_as_one = True

    _time_grain_expressions = {
        None: "{col}",
        "PT1S": "{col}/ time(1s)",
        "PT1M": "{col}/ time(1min)",
        "PT1H": "{col}/ time(1h)",
        "P1D": "{col}/ time(1d)",
        # Use the {col} placeholder rather than a hard-coded column name.
        "P1M": "datetime_diff('month',{col}, datetime(0001-01-01 00:00:00))+1",
        "P1Y": "datetime_diff('year',{col}, datetime(0001-01-01 00:00:00))+1",
    }

    type_code_map: Dict[int, str] = {}  # loaded from get_datatype only if needed

    @classmethod
    def get_dbapi_exception_mapping(cls) -> Dict[Type[Exception], Type[Exception]]:
        # pylint: disable=import-outside-toplevel,import-error
        import sqlalchemy_kusto.errors as kusto_exceptions

        return {
            kusto_exceptions.DatabaseError: SupersetDBAPIDatabaseError,
            kusto_exceptions.OperationalError: SupersetDBAPIOperationalError,
            kusto_exceptions.ProgrammingError: SupersetDBAPIProgrammingError,
        }

    @classmethod
    def convert_dttm(
        cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
    ) -> Optional[str]:
        if target_type.upper() in [
            utils.TemporalType.DATETIME,
            utils.TemporalType.TIMESTAMP,
        ]:
            return f"""datetime({dttm.isoformat(timespec="microseconds")})"""
        if target_type.upper() == utils.TemporalType.DATE:
            return f"""datetime({dttm.date().isoformat()})"""

        return None

    @classmethod
    def is_readonly_query(cls, parsed_query: ParsedQuery) -> bool:
        """
        Pessimistic readonly, 100% sure statement won't mutate anything.
        """
        return KustoKqlEngineSpec.is_select_query(
            parsed_query
        ) or parsed_query.sql.startswith(".show")

    @classmethod
    def is_select_query(cls, parsed_query: ParsedQuery) -> bool:
        return not parsed_query.sql.startswith(".")

    @classmethod
    def parse_sql(cls, sql: str) -> List[str]:
        """
        Kusto supports a single query statement, but it could include sub queries
        and variables declared via let keyword.
        """
        return [sql]
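
To make the two `convert_dttm` implementations in this file easier to compare, here is a minimal standalone sketch of the literals they build. The function names `kql_literal` and `sql_literal` are hypothetical, plain strings stand in for `utils.TemporalType` (assumed to compare equal to their upper-case names), and only the `DATE` and `DATETIME` branches are reproduced.

```python
from datetime import datetime
from typing import Optional


def kql_literal(target_type: str, dttm: datetime) -> Optional[str]:
    """Mirror of the KQL branch: values are wrapped in datetime(...)."""
    tt = target_type.upper()
    if tt in ("DATETIME", "TIMESTAMP"):
        stamp = dttm.isoformat(timespec="microseconds")
        return f"datetime({stamp})"
    if tt == "DATE":
        return f"datetime({dttm.date().isoformat()})"
    return None


def sql_literal(target_type: str, dttm: datetime) -> Optional[str]:
    """Mirror of two SQL branches: values go through CONVERT(...)."""
    tt = target_type.upper()
    if tt == "DATE":
        return f"CONVERT(DATE, '{dttm.date().isoformat()}', 23)"
    if tt == "DATETIME":
        # timespec="milliseconds" truncates the microsecond part.
        stamp = dttm.isoformat(timespec="milliseconds")
        return f"CONVERT(DATETIME, '{stamp}', 126)"
    return None


dttm = datetime(2019, 1, 2, 3, 4, 5, 678900)
print(kql_literal("DATETIME", dttm))  # datetime(2019-01-02T03:04:05.678900)
print(kql_literal("DATE", dttm))      # datetime(2019-01-02)
print(sql_literal("DATETIME", dttm))  # CONVERT(DATETIME, '2019-01-02T03:04:05.678', 126)
print(sql_literal("DATE", dttm))      # CONVERT(DATE, '2019-01-02', 23)
```

Unrecognized type names return `None` in both sketches, matching the fall-through in the methods above.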
3 changes: 1 addition & 2 deletions superset/models/core.py
@@ -29,7 +29,6 @@
 import numpy
 import pandas as pd
 import sqlalchemy as sqla
-import sqlparse
 from flask import g, request
 from flask_appbuilder import Model
 from sqlalchemy import (
@@ -399,7 +398,7 @@ def get_df(  # pylint: disable=too-many-locals
         mutator: Optional[Callable[[pd.DataFrame], None]] = None,
         username: Optional[str] = None,
     ) -> pd.DataFrame:
-        sqls = [str(s).strip(" ;") for s in sqlparse.parse(sql)]
+        sqls = self.db_engine_spec.parse_sql(sql)

         engine = self.get_sqla_engine(schema=schema, user_name=username)
         username = utils.get_username() or username
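
The change to `get_df` hands statement splitting to the engine spec, so a spec can opt out of splitting on semicolons. The sketch below shows why that matters for KQL, using only the standard library. `BaseSpec` and `KqlSpec` are hypothetical stand-ins for `BaseEngineSpec` and `KustoKqlEngineSpec`, the naive split on `;` is a simplification of the `sqlparse.parse` call, and the `StormEvents` table is a placeholder.

```python
from typing import List, Type


class BaseSpec:
    """Hypothetical stand-in for BaseEngineSpec."""

    @classmethod
    def parse_sql(cls, sql: str) -> List[str]:
        # Simplified: the real method delegates to sqlparse.parse(); a naive
        # split on ";" is enough to show the shape of the result.
        return [part.strip(" ;") for part in sql.split(";") if part.strip()]


class KqlSpec(BaseSpec):
    """Hypothetical stand-in for KustoKqlEngineSpec."""

    @classmethod
    def parse_sql(cls, sql: str) -> List[str]:
        # A KQL query may contain "let" statements ending in ";", so the
        # whole text is kept as one statement.
        return [sql]


def statements_for(spec: Type[BaseSpec], sql: str) -> List[str]:
    """Mimic the dispatch in get_df: ask the spec how to split the text."""
    return spec.parse_sql(sql)


sql_text = "SELECT foo FROM tbl1; SELECT bar FROM tbl2;"
kql_text = "let n = 10; StormEvents | take n"

print(statements_for(BaseSpec, sql_text))
print(statements_for(KqlSpec, kql_text))
print(statements_for(BaseSpec, kql_text))  # the split breaks the KQL query
```

Under the default behaviour the `let` declaration and the query that uses it become separate statements, which is the case the override avoids.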
27 changes: 26 additions & 1 deletion tests/unit_tests/db_engine_specs/test_base.py
@@ -15,7 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint: disable=unused-argument, import-outside-toplevel, protected-access
-import re

 from flask.ctx import AppContext

@@ -31,3 +30,29 @@ def test_get_text_clause_with_colon(app_context: AppContext) -> None:
         "SELECT foo FROM tbl WHERE foo = '123:456')"
     )
     assert text_clause.text == "SELECT foo FROM tbl WHERE foo = '123\\:456')"
+
+
+def test_parse_sql_single_statement(app_context: AppContext) -> None:
+    """
+    `parse_sql` should properly strip leading and trailing spaces and semicolons
+    """
+
+    from superset.db_engine_specs.base import BaseEngineSpec
+
+    queries = BaseEngineSpec.parse_sql(" SELECT foo FROM tbl ; ")
+    assert queries == ["SELECT foo FROM tbl"]
+
+
+def test_parse_sql_multi_statement(app_context: AppContext) -> None:
+    """
+    For a string with multiple SQL statements, `parse_sql` should return a list
+    where each element is a single SQL statement
+    """
+
+    from superset.db_engine_specs.base import BaseEngineSpec
+
+    queries = BaseEngineSpec.parse_sql("SELECT foo FROM tbl1; SELECT bar FROM tbl2;")
+    assert queries == [
+        "SELECT foo FROM tbl1",
+        "SELECT bar FROM tbl2",
+    ]
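
The commit message also mentions tests for the Kusto spec, which do not appear in this view. As a rough idea of what the read-only checks in `kusto.py` accept and reject, the sketch below restates them as plain functions over strings. The function names and sample statements are hypothetical, and the sketch assumes the statement text carries no leading whitespace.

```python
def sql_is_readonly(sql: str) -> bool:
    """KustoSQL rule: only statements starting with SELECT count as read-only."""
    return sql.lower().startswith("select")


def kql_is_select(sql: str) -> bool:
    """KustoKQL rule: anything not starting with "." is treated as a query."""
    return not sql.startswith(".")


def kql_is_readonly(sql: str) -> bool:
    """KustoKQL rule: plain queries and ".show" commands are read-only."""
    return kql_is_select(sql) or sql.startswith(".show")


# Sample statements; StormEvents is a placeholder table name.
checks = {
    "StormEvents | take 10": kql_is_readonly("StormEvents | take 10"),
    ".show tables": kql_is_readonly(".show tables"),
    ".drop table StormEvents": kql_is_readonly(".drop table StormEvents"),
    "SELECT 1": sql_is_readonly("SELECT 1"),
    "DROP TABLE t": sql_is_readonly("DROP TABLE t"),
}
for statement, readonly in checks.items():
    print(f"{statement!r}: {readonly}")
```

Because both rules look only at the start of the text, a statement with leading spaces before a `.` command would be classified as a query by this sketch.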